Kubebuilder实战

2023年 9月 1日 52.9k 0

Kubebuilder实战

学习kubebuilder之前可以先了解一些基本概念有助于后续开发,参考连接:juejin.cn/post/696287…

本篇为juejin.cn/post/709935…该文的实践版本,成果复现了文中的项目并且排除了一项bug,和多处细节问题

准备环境

  • go version v1.15+.
  • docker version 17.03+.
  • kubenetes version v1.14.3+.
  • kustomize v3.1.0+ (推荐本地安装,此处有坑后续详细说明)
  • kubebuilder version v3.4.0

项目介绍

假设一个Nginx的QPS(服务器一秒内处理的请求数)上限为500,如果外部访问的QPS达到了600,为了保证服务质量,必须扩容一个Nginx来分摊请求。

在Kubernetes环境中,如果外部请求超过了单个Pod的处理极限,我们则可以增加Pod数量来达到横向扩容的目的。

假设我们的服务是无状态服务,我们来利用kubebuilder来开发一个operator,来模拟我们已上所述的场景。

项目初始化

在开发 Operator 之前我们需要先提前想好我们的 CRD 资源对象,比如我们想要通过下面的 CR 资源来创建我们的Operator :

apiVersion: elasticweb.example.com/v1
kind: ElasticWeb
metadata:
  name: elasticweb-sample
  namespace: dev
spec:
  image: nginx:1.17.1  # 镜像
  port: 30003          # 外部访问的端口
  singlePodsQPS: 800   # 单个 Pod 的 QPS
  totalQPS: 2400       # 总 QPS
  • 使用脚手架初始化项目

    mkdir app-operator && cd app-operator
    go mod init app-operator
    kubebuilder init --domain example.com
    ###以下为打印执行结果
    kubebuilder init --domain example.com
    Writing kustomize manifests for you to edit...
    Writing scaffold for you to edit...
    ...
    
  • 创建API

    #脚手架创建完成后,然后定义资源API:
    kubebuilder create api --group elasticweb --version v1 --kind Elasticweb #此处原文为EL,与后文的使用结构体不一致,记得修改!!!
    #打印内容
    asticWeb
    Create Resource [y/n]
    y
    Create Controller [y/n]
    y
    Writing kustomize manifests for you to edit...
    Writing scaffold for you to edit...
    ...
    
  • 代码结构

    tree -L 2
    .
    ├── api
    │   └── v1
    ├── bin
    │   ├── controller-gen
    │   ├── kustomize
    │   ├── manager
    │   └── setup-envtest
    ├── config
    │   ├── crd
    │   ├── default
    │   ├── manager
    │   ├── prometheus
    │   ├── rbac
    │   └── samples
    ├── controllers
    │   ├── elasticweb_controller.go
    │   ├── resource.go
    │   └── suite_test.go
    ├── cover.out
    ├── Dockerfile
    ├── go.mod
    ├── go.sum
    ├── hack
    │   └── boilerplate.go.txt
    ├── main.go
    ├── Makefile
    ├── PROJECT
    └── README.md
    
  • 然后根据我们上面设计的 ElasticWeb 这个对象来编辑 Operator 的结构体即可,修改文件 api/v1/elasticweb_types.go 中的 ElasticWebSpec 结构体以及ElasticWebStatus结构体,ElasticWebStatus结构体主要用来记录当前集群实际支持的总QPS:

    // api/v1/elasticweb_types.go
    ​
    type ElasticWebSpec struct {
        Image string `json:"image"`
        Port  *int32 `json:"port"`
        // 单个pod的QPS上限
        SinglePodsQPS *int32 `json:"singlePodsQPS"`
        // 当前整个业务的QPS
        TotalQPS *int32 `json:"totalQPS,omitempty"`
    }
    ​
    type ElasticWebStatus struct {
        // 当前 Kubernetes 集群实际支持的总QPS
        RealQPS *int32 `json:"realQPS"`
    }
    
  • 为了打印的日志方便我们阅读,我们给ElasticWeb添加一个String方法:

    // api/v1/elasticweb_types.go
    ​
    func (e *ElasticWeb) String() string {
        var realQPS string
        if nil == e.Status.RealQPS {
            realQPS = ""
        } else {
            realQPS = strconv.Itoa(int(*e.Status.RealQPS))
        }
    ​
        return fmt.Sprintf("Image [%s], Port [%d], SinglePodQPS [%d], TotalQPS [%d], RealQPS [%s]",
            e.Spec.Image,
            *e.Spec.Port,
            *e.Spec.SinglePodsQPS,
            *e.Spec.TotalQPS,
            realQPS)
    }
    
  • 要注意每次修改完成需要执行make命令重新生成代码:

    make
    #打印结果
    make
    /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
    go fmt ./...
    api/v1/elasticweb_types.go
    go vet ./...
    go build -o bin/manager main.go
    

业务逻辑

  • 首先在目录 controllers 下面创建一个 resource.go文件,用来根据我们的ElasticWeb对象生成对应的deploymentservice以及更新状态。下面的业务逻辑代码主要是实现创建deploy和svc以及对应的更新。

    //此处业务逻辑存在bug,目前没有修复,提供了对应位置的说明,出现的原因如下
    //当k8s集群不支持SCTP协议时,程序创建deploy失效,并按照reconcile逻辑再次创建,这时会陷入svc创建端口占用的死循环中
    //目前感觉比较好的解决方法为svc与deploy分别判断然后分别创建,然后可以多加一步协议判断和日志记录,后续持续更新
    package controllers
    ​
    import (
        v1 "app-operator/api/v1"
        "context"
        "fmt"
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/utils/pointer"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
        "sigs.k8s.io/controller-runtime/pkg/log"
    )
    ​
    var (
        ElasticWebCommonLabelKey = "app"
    )
    ​
    const (
        // APP_NAME deployment 中 App 标签名
        APP_NAME = "elastic-app"
        // CONTAINER_PORT 容器的端口号
        // 这里有个坑点,原本想着可以自定义端口,但是最后发现访问不了
        // 所以这句可有可无,Nginx的默认http端口不就80么(狗头保命)
        CONTAINER_PORT = 80
        // CPU_REQUEST 单个POD的CPU资源申请
        CPU_REQUEST = "100m"
        // CPU_LIMIT 单个POD的CPU资源上限
        CPU_LIMIT = "100m"
        // MEM_REQUEST 单个POD的内存资源申请
        MEM_REQUEST = "512Mi"
        // MEM_LIMIT 单个POD的内存资源上限
        MEM_LIMIT = "512Mi"
    )
    ​
    // 根据总QPS以及单个POD的QPS,计算需要多少个Pod
    func getExpectReplicas(elasticWeb *v1.ElasticWeb) int32 {
        // 单个pod的QPS
        singlePodQPS := *elasticWeb.Spec.SinglePodsQPS
        // 期望的总QPS
        totalQPS := *elasticWeb.Spec.TotalQPS
        // 需要创建的副本数
        replicas := totalQPS / singlePodQPS
    ​
        if totalQPS%singlePodQPS != 0 {
            replicas += 1
        }
        return replicas
    }
    ​
    // CreateServiceIfNotExists  创建service
    func CreateServiceIfNotExists(ctx context.Context, r *ElasticWebReconciler, elasticWeb *v1.ElasticWeb, req ctrl.Request) error {
        logger := log.FromContext(ctx)
        logger.WithValues("func", "createService")
        svc := &corev1.Service{}
    ​
        svc.Name = elasticWeb.Name
        svc.Namespace = elasticWeb.Namespace
    ​
        svc.Spec = corev1.ServiceSpec{
            Ports: []corev1.ServicePort{
                {
                    Name:     "http",
                    Port:     CONTAINER_PORT,
                    NodePort: *elasticWeb.Spec.Port,
                },
            },
            Type: corev1.ServiceTypeNodePort,
            Selector: map[string]string{
                ElasticWebCommonLabelKey: APP_NAME,
            },
        }
    ​
        // 设置关联关系
        logger.Info("set reference")
        if err := controllerutil.SetControllerReference(elasticWeb, svc, r.Scheme); err != nil {
            logger.Error(err, "SetControllerReference error")
            return err
        }
    ​
        logger.Info("start create service")
        if err := r.Create(ctx, svc); err != nil {
            logger.Error(err, "create service error")
            return err
        }
    ​
        return nil
    }
    ​
    // CreateDeployment 创建deployment
    func CreateDeployment(ctx context.Context, r *ElasticWebReconciler, elasticWeb *v1.ElasticWeb) error {
        logger := log.FromContext(ctx)
        logger.WithValues("func", "createDeploy")
    ​
        // 计算期待pod的数量
        expectReplicas := getExpectReplicas(elasticWeb)
        logger.Info(fmt.Sprintf("expectReplicas [%d]", expectReplicas))
    ​
        deploy := &appsv1.Deployment{}
    ​
        deploy.Labels = map[string]string{
            ElasticWebCommonLabelKey: APP_NAME,
        }
    ​
        deploy.Name = elasticWeb.Name
        deploy.Namespace = elasticWeb.Namespace
    ​
        deploy.Spec = appsv1.DeploymentSpec{
            Replicas: pointer.Int32Ptr(expectReplicas),
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    ElasticWebCommonLabelKey: APP_NAME,
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        ElasticWebCommonLabelKey: APP_NAME,
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  APP_NAME,
                            Image: elasticWeb.Spec.Image,
                            Ports: []corev1.ContainerPort{
                                {
                                    Name:          "http",
                                    ContainerPort: CONTAINER_PORT,
                                    Protocol:      corev1.ProtocolTCP,
                                },
                            },
                            Resources: corev1.ResourceRequirements{
                                Limits: corev1.ResourceList{
                                    corev1.ResourceCPU:    resource.MustParse(CPU_LIMIT),
                                    corev1.ResourceMemory: resource.MustParse(MEM_LIMIT),
                                },
                                Requests: corev1.ResourceList{
                                    corev1.ResourceCPU:    resource.MustParse(CPU_REQUEST),
                                    corev1.ResourceMemory: resource.MustParse(MEM_REQUEST),
                                },
                            },
                        },
                    },
                },
            },
        }
    ​
        // 建立关联,删除web后会将deploy一起删除
        logger.Info("set reference")
        if err := controllerutil.SetControllerReference(elasticWeb, deploy, r.Scheme); err != nil {
            logger.Error(err, "SetControllerReference error")
            return err
        }
    ​
        // 创建Deployment
        logger.Info("start create deploy")
        if err := r.Create(ctx, deploy); err != nil {
            logger.Error(err, "create deploy error")
            return err
        }
    ​
        logger.Info("create deploy success")
        return nil
    }
    ​
    func UpdateStatus(ctx context.Context, r *ElasticWebReconciler, elasticWeb *v1.ElasticWeb) error {
        logger := log.FromContext(ctx)
        logger.WithValues("func", "updateStatus")
    ​
        // 单个pod的QPS
        singlePodQPS := *elasticWeb.Spec.SinglePodsQPS
    ​
        // pod 总数
        replicas := getExpectReplicas(elasticWeb)
    ​
        // 当pod创建完成后,当前系统的QPS为: 单个pod的QPS * pod总数
        // 如果没有初始化,则需要先初始化
        if nil == elasticWeb.Status.RealQPS {
            elasticWeb.Status.RealQPS = new(int32)
        }
    ​
        *elasticWeb.Status.RealQPS = singlePodQPS * replicas
        logger.Info(fmt.Sprintf("singlePodQPS [%d],replicas [%d],realQPS[%d]", singlePodQPS, replicas, *elasticWeb.Status.RealQPS))
    ​
        if err := r.Update(ctx, elasticWeb); err != nil {
            logger.Error(err, "update instance error")
            return err
        }
        return nil
    }
    
  • 构造完成后,当我们创建 ElasticWeb 的时候就可以在控制器的 Reconcile 函数中去进行逻辑处理了。

    // controllers/elasticweb_controller.go
    ​
    //+kubebuilder:rbac:groups=elasticweb.example.com,resources=elasticwebs,verbs=get;list;watch;create;update;patch;delete
    //+kubebuilder:rbac:groups=elasticweb.example.com,resources=elasticwebs/status,verbs=get;update;patch
    //+kubebuilder:rbac:groups=elasticweb.example.com,resources=elasticwebs/finalizers,verbs=update
    //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //新增注释,使用时删除本注释
    //+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete //新增注释,使用时删除本注释
    ​
    func (r *ElasticWebReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        logger := log.FromContext(ctx)
    ​
        instance := &elasticwebv1.ElasticWeb{}
    ​
        if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
            return ctrl.Result{}, client.IgnoreNotFound(err)
        }
    ​
        logger.Info(fmt.Sprintf("instance:%s", instance.String()))
    ​
        // 获取deployment
        deploy := &appsv1.Deployment{}
        if err := r.Get(ctx, req.NamespacedName, deploy); err != nil {
            if errors.IsNotFound(err) {
                // 如果没有查找到,则需要创建
                logger.Info("deploy not exists")
                // 判断qps的需求,如果qps没有需求,则啥都不做
                if *instance.Spec.TotalQPS < 1 {
                    logger.Info("not need deployment")
                    return ctrl.Result{}, nil
                }
    ​
                // 创建service
                if err = CreateServiceIfNotExists(ctx, r, instance, req); err != nil {
                    return ctrl.Result{}, err
                }
    ​
                // 创建Deploy
                if err := CreateDeployment(ctx, r, instance); err != nil {
                    return ctrl.Result{}, err
                }
    ​
                // 更新状态
                if err := UpdateStatus(ctx, r, instance); err != nil {
                    return ctrl.Result{}, err
                }
    ​
                return ctrl.Result{}, nil
            }
            logger.Error(err, "failed to get deploy")
            return ctrl.Result{}, err
        }
    ​
        // 根据单个Pod的QPS计算期望pod的副本
        expectReplicas := getExpectReplicas(instance)
    ​
        // 获取当前deployment实际的pod副本
        realReplicas := deploy.Spec.Replicas
    ​
        if expectReplicas == *realReplicas {
            logger.Info("not need to reconcile")
            return ctrl.Result{}, nil
        }
    ​
        // 重新赋值
        deploy.Spec.Replicas = &expectReplicas
        // 更新 deploy
        if err := r.Update(ctx, deploy); err != nil {
            logger.Error(err, "update deploy replicas error")
            return ctrl.Result{}, err
        }
    ​
        // 更新状态
        if err := UpdateStatus(ctx, r, instance); err != nil {
            logger.Error(err, "update status error")
            return ctrl.Result{}, err
        }
    ​
        return ctrl.Result{}, nil
    }
    

调试

  • 首先安装CRD对象

    make install
    /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
    /Users/Christian/Documents/code/negan/app-operator/bin/kustomize build config/crd | kubectl apply -f -
    customresourcedefinition.apiextensions.k8s.io/elasticwebs.elasticweb.example.com configured
    
  • 然后运行控制器

    $ make run
    /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
    /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
    go fmt ./...
    controllers/elasticweb_controller.go
    go vet ./...
    go run ./main.go
    1.652941435373431e+09   INFO    controller-runtime.metrics      Metrics server is starting to listen    {"addr": ":8080"}
    1.6529414353737469e+09  INFO    setup   starting manager
    1.6529414353739378e+09  INFO    Starting server {"path": "/metrics", "kind": "metrics", "addr": "[::]:8080"}
    1.652941435373951e+09   INFO    Starting server {"kind": "health probe", "addr": "[::]:8081"}
    1.6529414353741682e+09  INFO    controller.elasticweb   Starting EventSource    {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb", "source": "kind source: *v1.ElasticWeb"}
    1.652941435374196e+09   INFO    controller.elasticweb   Starting EventSource    {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb", "source": "kind source: *v1.Deployment"}
    1.652941435374202e+09   INFO    controller.elasticweb   Starting Controller     {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb"}
    1.65294143547575e+09    INFO    controller.elasticweb   Starting workers        {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb", "worker count": 1}
    
  • 创建CR

    apiVersion: elasticweb.example.com/v1
    kind: ElasticWeb
    metadata:
      name: elasticweb-sample
    spec:
      image: nginx:1.17.1
      port: 30003
      singlePodsQPS: 800
      totalQPS: 2400
    ​
    
  • 开启另一个终端创建资源

    $ kubectl apply -f config/samples/elasticweb_v1_elasticweb.yaml
    elasticweb.elasticweb.example.com/elasticweb-sample created
    

优化

现在我们需要对Deploy进行Watch,Service是的创建包含在创建Deploy的逻辑里,所以Deploy出现变化,我们需要重新进行调谐。当然我们只需要Watch被ElasticWeb控制的这部分独享即可。本质上就是该对象的onwerRef添加为deploy

func (r *ElasticWebReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&elasticwebv1.ElasticWeb{}).
        Owns(&appsv1.Deployment{}). //新增部分
        Complete(r)
}

一些额外的功能按照原文教程可以正常运行,整体说一下项目的坑点

  • kustomize 可以选择先不安装,后续项目make的时候会有一个可以自动执行的yaml,kustomization.yaml,如果自动执行不了,再下载kustomize,并将对应的可执行文件加入到项目的bin目录中,即可正常运行。
  • 运行controller时可以采取debug模式,个人情况是使用的goland的remotedevelopment,可以远程debug,注意配置一下文件,要先run两行go代码,如下图所示,如果你的k8s也无法支持SCTP协议,那么恭喜会陷入前面提到的bug,临时的解决方案是修改deploy创建中关于协议的配置为TCP,然后项目的代码就可以正常运行了,但是项目的bug依旧存在死循环的可能,详细了解请翻阅关于业务逻辑处reconcile部分的注释说明。
  • 在部署时会遇到本项目的第三个bug,一个经典的外网镜像下载问题,Dockerfile中依赖一个外网镜像gcr.io/distroless/static:nonroot,这里我的解决方式参考如下连接www.jianshu.com/p/25b253176…,和文章中不同的是需要注册一个账号,新开账号可以免费试用,但请注意验证的邮箱至少为gmail等支持的邮箱。
  • 参考链接:juejin.cn/post/709935… 来源:稀土掘金

    相关文章

    KubeSphere 部署向量数据库 Milvus 实战指南
    探索 Kubernetes 持久化存储之 Longhorn 初窥门径
    征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
    那些年在 Terraform 上吃到的糖和踩过的坑
    无需 Kubernetes 测试 Kubernetes 网络实现
    Kubernetes v1.31 中的移除和主要变更

    发布评论