Django 通过k8s的api接口实现查看以及远程ssh容器

2023年 7月 9日 61.4k 0

基本思路:用 Django 编写页面,通过 API 调用获取 Kubernetes 的部署情况并列出 Pod 信息;再根据 Pod 信息生成跳转 URL,点击后进入容器终端页面。该页面嵌入 https://github.com/kubernetes-ui/container-terminal 组件,从而在浏览器中登录到容器。

具体代码片段

django model 设计:

from django.db import models

# Create your models here.

class K8sHost(models.Model):
    """Connection settings for one Kubernetes cluster.

    Each row holds the REST API endpoint with its token, and the WebSocket
    endpoint with its token, together with a display name for the cluster.
    """

    # REST API endpoint of the cluster.
    k8s_api = models.CharField(
        max_length=64,
        verbose_name=u"k8s连接地址",
        unique=True,
        help_text="例如 https://10.255.56.250:6443",
    )
    # Bearer token sent with REST API requests.
    k8s_api_token = models.TextField(
        verbose_name=u'k8s连接token',
        help_text="参考 apiserver token",
    )
    # WebSocket endpoint. The label used to be a copy of the k8s_api label,
    # which made the two address fields indistinguishable in forms.
    k8s_ws = models.CharField(
        max_length=64,
        verbose_name=u"k8s websocket连接地址",
        unique=True,
        help_text="例如 ws://10.255.56.250:8080",
    )
    # Token used for WebSocket connections (label typo "websoket" fixed).
    k8s_ws_token = models.CharField(
        max_length=255,
        verbose_name=u'k8s websocket连接token',
        help_text="参考 ws token",
    )
    # Human-readable cluster name; also the string form of the row.
    k8s_name = models.CharField(
        max_length=255,
        verbose_name='k8s集群名称',
        default='default',
        help_text="给k8s起个名",
    )

    def __str__(self):
        """Return the cluster's display name."""
        return self.k8s_name

    class Meta:
        verbose_name = "k8s配置"
        verbose_name_plural = verbose_name


class K8sexec(K8sHost):
    """Proxy model over K8sHost that carries its own display name."""

    class Meta:
        # Proxy models add no table of their own.
        proxy = True
        verbose_name = verbose_name_plural = "k8s管理"

Django view 设计:

from django.shortcuts import render,render_to_response
from django.contrib.auth.decorators import login_required

# Create your views here.

import json
from django.http import HttpResponse
from k8sapp.utils import K8sApi
from .models import K8sHost

# Login is required because this view talks to the cluster with the stored
# API token; connectpod already enforces the same rule.
@login_required()
def getnamespacelist(request):
    """Return the namespace names of the selected cluster as JSON.

    Expects a GET request whose ``k8s_apiport`` parameter is the id of a
    ``K8sHost`` row. The response body is ``{"message": [name, ...]}``; the
    list is empty when the parameter is missing or is not an integer. Any
    other HTTP method receives a JSON error message instead.
    """
    json_type = "application/json; charset=utf-8"
    if request.method != 'GET':
        return HttpResponse(json.dumps({'message': "不允许POST请求"}, ensure_ascii=False),
                            content_type=json_type)

    # Fall back to '' instead of None: str(None) is the truthy string "None",
    # which used to pass the emptiness check and then crash int().
    raw_id = (request.GET.get('k8s_apiport') or '').strip()
    try:
        conf_id = int(raw_id)
    except ValueError:
        conf_id = None

    message = []
    if conf_id is not None:
        namespaces = K8sApi(confid=conf_id).get_namespacelist()
        message = [item.metadata.name for item in namespaces.items]
    return HttpResponse(json.dumps({'message': message}, ensure_ascii=False),
                        content_type=json_type)


# Login is required because this view talks to the cluster with the stored
# API token; connectpod already enforces the same rule.
@login_required()
def getpodlist(request):
    """Return the pods of one namespace as HTML table rows wrapped in JSON.

    Expects a POST request with ``k8s_apiport`` (id of a ``K8sHost`` row) and
    ``k8s_namespaces``. The response body is ``{"message": "<tr>...</tr>..."}``
    with one row per pod: a link to the terminal page, pod IP, host IP,
    namespace and creation time. The string is empty when a parameter is
    missing or invalid. Any other HTTP method receives a JSON error message.
    """
    from html import escape
    from urllib.parse import urlencode

    json_type = "application/json; charset=utf-8"
    if request.method != "POST":
        return HttpResponse(json.dumps({'message': "不允许GET请求"}, ensure_ascii=False),
                            content_type=json_type)

    # Fall back to '' instead of None: str(None) is the truthy string "None",
    # which used to pass the emptiness check and then crash int().
    raw_id = (request.POST.get('k8s_apiport') or '').strip()
    namespace = (request.POST.get('k8s_namespaces') or '').strip()
    try:
        conf_id = int(raw_id)
    except ValueError:
        conf_id = None

    # NOTE(review): the original row markup was lost (5 placeholders were left
    # for 8 values, which raised TypeError). The row below is a reconstruction:
    # three values feed the link's query string and five fill the cells, in the
    # original argument order. The link path is a placeholder -- point it at
    # the route that maps to connectpod.
    connect_url = '/k8sapp/connectpod/'
    row_template = (
        '<tr>'
        '<td><a href="%s" target="_blank">%s</a></td>'
        '<td>%s</td><td>%s</td><td>%s</td><td>%s</td>'
        '</tr>'
    )

    rows = []
    if conf_id is not None and namespace:
        pods = K8sApi(confid=conf_id).get_podlist(namespace=namespace)
        for pod in pods.items:
            # Parameter names match what connectpod reads from request.GET.
            query = urlencode([
                ('k8s_namespaces', pod.metadata.namespace),
                ('k8s_apiport', conf_id),
                ('k8s_pod', pod.metadata.name),
            ])
            cells = (
                pod.metadata.name,
                pod.status.pod_ip,
                pod.status.host_ip,
                pod.metadata.namespace,
                pod.metadata.creation_timestamp,
            )
            # Escape everything: the markup is injected into the page as HTML.
            href = escape('%s?%s' % (connect_url, query))
            rows.append(row_template % ((href,) + tuple(escape(str(cell)) for cell in cells)))

    return HttpResponse(json.dumps({'message': ''.join(rows)}, ensure_ascii=False),
                        content_type=json_type)


@login_required()
def connectpod(request):
    """Render the web-terminal page for one pod.

    Expects a GET request with ``k8s_namespaces``, ``k8s_apiport`` (id of a
    ``K8sHost`` row) and ``k8s_pod``. On success the template receives
    ``status='ok'`` plus the namespace, pod name, WebSocket URL and WebSocket
    token of the cluster. A non-GET request, a missing parameter or an unknown
    cluster id renders the template with ``status='error'``.
    """
    template = 'xtem_pod.html'
    # render() replaces render_to_response(), which Django 3.0 removed.
    if request.method != "GET":
        return render(request, template, {'status': 'error'})

    # Fall back to '' instead of None so a missing parameter is not turned
    # into the literal string "None".
    namespace = (request.GET.get('k8s_namespaces') or '').strip()
    raw_id = (request.GET.get('k8s_apiport') or '').strip()
    pod_name = (request.GET.get('k8s_pod') or '').strip()
    if not (namespace and pod_name):
        return render(request, template, {'status': 'error'})

    try:
        # One query for both columns instead of one query per column.
        host = K8sHost.objects.values('k8s_ws', 'k8s_ws_token').get(id=int(raw_id))
    except (ValueError, K8sHost.DoesNotExist):
        return render(request, template, {'status': 'error'})

    return render(request, template, {
        'status': 'ok',
        'namespace': namespace,
        'k8s_url': host['k8s_ws'],
        'pod_name': pod_name,
        'token': str(host['k8s_ws_token']).strip(),
    })

# Login is required: this view runs caller-supplied commands inside pods
# using the stored cluster token.
@login_required()
def podexec(request):
    """Run a command in a pod and return its output as JSON.

    Expects a POST request with ``k8s_namespaces``, ``k8s_apiport`` (id of a
    ``K8sHost`` row), ``k8s_pod`` and ``command``. The response body is
    ``{"message": <result>}`` where the result is whatever
    ``K8sApi.get_pods_exec`` returns. A missing or invalid parameter, or any
    other HTTP method, yields an explanatory message instead.
    """
    if request.method != "POST":
        rest = '不允许除POST之外的任何访问.'
    else:
        # Fall back to '' instead of None: str(None) is the string "None",
        # which used to reach int() and the exec call unchecked.
        namespace = (request.POST.get('k8s_namespaces') or '').strip()
        raw_id = (request.POST.get('k8s_apiport') or '').strip()
        pod_name = (request.POST.get('k8s_pod') or '').strip()
        command = (request.POST.get('command') or '').strip()
        try:
            conf_id = int(raw_id)
        except ValueError:
            conf_id = None

        if conf_id is None or not (namespace and pod_name and command):
            rest = '缺少必要参数.'
        else:
            k8s_client = K8sApi(confid=conf_id)
            rest = k8s_client.get_pods_exec(podname=pod_name, namespace=namespace, command=command)
    return HttpResponse(json.dumps({'message': rest}, ensure_ascii=False),
                        content_type="application/json; charset=utf-8")

django k8s接口调用插件:

from kubernetes import client, config
from kubernetes.stream import stream
from .models import K8sHost
# Create a configuration object

class K8sApi:
    """Kubernetes CoreV1 helper bound to one stored ``K8sHost`` row.

    ``confid`` is the id of the ``K8sHost`` row whose API address and token
    are used for every call. A new client is built for each operation.
    """

    def __init__(self,confid):
        self.confid = confid

    def get_client(self):
        """Build a ``CoreV1Api`` client from the stored address and token."""
        # Fetch both columns in one query instead of one query per column.
        host = K8sHost.objects.values('k8s_api', 'k8s_api_token').get(id=self.confid)
        configuration = client.Configuration()
        configuration.host = host['k8s_api']
        # SECURITY NOTE(review): certificate verification is switched off, so
        # the API server's identity is not checked. Kept as-is to preserve
        # behavior; supply the cluster CA and enable verification if possible.
        configuration.verify_ssl = False
        configuration.api_key = {"authorization": "Bearer " + str(host['k8s_api_token']).strip()}
        return client.CoreV1Api(client.ApiClient(configuration))

    def get_podlist(self,namespace):
        """Return the pod list of ``namespace`` as returned by the client."""
        return self.get_client().list_namespaced_pod(namespace=namespace)

    def get_namespacelist(self):
        """Return the namespace list as returned by the client."""
        return self.get_client().list_namespace()

    def _exec_in_pod(self, podname, namespace, command, container=None):
        """Run ``command`` in a pod without stdin or a tty; return the result."""
        options = {
            'command': command,
            'stderr': True,
            'stdin': False,
            'stdout': True,
            'tty': False,
        }
        # Pass ``container`` only when one was requested, as get_pods_exec
        # always did; test_pods_connect used to send it even when unset.
        if container:
            options['container'] = container
        return stream(self.get_client().connect_get_namespaced_pod_exec,
                      podname, namespace, **options)

    def test_pods_connect(self,podname,namespace,command,container=None):
        """Return True when running ``command`` in the pod gives a truthy result.

        Errors raised by the client propagate to the caller.
        """
        return bool(self._exec_in_pod(podname, namespace, command, container))

    def get_pods_exec(self,podname,namespace,command,container=None):
        """Run ``command`` in the pod and return what the exec call returns."""
        return self._exec_in_pod(podname, namespace, command, container)

容器连接页面设计:



    
    {{ namespace }} - {{ pod_name }}
    
    
        /* Size every element by its border box. */
        * {
            box-sizing: border-box;
        }

        /* Page margin and base font. */
        body {
            margin: 20px !important;
            font-family: sans-serif;
        }

        /* Each label takes its own line. */
        label {
            display: block !important;
        }

        /* Fixed-width caption floated to the left of its control. */
        label span {
            float: left;
            width: 100px;
            margin-top: 2px;
        }

        /* Keep the input inline, next to its caption. */
        label .form-control {
            display: inline !important;
            width: 300px;
        }

        /* Vertical spacing between the top-level sections of the page. */
        body > div {
            margin-top: 15px;
        }

        /* Empty rule: no declarations are set for the terminal element. */
        kubernetes-container-terminal {

        }
    
    
    
    
    
   
   
    
    
    



    命名空间
    : {{ namespace }} 连接容器名: {{ pod_name }}


    
    

    // Bootstraps the terminal page: registers a custom WebSocket factory and
    // publishes the connection settings rendered by the Django view.
    angular.module('exampleApp', ['kubernetesUI'])
        // Make the terminal component open its sockets through CustomWebSockets.
        .config(function (kubernetesContainerSocketProvider) {
            kubernetesContainerSocketProvider.WebSocketFactory = "CustomWebSockets";
        })

        .run(function ($rootScope) {
            // escapejs keeps the rendered values inside their string literals;
            // namespace and pod name originate from the request's query string.
            $rootScope.baseUrl = "{{ k8s_url|escapejs }}";
            $rootScope.selfLink = "/api/v1/namespaces/{{ namespace|escapejs }}/pods/{{ pod_name|escapejs }}";
            $rootScope.containerName = "";
            $rootScope.accessToken = "{{ token|escapejs }}";
            $rootScope.preventSocket = true;
        })
        /* Our custom WebSocket factory adapts the url */
        .factory("CustomWebSockets", function ($rootScope) {
            return function CustomWebSocket(url, protocols) {
                url = $rootScope.baseUrl + url;
                if ($rootScope.accessToken) {
                    // Start the query string if the url has none yet, and
                    // encode the token so reserved characters survive.
                    var separator = url.indexOf("?") === -1 ? "?" : "&";
                    url += separator + "access_token=" + encodeURIComponent($rootScope.accessToken);
                }
                return new WebSocket(url, protocols);
            };
        });




实现效果:

相关文章

KubeSphere 部署向量数据库 Milvus 实战指南
探索 Kubernetes 持久化存储之 Longhorn 初窥门径
征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
那些年在 Terraform 上吃到的糖和踩过的坑
无需 Kubernetes 测试 Kubernetes 网络实现
Kubernetes v1.31 中的移除和主要变更

发布评论