Kubernetes clientgo 源码分析 workqueue

2023年 7月 9日 45.7k 0

概述

源码版本信息

  • Project: kubernetes
  • Branch: master
  • Last commit id: d25d741c
  • Date: 2021-09-26

自定义控制器涉及到的 client-go 组件整体工作流程,大致如下图:

今天我们来详细研究下 workqueue 相关代码。client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列,延时队列,限速队列,后一个队列以前一个队列的实现为基础,层层添加新功能,我们按照 Queue、DelayingQueue、RateLimitingQueue 的顺序层层拨开来看限速队列是如何实现的。

Queue

接口和结构体

先看接口定义:

  • k8s.io/client-go/util/workqueue/queue.go:26
type Interface interface {
   Add(item interface{})  // 添加一个元素
   Len() int              // 元素个数
   Get() (item interface{}, shutdown bool) // 获取一个元素,第二个返回值和 channel 类似,标记队列是否关闭了
   Done(item interface{}) // 标记一个元素已经处理完
   ShutDown()             // 关闭队列
   ShuttingDown() bool    // 是否正在关闭
}

这个基础的队列接口定义很清晰,我们继续来看其实现的类型:

type Type struct {
   queue []t            // 定义元素的处理顺序,里面所有元素都应该在 dirty set 中有,而不能出现在 processing set 中
   dirty set            // 标记所有需要被处理的元素
   processing set       // 当前正在被处理的元素,当处理完后需要检查该元素是否在 dirty set 中,如果有则添加到 queue 里

   cond *sync.Cond      // 条件锁
   shuttingDown bool    // 是否正在关闭
   metrics queueMetrics
   unfinishedWorkUpdatePeriod time.Duration
   clock                      clock.Clock
}

Queue 的工作逻辑大致是这样,里面的三个属性 queue、dirty、processing 都保存 items,但是含义有所不同:

  • queue:这是一个 []t 类型,也就是一个切片,因为其有序,所以这里当作一个列表来存储 item 的处理顺序。
  • dirty:这是一个 set 类型,也就是一个集合,这个集合存储的是所有需要处理的 item,这些 item 也会保存在 queue 中,但是 set 里是无需的,set 的特性是唯一。
  • processing:这也是一个 set,存放的是当前正在处理的 item,也就是说这个 item 来自 queue 出队的元素,同时这个元素会被从 dirty 中删除。

下面分别介绍 set 类型和 Queue 接口的集合核心方法的实现。

set

上面提到的 dirty 和 processing 字段都是 set 类型,set 相关定义如下:

type empty struct{}
type t interface{}
type set map[t]empty

func (s set) has(item t) bool {
   _, exists := s[item]
   return exists
}

func (s set) insert(item t) {
   s[item] = empty{}
}

func (s set) delete(item t) {
   delete(s, item)
}

set 是一个空接口到空结构体的 map,也就是实现了一个集合的功能,集合元素是 interface{} 类型,也就是可以存储任意类型。而 map 的 value 是 struct{} 类型,也就是空。这里利用 map 的 key 唯一的特性实现了一个集合类型,附带三个方法 has()insert()delete() 来实现集合相关操作。

Add()

Add() 方法用于标记一个 item 需要被处理,代码如下:

func (q *Type) Add(item interface{}) {
   q.cond.L.Lock()
   defer q.cond.L.Unlock()
   if q.shuttingDown { // 如果 queue 正在被关闭,则返回
      return
   }
   if q.dirty.has(item) { // 如果 dirty set 中已经有了该 item,则返回
      return
   }

   q.metrics.add(item)

   q.dirty.insert(item) // 添加到 dirty set 中
   if q.processing.has(item) { // 如果正在被处理,则返回
      return
   }

   q.queue = append(q.queue, item) // 如果没有正在处理,则加到 q.queue 中
   q.cond.Signal() // 通知某个 getter 有新 item 到来
}

Get()

func (q *Type) Get() (item interface{}, shutdown bool) {
   q.cond.L.Lock()
   defer q.cond.L.Unlock()
   for len(q.queue) == 0 && !q.shuttingDown { // 如果 q.queue 为空,并且没有正在关闭,则等待下一个 item 的到来
      q.cond.Wait()
   }
   if len(q.queue) == 0 { // 这时候如果 q.queue 长度还是 0,说明 q.shuttingDown 为 true,所以直接返回
      return nil, true
   }

   item, q.queue = q.queue[0], q.queue[1:] // 获取 q.queue 第一个元素,同时更新 q.queue

   q.metrics.get(item)

   q.processing.insert(item) // 刚才获取到的 q.queue 第一个元素放到 processing set 中
   q.dirty.delete(item) // dirty set 中删除该元素

   return item, false // 返回 item
}

Done()

func (q *Type) Done(item interface{}) {
   q.cond.L.Lock()
   defer q.cond.L.Unlock()

   q.metrics.done(item)

   q.processing.delete(item) // processing set 中删除该 item
   if q.dirty.has(item) { // 如果 dirty 中还有,说明还需要再次处理,放到 q.queue 中
      q.queue = append(q.queue, item)
      q.cond.Signal() // 通知某个 getter 有新的 item
   }
}

DelayingQueue

接口和结构体

还是先看接口定义:

  • k8s.io/client-go/util/workqueue/delaying_queue.go:30
type DelayingInterface interface {
   Interface
   // AddAfter adds an item to the workqueue after the indicated duration has passed
   AddAfter(item interface{}, duration time.Duration)
}

相比 Queue 这里只是多了一个 AddAfter(item interface{}, duration time.Duration) 方法,望文生义,也就是延时添加 item。

结构体定义:

type delayingType struct {
   Interface               // 用来嵌套普通 Queue
   clock clock.Clock       // 计时器
   stopCh chan struct{}
   stopOnce sync.Once      // 用来确保 ShutDown() 方法只执行一次
   heartbeat clock.Ticker  // 默认10s的心跳,后面用在一个大循环里,避免没有新 item 时一直卡住
   waitingForAddCh chan *waitFor  // 传递 waitFor 的 channel,默认大小 1000
   metrics retryMetrics
}

对于延时队列,我们关注的入口方法肯定就是新增的 AddAfter() 了,看这个方法的具体的逻辑前我们先看下上面提到的 waitFor 类型。

waitFor

先看下 waitFor 结构定义,代码如下:

type waitFor struct {
   data    t          // 准备添加到队列中的数据
   readyAt time.Time  // 应该被加入队列的时间
   index int          // 在 heap 中的索引
}

然后可以注意到有这样一行代码:

type waitForPriorityQueue []*waitFor

这里定义了一个 waitFor 的优先级队列,用最小堆的方式来实现,这个类型实现了 heap.Interface 接口,我们具体看下源码:

// 添加一个 item 到队列中
func (pq *waitForPriorityQueue) Push(x interface{}) {
   n := len(*pq)
   item := x.(*waitFor)
   item.index = n
   *pq = append(*pq, item) // 添加到队列的尾巴
}

// 从队列尾巴移除一个 item
func (pq *waitForPriorityQueue) Pop() interface{} {
   n := len(*pq)
   item := (*pq)[n-1]
   item.index = -1
   *pq = (*pq)[0:(n - 1)]
   return item
}

// 获取队列第一个 item
func (pq waitForPriorityQueue) Peek() interface{} {
   return pq[0]
}

NewDelayingQueue

接着看一下 DelayingQueue 相关的几个 New 函数,理解了这里的逻辑,才能继续往后面分析 AddAfter() 方法。

// 这里可以传递一个名字
func NewNamedDelayingQueue(name string) DelayingInterface {
   return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}

// 上面一个函数只是调用当前函数,附带一个名字,这里加了一个指定 clock 的能力
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
  return newDelayingQueue(clock, NewNamed(name), name) // 注意这里的 NewNamed() 函数
}

func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
   ret := &delayingType{
      Interface:       q,
      clock:           clock,
      heartbeat:       clock.NewTicker(maxWait), // 10s 一次心跳
      stopCh:          make(chan struct{}),
      waitingForAddCh: make(chan *waitFor, 1000),
      metrics:         newRetryMetrics(name),
   }

   go ret.waitingLoop() // 留意这里的函数调用
   return ret
}

上面涉及到两个细节:

  • NewNamed(name)
  • go ret.waitingLoop()

NewNamed() 函数用于创建一个前面提到的 Queue 的对应类型 Type 对象,这个值被传递给了 newDelayingQueue() 函数,进而赋值给了 delayingType{} 对象的 Interface 字段,于是后面 delayingType 类型才能直接调用 Type 类型实现的方法。

func NewNamed(name string) *Type {
   rc := clock.RealClock{}
   return newQueue(
      rc,
      globalMetricsFactory.newQueueMetrics(name, rc),
      defaultUnfinishedWorkUpdatePeriod,
   )
}

waitingLoop() 方法逻辑不少,我们单独放到下面一个小节。

waitingLoop()

这个方法是实现延时队列的核心逻辑所在:

func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// 队列里没有 item 时实现等待用的
never := make( 0 {
// 获取第一个 item
entry := waitingForQueue.Peek().(*waitFor)
// 时间还没到,先不处理
if entry.readyAt.After(now) {
break
}
// 时间到了,pop 出第一个元素;注意 waitingForQueue.Pop() 是最后一个 item,heap.Pop() 是第一个元素
entry = heap.Pop(waitingForQueue).(*waitFor)
// 将数据加到延时队列里
q.Add(entry.data)
// map 里删除已经加到延时队列的 item
delete(waitingEntryByData, entry.data)
}

// 如果队列中有 item,就用第一个 item 的等待时间初始化计时器,如果为空则一直等待
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}

select {
case

相关文章

KubeSphere 部署向量数据库 Milvus 实战指南
探索 Kubernetes 持久化存储之 Longhorn 初窥门径
征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
那些年在 Terraform 上吃到的糖和踩过的坑
无需 Kubernetes 测试 Kubernetes 网络实现
Kubernetes v1.31 中的移除和主要变更

发布评论