本文分析 k8s controller 中 informer 启动的基本流程
不论是 k8s 自身组件,还是自己编写 controller,都需要通过 apiserver 监听 etcd 事件来完成自己的控制循环逻辑。
如何高效可靠进行事件监听,k8s 客户端工具包 client-go 提供了一个通用的 informer 包,通过 informer,可以方便和高效的进行 controller 开发。
informer 包提供了如下的一些功能:
1、本地缓存(store)
2、索引机制(indexer)
3、Handler 注册功能(eventHandler)
1、informer 架构
整个 informer 机制架构如下图(图片源自 Client-go):
图片
可以看到这张图分为上下两个部分,上半部分由 client-go 提供,下半部分则是需要自己实现的控制循环逻辑
本文主要分析上半部分的逻辑,包括下面几个组件:
1.1、Reflector:
从图上可以看到 Reflector 是一个和 apiserver 交互的组件,通过 list 和 watch api 将资源对象压入队列
1.2、DeltaFifo:
DeltaFifo的结构体示意如下:
type DeltaFIFO struct {
...
// We depend on the property that items in the s et are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
...
}
主要分为两部分,fifo 和 delta
(1)fifo:先进先出队列
对应结构体中的 queue,结构体示例如下:
[default/centos-fd77b5886-pfrgn, xxx, xxx]
(2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个 map,结构体示例如下:
map:{"default/centos-fd77b5886-pfrgn":[{Replaced &Pod{ObjectMeta: ${pod参数}], "xxx": [{},{}]}
消费者从 queue 中 pop 出对象进行消费,并从 items 获取具体的消费操作(执行动作 Update/Deleted/Sync,和执行的对象 object spec)
1.3、Indexer:
client-go 用来存储资源对象并自带索引功能的本地存储,deltaFIFO 中 pop 出的对象将存储到 Indexer。
indexer 与 etcd 集群中的数据保持一致,从而 client-go 可以直接从本地缓存获取资源对象,减少 apiserver 和 etcd 集群的压力。
2、一个基本例子
func main() {
stopCh := make(chan struct{})
defer close(stopCh)
// (1)New a k8s clientset
masterUrl := "172.27.32.110:8080"
config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")
if err != nil {
klog.Errorf("BuildConfigFromFlags err, err: %v", err)
}
clientset, err := k.NewForConfig(config)
if err != nil {
klog.Errorf("Get clientset err, err: %v", err)
}
// (2)New a sharedInformers factory
sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)
// (3)Register a informer
// f.informers[informerType] = informer,
// the detail for informer is build in NewFilteredPodInformer()
podInformer := sharedInformers.Core().V1().Pods().Informer()
// (4)Register event handler
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
klog.Infof("Get new obj: %v", mObj)
klog.Infof("Get new obj name: %s", mObj.GetName())
},
})
// (5)Start all informers
sharedInformers.Start(stopCh)
// (6)A cronjob for cache sync
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
klog.Infof("Cache sync fail!")
}
// (7)Use lister
podLister := sharedInformers.Core().V1().Pods().Lister()
pods, err := podLister.List(labels.Everything())
if err != nil {
klog.Infof("err: %v", err)
}
klog.Infof("len(pods), %d", len(pods))
for _, v := range pods {
klog.Infof("pod: %s", v.Name)
}