《k8s1.13版本源码分析》调度器初始化
- 概述
- 从 –config 开始
- options.Option 对象
- config.Config对象
- runCommand
- ApplyFeatureGates
- 默认算法注册
- 特性开关
- Scheduler 的创建
- 调度算法源
- policy / provider 如何生效
- 默认生效的算法
1. 概述
今天我们要做一些琐碎的知识点分析,比如调度器启动的时候默认配置是怎么来的?默认生效了哪些调度算法?自定义的算法是如何注入的?诸如这些问题,我们顺带会看一下调度器相关的一些数据结构的含义。看完前面这些节的分析后再看完本篇文章你可能会有一种醍醐灌顶的感觉哦~
2. 从 –config 开始
如果我们编译出来一个 kube-scheduler 二进制文件,运行./kube-scheduler -h后会看到很多的帮助信息,这些信息是分组的,比如第一组 Misc,差不多是“大杂烩”的意思,不好分类的几个 flag,其实也是最重要的几个 flag,如下:

很好理解,第一个红框框圈出来的–config用于指定配置文件,老版本的各种参数基本都不建议使用了,所以这个 config flag 指定的 config 文件中基本包含了所有可配置项,我们看一下代码中获取这个 flag 的相关代码:
cmd/kube-scheduler/app/options/options.go:143
func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
fs := nfs.FlagSet("misc")
// 关注 --config
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
return nfs
}
上述代码中有几个点可以关注到:
上面代码中可以看到o.ConfigFile接收了config配置,我们看看Option类型是什么样子的~
2.1. options.Option 对象
Options对象包含运行一个 Scheduler 所需要的所有参数
cmd/kube-scheduler/app/options/options.go:55
type Options struct {
// 和命令行帮助信息的分组是一致的
ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
CombinedInsecureServing *CombinedInsecureServingOptions
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *apiserveroptions.DelegatingAuthorizationOptions
Deprecated *DeprecatedOptions
// config 文件的路径
ConfigFile string
// 如果指定了,会输出 config 的默认配置到这个文件
WriteConfigTo string
Master string
}
前面的 flag 相关代码中写到配置文件的内容给了o.ConfigFile,也就是Options.ConfigFile,那这个属性怎么使用呢?
我们来看下面这个 ApplyTo() 函数,这个函数要做的事情是把 options 配置 apply 给 scheduler app configuration(这个对象后面会讲到):
cmd/kube-scheduler/app/options/options.go:162
// 把 Options apply 给 Config
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
// --config 没有使用的情况
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// 使用 Deprecated 的配置
if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
return err
}
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
// 加载 config 文件中的内容
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// 上面加载到的配置赋值给 Config中的 ComponentConfig
c.ComponentConfig = *cfg
if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
return err
}
}
// ……
return nil
}
这个函数中可以看到用 –config 和不用 –config 两种情况下 options 是如何应用到schedulerappconfig.Config中的。那么这里提到的 Config 对象又是什么呢?
2.2. config.Config对象
Config 对象包含运行一个 Scheduler 所需要的所有 context
cmd/kube-scheduler/app/config/config.go:32
type Config struct {
// 调度器配置对象
ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
LoopbackClientConfig *restclient.Config
InsecureServing *apiserver.DeprecatedInsecureServingInfo
InsecureMetricsServing *apiserver.DeprecatedInsecureServingInfo
Authentication apiserver.AuthenticationInfo
Authorization apiserver.AuthorizationInfo
SecureServing *apiserver.SecureServingInfo
Client clientset.Interface
InformerFactory informers.SharedInformerFactory
PodInformer coreinformers.PodInformer
EventClient v1core.EventsGetter
Recorder record.EventRecorder
Broadcaster record.EventBroadcaster
LeaderElection *leaderelection.LeaderElectionConfig
}
所以前面的c.ComponentConfig = o.ComponentConfig这行代码也就是把 Options 中的 ComponentConfig 赋值给了 Config 中的 ComponentConfig;是哪里的逻辑让 Options 和 Config 对象产生了关联呢?(也就是说前面提到的 ApplyTo() 方法是再哪里被调用的?)
继续跟下去可以找到Config()函数,从这个函数的返回值*schedulerappconfig.Config可以看到它的目的,是需要得到一个 schedulerappconfig.Config,代码不长:
cmd/kube-scheduler/app/options/options.go:221
func (o *Options) Config() (*schedulerappconfig.Config, error) {
// ……
c := &schedulerappconfig.Config{}
// 前面我们看到的 ApplyTo() 函数
if err := o.ApplyTo(c); err != nil {
return nil, err
}
// Prepare kube clients.
// ……
// Prepare event clients.
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
// Set up leader election if enabled.
// ……
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = factory.NewPodInformer(client, 0)
c.EventClient = eventClient
c.Recorder = recorder
c.Broadcaster = eventBroadcaster
c.LeaderElection = leaderElectionConfig
return c, nil
}
那调用这个Config()函数的地方又在哪里呢?继续跟就到 runCommand 里面了~
2.3. runCommand
runCommand 这个函数我们不陌生:
cmd/kube-scheduler/app/server.go:117
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
// ……
// 这个地方完成了前面说到的配置文件和命令行参数等读取和应用工作
c, err := opts.Config()
if err != nil {
fmt.Fprintf(os.Stderr, "%vn", err)
os.Exit(1)
}
stopCh := make(chan struct{})
// Get the completed config
cc := c.Complete()
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
// 这里有一堆逻辑
algorithmprovider.ApplyFeatureGates()
// Configz registration.
// ……
return Run(cc, stopCh)
}
runCommand 在最开始的时候我们有见到过,已经到 cobra 入口的 Run 中了:
cmd/kube-scheduler/app/server.go:85
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%vn", err)
os.Exit(1)
}
},
上面涉及到2个知识点:
- ApplyFeatureGates
- Run 中的逻辑
我们下面分别来看看~
3. ApplyFeatureGates
这个函数跟进去可以看到如下几行简单的代码,这里很自然我们能够想到继续跟defaults.ApplyFeatureGates(),但是不能只看到这个函数哦,具体来看:
pkg/scheduler/algorithmprovider/plugins.go:17
package algorithmprovider
import (
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
defaults.ApplyFeatureGates()
}
到这里分2条路:
- import defaults 这个 package 的时候有一个init()函数调用的逻辑
- defaults.ApplyFeatureGates() 函数调用本身。
3.1. 默认算法注册
pkg/scheduler/algorithmprovider/defaults/defaults.go:38
func init() {
// ……
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
// ……
}
init()函数中我们先关注 registerAlgorithmProvider() 函数,这里从字面上可以得到不少信息,大胆猜一下:是不是注册了默认的预选算法和优选算法?
pkg/scheduler/algorithmprovider/defaults/defaults.go:222
func registerAlgorithmProvider(predSet, priSet sets.String) {
// 注册 algorithm provider. 默认使用 DefaultProvider
factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
copyAndReplace(priSet, "LeastRequestedPriority", "MostRequestedPriority"))
}
看到这里可以关注到 AlgorithmProvider 这个概念,后面会讲到。
先看一下里面调用的注册函数是怎么实现的:
pkg/scheduler/factory/plugins.go:387
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
// 很明显,关键逻辑在这里
algorithmProviderMap[name] = AlgorithmProviderConfig{
FitPredicateKeys: predicateKeys,
PriorityFunctionKeys: priorityKeys,
}
return name
}
首先,algorithmProviderMap 这个变量是一个包级变量,在86行做的定义:algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
这里的 key 有2种情况:
- “DefaultProvider”
- “ClusterAutoscalerProvider”
混合云场景用得到 ClusterAutoscalerProvider,大家感兴趣可以研究一下 ClusterAutoscaler 特性,这块我们先不说。默认的情况是生效的 DefaultProvider,这块逻辑后面还会提到。
然后这个 map 的 value 的类型是一个简单的 struct:
pkg/scheduler/factory/plugins.go:99
type AlgorithmProviderConfig struct {
FitPredicateKeys sets.String
PriorityFunctionKeys sets.String
}
接着看一下defaultPredicates()函数
pkg/scheduler/algorithmprovider/defaults/defaults.go:106
func defaultPredicates() sets.String {
return sets.NewString(
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
predicates.NoVolumeZoneConflictPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
),
// ……
factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),
// ……
)
}
这个函数里面就2中类型的玩法,简化一些可以理解成上面这个样子,我们一个个来看。
先认识一下 sets.NewString()函数要干嘛:
vendor/k8s.io/apimachinery/pkg/util/sets/string.go:27
type String map[string]Empty
// NewString creates a String from a list of values.
func NewString(items ...string) String {
ss := String{}
ss.Insert(items...)
return ss
}
// ……
// Insert adds items to the set.
func (s String) Insert(items ...string) {
for _, item := range items {
s[item] = Empty{}
}
}
如上,很简单的类型封装。里面的Empty是:type Empty struct{},所以本质上就是要用map[string]struct{}这个类型罢了。
因此上面defaultPredicates()函数中sets.NewString()内每一个参数本质上就是一个 string 类型了,我们来看这一个个 string 是怎么来的。
pkg/scheduler/factory/plugins.go:195
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
// 唯一值的关注的逻辑
fitPredicateMap[name] = predicateFactory
// 返回 name
return name
}
这个函数要返回一个 string 我们已经知道了,里面的逻辑也只有这一行需要我们关注:fitPredicateMap[name] = predicateFactory,这个 map 类型也是一个包级变量:fitPredicateMap = make(map[string]FitPredicateFactory),所以前面讲的注册本质也就是在填充这个变量而已。理解fitPredicateMap[name] = predicateFactory中 fitPredicateMap 的 key 和 value,也就知道了这里的 Register 要做什么。
defaultPredicates()中的第二种注册方式 RegisterFitPredicate 区别不大,函数体也是调用的 RegisterFitPredicateFactory():
pkg/scheduler/factory/plugins.go:106
func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
}
3.2. 特性开关
pkg/scheduler/algorithmprovider/defaults/defaults.go:183
func ApplyFeatureGates() {
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
factory.RemoveFitPredicate(predicates.CheckNodeMemoryPressurePred)
factory.RemoveFitPredicate(predicates.CheckNodeDiskPressurePred)
factory.RemoveFitPredicate(predicates.CheckNodePIDPressurePred)
factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeConditionPred)
factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeMemoryPressurePred)
factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeDiskPressurePred)
factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodePIDPressurePred)
factory.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)
factory.RegisterMandatoryFitPredicate(predicates.CheckNodeUnschedulablePred, predicates.CheckNodeUnschedulablePredicate)
factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.PodToleratesNodeTaintsPred)
factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.CheckNodeUnschedulablePred)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1)
factory.InsertPriorityKeyToAlgorithmProviderMap(factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1))
}
}
这个函数看着几十行,实际上只在重复一件事情,增加或删除一些预选和优选算法。我们看一下这里的一些逻辑:
utilfeature.DefaultFeatureGate.Enabled() 函数要做的事情是判断一个 feature 是否开启;函数参数本质只是一个字符串:
pkg/features/kube_features.go:25
const ( AppArmor utilfeature.Feature = "AppArmor" DynamicKubeletConfig utilfeature.Feature = "DynamicKubeletConfig" // …… )
这里定义了很多的 feature,然后定义了哪些 feature 是开启的,处在 alpha 还是 beta 或者 GA 等:
pkg/features/kube_features.go:405
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
AppArmor: {Default: true, PreRelease: utilfeature.Beta},
DynamicKubeletConfig: {Default: true, PreRelease: utilfeature.Beta},
ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta},
ExperimentalCriticalPodAnnotation: {Default: false, PreRelease: utilfeature.Alpha},
DevicePlugins: {Default: true, PreRelease: utilfeature.Beta},
TaintBasedEvictions: {Default: true, PreRelease: utilfeature.Beta},
RotateKubeletServerCertificate: {Default: true, PreRelease: utilfeature.Beta},
// ……
}
所以回到前面ApplyFeatureGates()的逻辑,utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition)要判断的是 TaintNodesByCondition 这个特性是否开启了,如果开启了就把 predicates 中 “CheckNodeCondition”, “CheckNodeMemoryPressure”, “CheckNodePIDPressurePred”, “CheckNodeDiskPressure” 这几个算法去掉,把 “PodToleratesNodeTaints”, “CheckNodeUnschedulable” 加上。接着对于特性 “ResourceLimitsPriorityFunction” 的处理也是同一个逻辑。
4. Scheduler 的创建
我们换一条线,从 Scheduler 对象的创建再来看另外几个知识点。
前面分析到runCommand()函数的时候我们说到了需要关注最后一行return Run(cc, stopCh)的逻辑,在Run()函数中主要的逻辑就是创建 Scheduler 和启动 Scheduler;现在我们来看创建逻辑:
cmd/kube-scheduler/app/server.go:174
sched, err := scheduler.New(cc.Client, cc.InformerFactory.Core().V1().Nodes(), cc.PodInformer, cc.InformerFactory.Core().V1().PersistentVolumes(), cc.InformerFactory.Core().V1().PersistentVolumeClaims(), cc.InformerFactory.Core().V1().ReplicationControllers(), cc.InformerFactory.Apps().V1().ReplicaSets(), cc.InformerFactory.Apps().V1().StatefulSets(), cc.InformerFactory.Core().V1().Services(), cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), storageClassInformer, cc.Recorder, cc.ComponentConfig.AlgorithmSource, stopCh, scheduler.WithName(cc.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight), scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling), scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
这里调用了一个New()函数,传了很多参数进去。New()函数的定义如下:
pkg/scheduler/scheduler.go:131
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh map[string]algorithm.FitPredicate
是不是很熟悉呢?
行,今天就讲到这里~
