kubelet 源码剖析(一):Node 状态同步

2024年 1月 21日 88.5k 0

kubelet 在 k8s 架构中是工作在数据面的一个核心组件,它的主要功能包括 pod 生命周期管理(pod 创建、删除、健康检查等)、node 的状态管理等,是 k8s 中最底层的“工人”。

为了能够让管理组件能够及时了解 Node 的最新情况,kubelet 需要定期将所在的 node 的情况上报给控制面组件 kube-apiserver,而 kube-controler-mgr 会监听 kube-apiserver 获取 node 的信息来判断是否需要给节点打污点、驱逐节点上的 pod 等操作,kube-scheduler 则监听 kube-apiserver 获取 node 信息用于调度 pod 时调度算法的数据来源。所以 node 的状态对于整个集群来说是至关重要的。这篇文章,我们就结合 kubelet 和 kube-controler-mgr 源码来看看,node 的状态是如何变化的,node 上的 pod 又是怎么随着 node 的状态变化而变化的。(注:本文源码基于 1.20 版本)

下图是 kubelet 上报状态的一个示意图。

pFPYYfH.png

从图中,我们可以看到 kubelet 有两条路径上报 node 的状态:

  • 续期租约:renew lease
  • 状态同步:sync node status

kube-controler-mgr 会同时使用这两条路径上报的状态判断是否需要修改节点状态或者驱逐 pod 等。下面我们分别看看这两条上报路径具体做了什么。

renew lease

kubelet 会在启动时,开启一个协程,定期去续期租约

// pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
if kl.kubeClient != nil {
        ...
        go kl.nodeLeaseController.Run(wait.NeverStop)
    }
    ...
}
// k8s.io/component-helpers/apimachinery/lease/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
    if c.leaseClient == nil {
        klog.Infof("lease controller has nil lease client, will not claim or renew leases")
        return
    }
    wait.Until(c.sync, c.renewInterval, stopCh)
}

从上面的代码中我们可以看到,这个协程会以 renewInterval 的周期去续期租约,那么这个值是多少呢?我们可以在下面函数中找到这个值在初始化的时候被设置为了租约有效期的 0.25 倍,而租约有效期 NodeLeaseDurationSeconds 默认值是 40s,所以 renewInterval 为 10s。这个可以保证正常情况下,租约在过期前有足够的时间被续期。

// pkg/kubelet/kubelet.go

const nodeLeaseRenewIntervalFraction = 0.25

func NewMainKubelet(...) {
    ...
    leaseDuration := time.Duration(kubeCfg.NodeLeaseDurationSeconds) * time.Second
    // 基于 leaseDuration 默认 40s,renewInterval 为10s
    renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)
    ...
}

kubelet 在启动的时候,会调用 kube-apiserver 接口获取 kube-node-lease 命名空间下的本节点租约信息,如果未获取到,则创建租约;如果可以获取到租约,则尝试去续期。我们再结合 kube-controler-mgr 来看这个租约对于节点状态有何影响:

// pkg/controller/nodelifecycle/node_lifecycle_controller.go

// Note: If kubelet never posted the node status, but continues renewing the
// heartbeat leases, the node controller will assume the node is healthy and
// take no action.
func (nc *Controller) tryUpdateNodeHealth(...) {
    ...
    if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
        // 租约过期后,下面代码为修改节点状态为 Unknown 状态
        ...
    }
    return ...
}

nodeHealth.probeTimestamp 为上次租约续期时间,

如果:

当前时间戳 > 上次续约时间戳 + gracePeriod

那么认为租约已经过期,也即 kubelet 没有及时续约,kube-controler-mgr 会把节点状态设置为 Unknown,然后再其他协程中会给节点打污点,驱逐节点上的 pod。上述的 gracePeriod 有两种情况:

  • kubelet刚刚启动时(通过租约是否为空判断),这个值默认为 60s
  • 启动后,这个值默认为 40s,可以看到这时候这个值跟 kubelet 在创建租约的时候给租约设置的租期 40s 是一致的。

如果:

当前时间戳 <= 上次续约时间戳 + gracePeriod

那么认为节点是健康的,直接返回。我们从代码的注释也可以看出,即使 kubelet 没有上报节点节点状态信息,但是租约一直是有效的,那么节点也会被认为是健康的。

那么为什么有了节点状态信息还需要租约呢?不着急,我们先往下看看节点状态信息的上报。

sync node status

我们可以从上图看到,kubelet 上报的节点状态信息主要有两个来源:

  • 调用容器运行时健康检查获取的状态,主要包括容器运行时健康状况和容器网络健康状况
  • 驱逐管理器设置的状态,主要包括两类,第一类是节点和节点上组件的基础信息如节点可分配资源、容器运行时版本、kubelet 开放的端口号等,第二类是节点资源压力类型如磁盘压力、内存压力、pid压力

kubelet 启动时会启动一个协程,这个协程会定期同步节点状态到 kube-apiserver,这个周期为 nodeStatusUpdateFrequency,默认为 10s,那么是否就意味着每 10s 调用一次 kube-apiserver 接口上报状态呢?我们先看代码

// pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.

        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
        go kl.fastStatusUpdateOnce()

        // start syncing lease
        go kl.nodeLeaseController.Run(wait.NeverStop)
    }

    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
    ...
}
// pkg/kubelet/kubelet_node_status.go

func (kl *Kubelet) syncNodeStatus() {
    ...
    if err := kl.updateNodeStatus(); err != nil {
        klog.Errorf("Unable to update node status: %v", err)
    }
}

func (kl *Kubelet) updateNodeStatus() error {
    klog.V(5).Infof("Updating node status")
    for i := 0; i < nodeStatusUpdateRetry; i++ {
        if err := kl.tryUpdateNodeStatus(i); err != nil {
            if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
                kl.onRepeatedHeartbeatFailure()
            }
            klog.Errorf("Error updating node status, will retry: %v", err)
        } else {
            return nil
        }
    }
    return fmt.Errorf("update node status exceeds retry count")
}

func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
    opts := metav1.GetOptions{}
    if tryNumber == 0 {
        util.FromApiserverCache(&opts)
    }
    // 调用 kube-apiserver获取当前节点信息
    node, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
    if err != nil {
        return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
    }

    originalNode := node.DeepCopy()
    if originalNode == nil {
        return fmt.Errorf("nil %q node object", kl.nodeName)
    }

    // 从 kube-apiserver 中获取Node信息,然后判断podCidr和runtime 的podCidr是否一样
    // 如果不一样,那么调用runtime接口设置podCidr
    podCIDRChanged := false
    if len(node.Spec.PodCIDRs) != 0 {
        // Pod CIDR could have been updated before, so we cannot rely on
        // node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
        // actually changed.
        podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
        if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
            klog.Errorf(err.Error())
        }
    }

    kl.setNodeStatus(node)

    now := kl.clock.Now()
    // 还没到上报上报时间,且节点状态没有发生变化,那么直接返回,否则上报状态
    if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
        if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
            kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
            return nil
        }
    }

    // Patch the current status on the API server
    updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
    if err != nil {
        return err
    }
    kl.lastStatusReportTime = now
    kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
    // If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
    // those volumes are already updated in the node's status
    kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
    return nil
}

通过上面这段代码,我们先回答 kubelet 上报节点状态的周期是否为 10s 的问题。通过上面代码我们可以看到,kubelet 首先会计算节点的状态在过去 10s 是否已经发生了变化,如果没有发生变化,且:

当前时间戳 < 上次上报时间戳 + nodeStatusReportFrequency

那么本次不上报节点状态,翻译过来就是距离上次上报节点状态的时间还没有超过 nodeStatusReportFrequency,且节点状态没有发生变化,那么就跳过本次状态上报。nodeStatusReportFrequency 默认是 60s。

但是,如果节点状态已经发生了变化,那么无论之前上报的时间距离现在是多久,都会立马上报状态。

我们为什么要讨论上报事件周期呢?我们现在就来回答上面我们提出的“为什么有了节点状态信息还需要租约?”这个问题。

这个问题主要是从 kube-apiserver 和 etcd 的负载出发。如果集群规模较小的话,可能负载不是问题。但是当节点数达到成千上万后,如果频繁调用 kube-apiserver 接口同步节点状态,会给 kube-apiserver 造成很大的压力,因为 lease 是轻量级的,需要同步的数据相比于 node status 少很多况且在一个稳定运行的集群中,node 状态变化比较少,如果频繁上报对网络带宽也是一种压力。实际上,node lease 在 1.14 版本才成为 beta 版本,早期的版本里面并没有这个特性,所以这应该也是集群性能的一个优化。

对于不同规模集群,我们可以尝试去修改这些同步周期参数,对集群调优,达到不同的效果;如小集群,可以适当缩小周期,提升灵敏度;大集群可以适当增大周期,降低 kube-apiserver 和 etcd 的压力;还可以从业务对 node 状态的变化感知是否敏感的角度适当调整参数。但是参数并不能随意的调整,kubelet 的状态同步周期参数必须结合 kube-controler-mgr 的一起调整,具体需要满足下面的限制:

kubelet 参数(kubelet配置文件参数)

  • nodeLeaseDurationSeconds:租约有效期
  • nodeStatusUpdateFrequency:同步节点状态周期(节点状态发生变化)
  • nodeStatusReportFrequency:同步节点状态周期(节点状态无变化)

kube-controler-mgr 参数

  • --node-monitor-period:检查 node 状态的周期,默认 5s
  • --node-monitor-grace-period:可以容忍距离最后一次租约更新的时间长度,默认为 40s,也就是说距离上次租约更新的时间已经超过 40s 了,那么节点的NodeReady、NodeMemoryPressure、NodeDiskPressure、NodePIDPressure 都会被设置为 Unknown 状态,接着其他协程发现节点状态 Unknown,那么会被打上对应的标签;如果租约更新没有超过这个时间,则认为节点是正常的。
  • --pod-eviction-timeout:如果节点不健康时间已经超过该参数设置的时长,那么开始驱逐 Pod。

node-monitor-grace-period 必须是 nodeLeaseDurationSeconds 和 nodeStatusUpdateFrequency(两者取小者) 的 n 倍,也就是意味着 kubelet 可以重试 n 次上报,要是 node-monitor-grace-period 比这两个值小,那就没有意义了。

node-monitor-grace-period 必须小于 pod-eviction-timeout

node-monitor-grace-period 不要设置的过大,否则节点健康状态变化延时很大,节点上的 Pod 无法及时驱逐到其他节点,甚至还有可能会有新 Pod 调度到不健康节点上。

说完同步周期后,我们再来看看节点状态同步的两个数据来源。

运行时数据来源

// pkg/kubelet/kubelet_node_status.go
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
    ...

    kl.setNodeStatus(node)
    ...
}

func (kl *Kubelet) setNodeStatus(node *v1.Node) {
    for i, f := range kl.setNodeStatusFuncs {
        klog.V(5).Infof("Setting node status at position %v", i)
        if err := f(node); err != nil {
            klog.Errorf("Failed to set some node status fields: %s", err)
        }
    }
}

setNodeStatusFuncs 在启动时的时候由如下函数初始化

// pkg/kubelet/kubelet_node_status.go

func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
    // if cloud is not nil, we expect the cloud resource sync manager to exist
    var nodeAddressesFunc func() ([]v1.NodeAddress, error)
    if kl.cloud != nil {
        nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
    }
    var validateHostFunc func() error
    if kl.appArmorValidator != nil {
        validateHostFunc = kl.appArmorValidator.ValidateHost
    }
    var setters []func(n *v1.Node) error
    setters = append(setters,
        nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
        // GetCachedMachineInfo 由 cadvisor设置 machineInfo, err := klet.cadvisor.MachineInfo()
        nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
            kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
        nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
        nodestatus.DaemonEndpoints(kl.daemonEndpoints),
        nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
        nodestatus.GoRuntime(),
    )
    // Volume limits
    setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))

    setters = append(setters,
        nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
        nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
        nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
        nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
        nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
        // TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
        // and record state back to the Kubelet runtime object. In the future, I'd like to isolate
        // these side-effects by decoupling the decisions to send events and partial status recording
        // from the Node setters.
        kl.recordNodeSchedulableEvent,
    )
    return setters
}

setNodeStatusFuncs 中被设置了多个设置节点状态的函数,其中的 nodestatus.ReadyCondition() 函数返回的就是一个设置节点 Ready 状态的函数,而 nodestatus.ReadyCondition 的入参就是状态检查函数。我们来看看 runtimeErrors:

func (s *runtimeState) runtimeErrors() error {
    s.RLock()
    defer s.RUnlock()
    errs := []error{}
    if s.lastBaseRuntimeSync.IsZero() {
        errs = append(errs, errors.New("container runtime status check may not have completed yet"))
    } else if !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()) {
        errs = append(errs, errors.New("container runtime is down"))
    }
    for _, hc := range s.healthChecks {
        if ok, err := hc.fn(); !ok {
            errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
        }
    }
    if s.runtimeError != nil {
        errs = append(errs, s.runtimeError)
    }

    return utilerrors.NewAggregate(errs)
}

healthChecks 在初始化时添加了如下检查函数:

pkg/kubelet/pleg/generic.go

func (g *GenericPLEG) Healthy() (bool, error) {
    relistTime := g.getRelistTime()
    if relistTime.IsZero() {
        return false, fmt.Errorf("pleg has yet to be successful")
    }
    // Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
    metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
    elapsed := g.clock.Since(relistTime)
    if elapsed > relistThreshold {
        return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
    }
    return true, nil
}

这个函数判断运行时健康与否,是根据上次 pleg 运行的成功的时间距离现在的时间戳是否超过了阈值(3min),pleg 定期(1s)会调用运行时接口 relist 节点上 pod/container,如果relist成功那么就会设置 relist 成功的时间戳。在后面的文章中我们会重点讨论 pleg。

然后会在 ReadyCondition 函数中运行 runtimeErrors,要是 runtimeErrors 返回的 err 不为空,那么就会设置节点的 status.condition,注意这里并不是调用接口设置,而是在 runtimeErrors 这一步判断的结果发现错误后,改变内存中的node 状态,等 defaultNodeStatusFuncs 中注册的所有函数都执行完后,会比较 node 状态的前后的值,要是发生了变化,才会调用 kube-apiserver 接口(注意,要是距离上次调用接口超过1min,也会调用接口上报)。

我们再来看看 networkErrors:

pkg/kubelet/runtime.go

func (s *runtimeState) networkErrors() error {
    s.RLock()
    defer s.RUnlock()
    errs := []error{}
    if s.networkError != nil {
        errs = append(errs, s.networkError)
    }
    return utilerrors.NewAggregate(errs)
}

kubelet 在启动的时候,会开启一个协程检查运行时状态(updateRuntimeUp 函数),s.networkError

// pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
    ...
}

func (kl *Kubelet) updateRuntimeUp() {
    kl.updateRuntimeMux.Lock()
    defer kl.updateRuntimeMux.Unlock()

    s, err := kl.containerRuntime.Status()
    if err != nil {
        klog.Errorf("Container runtime sanity check failed: %v", err)
        return
    }
    if s == nil {
        klog.Errorf("Container runtime status is nil")
        return
    }
    // Periodically log the whole runtime status for debugging.
    klog.V(4).Infof("Container runtime status: %v", s)
    networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
    if networkReady == nil || !networkReady.Status {
        klog.Errorf("Container runtime network not ready: %v", networkReady)
        kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
    } else {
        // Set nil if the container runtime network is ready.
        kl.runtimeState.setNetworkState(nil)
    }
    // information in RuntimeReady condition will be propagated to NodeReady condition.
    runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
    // If RuntimeReady is not set or is false, report an error.
    if runtimeReady == nil || !runtimeReady.Status {
        err := fmt.Errorf("Container runtime not ready: %v", runtimeReady)
        klog.Error(err)
        kl.runtimeState.setRuntimeState(err)
        return
    }
    kl.runtimeState.setRuntimeState(nil)
    kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
    kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

在 updateRuntimeUp 中会调用运行时接口kl.containerRuntime.Status() 来获取运行时状态,然后判断 NetworkReady 状态,并设置状态和相应的错误信息。在 networkErrors 中就是判断关于Network Ready 的错误信息是否为空来确定网络是否正常。

驱逐管理器数据来源

在 NewMainKubelet 中首先初始化了驱逐管理器

// pkg/kubelet/kubelet.go
func NewMainKubelet(...) {
    ...
    evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, etcHostsPathFunc)
    klet.evictionManager = evictionManager
    ...
}

在 initializeRuntimeDependentModules 中启动了驱逐管理器

func (kl *Kubelet) initializeRuntimeDependentModules() {
    ...
    kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
    ...
}
func (m *managerImpl) Start(...) {
    ...
    // start the eviction manager monitoring
    go func() {
        for {
            if evictedPods := m.synchronize(diskInfoProvider, podFunc); evictedPods != nil {
                klog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
                m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
            } else {
                time.Sleep(monitoringInterval)
            }
        }
    }()
}

其中的 synchronize 就是检查节点内存、磁盘等资源的现状函数,然后跟驱逐配置作对比,判断节点是否处于内存或磁盘压力等,如果是存在压力,那么会将处于压力的资源配置到节点状态中去,也就是驱逐管理器的 nodeConditions 中:

// pkg/kubelet/eviction/eviction_manager.go
type managerImpl struct {
    ...
    nodeConditions []v1.NodeConditionType
    ...
}

我们再回到上面状态上报,我们以 MemoryPressureCondition 为例:

// pkg/kubelet/kubelet_node_status.go

func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
    ...
    nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent)
    ...
    
}
// pkg/kubelet/eviction/eviction_manager.go
func (m *managerImpl) IsUnderMemoryPressure() bool {
    m.RLock()
    defer m.RUnlock()
    return hasNodeCondition(m.nodeConditions, v1.NodeMemoryPressure)
}

到这里我们就看到了 hasNodeCondition 获取节点的是否存在某种节点状态的时候,是从驱逐管理器的 nodeConditions 获取的,而 nodeConditions 中的值正是驱逐管理器周期性运行设置的,如果节点没有任何压力,那么这个nodeConditions 数组就是空的。

相关文章

KubeSphere 部署向量数据库 Milvus 实战指南
探索 Kubernetes 持久化存储之 Longhorn 初窥门径
征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
那些年在 Terraform 上吃到的糖和踩过的坑
无需 Kubernetes 测试 Kubernetes 网络实现
Kubernetes v1.31 中的移除和主要变更

发布评论