kubeapiserver源码剖析与开发(八):自定义资源控制器开发(二)

2023年 8月 13日 28.5k 0

上一篇文章我们讲了控制器的主要原理,这一篇我们就来说说使用 kubebuilder 开发一个自定义的控制器。

kubebuilder 是什么

kubebuilder 是开发自定义控制器的脚手架工具,能给我们搭建好控制器的整个骨架,我们只需要专心编写控制(调谐)逻辑即可,大大方便了控制器的开发流程。

kubebuilder 为我们做了什么

我们还是看这张控制器原理图

pP9ZSzt.png

上半部分是 client-go 实现的 informer 机制,上篇我们已经说明了其工作原理,这里我们不再赘述,我们看下半部分。

当 watch 的资源有事件(增、删、改)发生时,上半部分 informer 会调用事先注册在 informer 上的事件处理函数(Resource Event Handlers),这个事件处理函数是 kubebuilder 帮我们实现的,而且 informer 的创建和事件处理函数注册也是 kubebuilder 完成的。

这个事件处理函数一般会被实现为一个队列的入队操作,言下之意的就是,当有事件发生时,事件处理函数会把事件放入队列中,等待其他协程来队列获取事件(出队操作),进而进行后续的调谐。

图中的 Workequeue 是一个队列,是由 client-go 实现的, 由 kubebuilder 初始化创建,用户也不需要关心。

kubuilder 会起一个协程从 Workequeue 中取事件,也就是图中 Process Item 做的事。Process Item 是一个无限循环,如果队列中没有数据可以读取,就会一直阻塞,直到获取到一个数据(也就是事件),取到数据后,会新启动一个协程,开始调用我们自己编写的调谐函数,这个调谐函数也是在 kubebuilder 在初始化的时候注册的。

最后就是 Handle Object 了,对事件进行处理,也就是开始调谐,将现状调谐成预期状态,也就是我们要写程序的地方。

所以在整个下半部分,我们唯一要做的就是编写调谐函数。

那么我们现在开始使用 kubebuilder 开始开发一个自定义控制器。

我们先构造一个场景:假设我们的集群中会用到多种机型,每种机型有不同的用处,可以通过 deployment 的 nodeSelector 来选择 Pod 所要运行的带有某些标签的机器,这些机器上的标签需要我们在部署或者扩容后手动给机器打标签,如果有一个控制器,当有机器扩容后,我们去 CMDB 查询该机器的机型,然后根据 CR (用户自定义资源)中的定义(比如什么机型可以打某些标签),自动给机器打标签;又或者当 CR 发生变化时,如我们在 CR 中新增了某种机型可以打的标签,或者减少某种机型可以打的标签,那么需要给现存的机器增加或者减少标签。

下面我们就开始编写这个控制器

下载 kubebuilder

curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)" 
chmod +x kubebuilder 
sudo mv kubebuilder /usr/local/bin/

GoPath 下创建工程目录

mkdir -p autolabel

初始化工程

kubebuilder init --domain my.domain --repo my.domain/node-mgmt

创建 api

kubebuilder create api --group node-mgmt --version v1 --kind Autolabel

上面的 --domian 和 --group 两个参数组合才是真正的 group,例如这个例子中我们的group 就是 node-mgmt.my.domain。

执行完上述命令后,我们的工程就创建完了,目录结构如下图

pPAhHfJ.jpg

我们先来解释下这些文件和目录的作用

  • Dockerfile
    构建 docker 镜像
  • Makefile
    标准的 makefile 文件,可以运行 make {目标} 执行 Makefile 中对应的目标,提供了二进制编译、docker 进行编译、部署控制器到 k8s 集群中等能力。
  • api
    定义我们创建的 api,该目录下面可以有多个版本的 api定义,因为我们现在只创建了一个版本所以只有 v1。所谓的 api 定义,就是结构体,如下就是 kubebuilder 给我们创建好的 api 定义框架,我们需要根据实际情况修改这个文件,给 CRD 增加字段。一般我们只需要在 AutolabelSpec 这个结构体内加字段即可,如果你的 CRD 是需要记录状态,则在 AutolabelStatus 这个结构体中增加字段,其他不用修改。
type AutolabelSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Foo is an example field of Autolabel. Edit autolabel_types.go to remove/update
	Foo string `json:"foo,omitempty"`
}

// AutolabelStatus defines the observed state of Autolabel
type AutolabelStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// Autolabel is the Schema for the autolabels API
type Autolabel struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   AutolabelSpec   `json:"spec,omitempty"`
	Status AutolabelStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// AutolabelList contains a list of Autolabel
type AutolabelList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Autolabel `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Autolabel{}, &AutolabelList{})
}
  • bin
    存放二进制的目录,如我们执行 make build 产生的二进制文件就在该目录下。

  • cmd/main.go
    工程 main 函数入口

  • config/crd
    用户自定义资源的描述,用于在 kube-apiserver 注册 api,用于告诉 kube-apiserver 这个 api 属于哪个 group,kind 是什么,有哪些版本。这样用户在创建 cr 的时候,kube-apiserver 就能识别出这个 api 的类型。

    如果你运行 make deploy 的话,config/crd 下面的 crd 就会被安装到集群中。

  • config/rbac
    用户存放 role、rolebinding、serviceAccount 文件的,主要的作用就是用于给控制器添加访问集群权限的。默认的权限只能访问这里自定义的 API 资源,如果你需要增加集群内的其他资源的访问,需要在代码中增加注释,我们后面说。

  • internal/controller
    该目录就是我们写控制器调谐逻辑部分的文件了,需要我们自己写的代码都在该文件内。

我们先根据上面假设的场景修改一下 api 目录下 api 的定义

type AutolabelSpec struct {
	ServerTypeAndLabelsMap map[string][]string `json:"server_type_and_labels"`
}

AutolabelSpec 只有一个成员,用来存放机型需要打的标签。

然后我们开始写调谐逻辑

首先我们要感知 node 的变化,在这里我们关注 node 扩容或者 node 的标签发生变化这两种事件,只有这两种事件才有可能需要给节点打标签。但是 kubebuilder 在初始化时创建 informer 进行 list/watch 的时候,只会为你创建你定义的 API 对象的 informer,所以目前我们还无法 list/watch 其他资源。但是,kubebuilder 为我们提供了 watch 其他资源的能力,我们只需要修改下面的方法,就可以达到目的

// internal/controller/autolabel_controller.go

func (r *AutolabelReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&nodemgmtv1.Autolabel{}).
		Watches(&v1.Node{}, &handler.EnqueueRequestForObject{}).
		Complete(r)
}

在上述方法中,我们加入了 Watches 方法,表示该资源属于自定义资源的子资源,kubebuilder 也会为 Watches 参数中提供的 API 类型进行 list/watch。除了 Watches 方法外,还有 Owns 方法可以使用,他们的区别在于:Owns 参数指定的 API,其实例中的 ownerReferences 字段为 For 参数的类型时才会触发注册在 informer 上的事件处理函数,而 Watches 则没有这个限制。关于 For、Owns、Watches 具体的区别可以参考这篇博文。

好了现在我们已经可以感知 node 的变化了,下面是具体实现的代码

/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"strings"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog/v2"
	nodemgmtv1 "my.domain/node-mgmt/api/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

// AutolabelReconciler reconciles a Autolabel object
type AutolabelReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=node-mgmt.my.domain,resources=autolabels,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=node-mgmt.my.domain,resources=autolabels/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=node-mgmt.my.domain,resources=autolabels/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Autolabel object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
func (r *AutolabelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

	var node v1.Node
	var autoLabel nodemgmtv1.Autolabel
	// 因为我们watch 了 node 的变化,所以判断是哪种 API 资源类型发生变化
	//(可能是)autolabel 发生变化,也肯能是 Node
	err := r.Get(ctx, req.NamespacedName, &node, &client.GetOptions{})
	// 没有找到,可能是 node 被删除了,也有可能是变更的资源是 autolabel
	if errors.IsNotFound(err) {
		err = r.Get(ctx, req.NamespacedName, &autoLabel, &client.GetOptions{})
		// 没有找到,那么 autolabel 被删除了,我们不做任何操作
		if errors.IsNotFound(err) {
			return ctrl.Result{}, nil
		} else if err != nil {
			klog.Errorf("get autolabel obj failed: ", err.Error())
			return ctrl.Result{}, nil
		} else {
			// autolabel被修改,需要list 所有node 查看 node 上的标签是否还符合 autolabel 的标签定义
			var nodes v1.NodeList
			if err = r.List(ctx, &nodes, &client.ListOptions{}); err != nil {
				klog.Errorf("list node failed: ", err.Error())
				return ctrl.Result{}, nil
			}
			for _, node := range nodes.Items {
				serverType, ok := node.Labels["server_type"]
				if !ok {
					klog.Infof("node %s doesn't have server_type label", node.Name)
					continue
				}
				// 从 autolabel 中拿出这种机器应该打的标签
				expectedLabels, ok := autoLabel.Spec.ServerTypeAndLabelsMap[serverType]
				if !ok {
					klog.Infof("autolabel doesn't support server_type: %s", serverType)
					continue
				}
				_ = Label(r, ctx, expectedLabels, node)
			}
		}
	} else if err != nil {
		//TODO do log
		return ctrl.Result{}, nil
	} else {
		// node 被变动,查看是否有要求的标签
		// 先 list 所有 autolabel 对象,这里我们假设一个集群只能存在一个 autolabel 资源,这样不会冲突
		var autolabelList nodemgmtv1.AutolabelList
		if err = r.List(ctx, &autolabelList, &client.ListOptions{}); err != nil {
			klog.Errorf("list autolabel obj failed: %s", err.Error())
		}
		if len(autolabelList.Items) > 0 {
			al := autolabelList.Items[0]
			serverType, ok := node.Labels["server_type"]
			if !ok {
				klog.Errorf("node %s does't have server_type label", node.Name)
				return ctrl.Result{}, nil
			}
			expectedLabels := al.Spec.ServerTypeAndLabelsMap[serverType]
			_ = Label(r, ctx, expectedLabels, node)
		}
	}
	return ctrl.Result{}, nil
}

func Label(r *AutolabelReconciler, ctx context.Context, expectedLabels []string, node v1.Node) error {
	needPatch := false
	for _, l := range expectedLabels {
		k, v := strings.Split(l, ":")[0], strings.Split(l, ":")[1]
		matched := false
		for key, value := range node.Labels {
			if k == key && v == value {
				matched = true
			}
		}
		if !matched {
			node.Labels[k] = v
			needPatch = true
		}
	}
	if needPatch {
		if needPatch {
			if err := r.Patch(ctx, &node, client.MergeFrom(&node)); err != nil {
				klog.Errorf("patch node %s failed, err: ", node.Name, err.Error())
				return err
			} else {
				klog.Infof("label node %s successfully", node.Name)
				return nil
			}
		}
	}
	return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *AutolabelReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&nodemgmtv1.Autolabel{}).
		Watches(&v1.Node{}, &handler.EnqueueRequestForObject{}).
		Complete(r)
}

说一下原理:
当节点发生变化或者 autolabel 资源发生变化都可以触发调谐,所以在 Reconcile 方法中我们得识别是哪种资源发生变化。
如果是节点发生变化,则我们把 autolabel 中该机型应该打的标签都拿出来逐个和节点上比较,如果节点没有该标签,则给节点打上缺失的标签。
如果是 autolabel 资源发生变化,则把所有 node list 出来,逐个去判断节点上缺失的标签,然后给节点打上标签。

以上就是 autolabel 的基本开发原理和流程了。

相关文章

KubeSphere 部署向量数据库 Milvus 实战指南
探索 Kubernetes 持久化存储之 Longhorn 初窥门径
征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
那些年在 Terraform 上吃到的糖和踩过的坑
无需 Kubernetes 测试 Kubernetes 网络实现
Kubernetes v1.31 中的移除和主要变更

发布评论