背景
上篇文章咱们说了 Volcano 控制器原理,这篇文章来看下调度器核心逻辑。
调度器简介
接着终于到了 Volcano 的核心控制器部分。其实上部分 Controller 所有的调谐,最终都是为了能做好批调度。和其他文章一样,先看看官方的描述:
官方描述:
通俗点:Volcano 调度过程中会执行一系列的 Action,Action 中执行什么算法逻辑,就取决于注册进去的 plugins。具体如何配置 Actions 和 Plugins,这需要扒扒源码了。主要其实代码中多了个 shuffle Action,官方文档有些落后了。
调度器源码实现
从控制器 CRD 到调度器,调度器如何识别 PodGroup
此时必须要注意,调度器代码中的 JobInfo 并非 控制器所识别的 Volcano Job,而是对 PodGroup 的封装。
咱们先到调度器的 main 入口来看,除了框架类的代码外,只有一个 Run 函数。而 Run 函数中有个 NewScheduler 方法。NewScheduler 方法中有个 schedcache.New() 方法...。顺着代码,终于来到了主角 addEventHandler()(此处只是为了看 PodGroup 到 TaskInfo 的映射,讲的比较粗,细节后续会再提到)。
// cmd/scheduler/main.go
func main() {
// ...
if err := app.Run(s); err != nil {
// ...
}
}
// cmd/scheduler/app/server.go
// Run the volcano scheduler.
func Run(opt *options.ServerOption) error {
// ...
sched, err := scheduler.NewScheduler(config, opt)
// ...
}
// pkg/scheduler/scheduler.go
// NewScheduler returns a Scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
// ...
cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads, opt.IgnoredCSIProvisioners)
scheduler := &Scheduler{
schedulerConf: opt.SchedulerConf,
fileWatcher: watcher,
cache: cache,
schedulePeriod: opt.SchedulePeriod,
dumper: schedcache.Dumper{Cache: cache, RootDir: opt.CacheDumpFileDir},
}
return scheduler, nil
}
// pkg/scheduler/cache/cache.go
// New returns a Cache implementation.
func New(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) Cache {
return newSchedulerCache(config, schedulerNames, defaultQueue, nodeSelectors, nodeWorkers, ignoredProvisioners)
}
// pkg/scheduler/cache/cache.go
func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
// ...
// add all events handlers
sc.addEventHandler()
// ...
}
从 addEventHandler() 中可以看到调度器监听了好多资源,包括 PodGroup,主要看看其中的 AddFunc,也就是 pkg/scheduler/cache/event_handlers.go 中的 AddPodGroupV1beta1 方法。此时会看到一个 schedulingapi.PodGroup 结构体,注意该结构体并非控制器中的 PodGroup crd,而是一个 Wrapper。
// PodGroup is a collection of Pod; used for batch workload.
type PodGroup struct {
scheduling.PodGroup
// Version represents the version of PodGroup
Version string
}
// pkg/scheduler/cache/event_handlers.go
// AddPodGroupV1beta1 add podgroup to scheduler cache
func (sc *SchedulerCache) AddPodGroupV1beta1(obj interface{}) {
ss, ok := obj.(*schedulingv1beta1.PodGroup)
podgroup := scheduling.PodGroup{}
pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
if err := sc.setPodGroup(pg); err != nil {
}
}
在 setGroup 中可以发现把 PodGroup 塞到了 Jobs map 中,而 Jobs map 类型为 map[schedulingapi.JobID]*schedulingapi.JobInfo 。此时 PodGroup 变成了 JobInfo。
// pkg/scheduler/cache/event_handlers.go
// Assumes that lock is already acquired.
func (sc *SchedulerCache) setPodGroup(ss *schedulingapi.PodGroup) error {
job := getJobID(ss)
if _, found := sc.Jobs[job]; !found {
sc.Jobs[job] = schedulingapi.NewJobInfo(job)
}
sc.Jobs[job].SetPodGroup(ss)
// TODO(k82cn): set default queue in admission.
if len(ss.Spec.Queue) == 0 {
sc.Jobs[job].Queue = schedulingapi.QueueID(sc.defaultQueue)
}
metrics.UpdateE2eSchedulingStartTimeByJob(sc.Jobs[job].Name, string(sc.Jobs[job].Queue), sc.Jobs[job].Namespace,
sc.Jobs[job].CreationTimestamp.Time)
return nil
}
调度器如何识别 Pod
此时必须要注意,调度器代码中的 TaskInfo 并非 控制器所识别的 Volcano Job 中的 Task,而是对 Pod 的封装。
在上一节的 addEventHandler() 中,同样可以看到对 Pod 资源的监听,同样来看下其中的 addFunc,也就是 pkg/scheduler/cache/event_handlers.go 中的 AddPod。
// AddPod add pod to scheduler cache
func (sc *SchedulerCache) AddPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
err := sc.addPod(pod)
}
AddPod 的动作很明显了,就是将 Pod 转换为 Task,并执行 addTasks 函数(将 TaskInfo 保存到 Tasks map 中。同时更新 Nodes map)。
// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
pi, err := sc.NewTaskInfo(pod)
return sc.addTask(pi)
}
启动调度
上节说到,调度器的 Run() 函数中有个 NewScheduler 函数,而在 Run 中,除了初始化 Scheduler 对象外,另一个重要的动作就是 Scheduler 对象的 Run 方法。
// cmd/scheduler/app/server.go
// Run the volcano scheduler.
func Run(opt *options.ServerOption) error {
// ...
sched, err := scheduler.NewScheduler(config, opt)
// ...
run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
}
}
先来瞅一眼 Scheduler 对象的结构,其中最值的关注的自然是 actions 和 plugins。
// Scheduler watches for new unscheduled pods for volcano. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
cache schedcache.Cache
schedulerConf string
fileWatcher filewatcher.FileWatcher
schedulePeriod time.Duration
once sync.Once
mutex sync.Mutex
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
metricsConf map[string]string
dumper schedcache.Dumper
}
接着来到 Run 函数,其中有个定时执行的逻辑 runOnce。
// pkg/scheduler/scheduler.go
// Run initializes and starts the Scheduler. It loads the configuration,
// initializes the cache, and begins the scheduling process.
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
}
而在 runOnce 函数中首先初始化了 Session 对象,其次依次执行其对应的 actions。此时对官方图中 Session 开始对应起来。
// pkg/scheduler/scheduler.go
func (pc *Scheduler) runOnce() {
// ...
ssn := framework.OpenSession(pc.cache, plugins, configurations)
for _, action := range actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
// ...
}
那么这里调用了那些 plugins 和 actions 呢,各个 action 中又要执行那些 plugins 呢。别急,本文一一道来。
首先看看有哪些 actions 和 plguins。在 Scheduler.Run() 中有个 loadSchedulerConf 方法。该方法中默认在文件中读取 actions 和 plugins 配置。
// pkg/scheduler/scheduler.go
func (pc *Scheduler) loadSchedulerConf() {
// ...
var config string
if len(pc.schedulerConf) != 0 {
confData, err := os.ReadFile(pc.schedulerConf)
if err != nil {
klog.Errorf("Failed to read the Scheduler config in '%s', using previous configuration: %v",
pc.schedulerConf, err)
return
}
config = strings.TrimSpace(string(confData))
}
actions, plugins, configurations, metricsConf, err := unmarshalSchedulerConf(config)
// ...
}
默认配置如下变量 defaultSchedulerConf 所示,包含 enqueue、allocate 和 backfill 三个 action 以及多个 plugins。
var defaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
`
看到这儿后,有些读者费解,这么多 plugins 如何与 actions 一一对应上呢,某个 action 里面需要执行哪些 plugins 呢。其实在 OpenSession 方法中,尝试调用了每个 plugin 的 OnSessionOpen 方法,该方法由各个 plugin 方自己实现,主要将自身逻辑注册到各个 action 的 funcmap 中。
// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
// ...
for _, tier := range tiers {
for _, plugin := range tier.Plugins {
if pb, found := GetPluginBuilder(plugin.Name); !found {
klog.Errorf("Failed to get plugin %s.", plugin.Name)
} else {
plugin := pb(plugin.Arguments)
ssn.plugins[plugin.Name()] = plugin
plugin.OnSessionOpen(ssn)
}
}
}
return ssn
}
以经典的 gang plugin 为例,看看 OnSessionOpen 方法,其他插件读者可自行查看。
// pkg/scheduler/plugins/gang/gang.go
func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
// ....
// 将 preemptableFn 注册到 Reclaimable 和 Preemptable 两个 action 的 map 中
preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
var victims []*api.TaskInfo
jobOccupiedMap := map[api.JobID]int32{}
for _, preemptee := range preemptees {
job := ssn.Jobs[preemptee.Job]
if _, found := jobOccupiedMap[job.UID]; !found {
jobOccupiedMap[job.UID] = job.ReadyTaskNum()
}
if jobOccupiedMap[job.UID] > job.MinAvailable {
jobOccupiedMap[job.UID]--
victims = append(victims, preemptee)
} else {
klog.V(4).Infof("Can not preempt task <%v/%v> because job %s ready num(%d) <= MinAvailable(%d) for gang-scheduling",
preemptee.Namespace, preemptee.Name, job.Name, jobOccupiedMap[job.UID], job.MinAvailable)
}
}
return victims, util.Permit
}
ssn.AddReclaimableFn(gp.Name(), preemptableFn)
ssn.AddPreemptableFn(gp.Name(), preemptableFn)
}
经典动作 Enqueue
Enqueue action负责通过一系列的过滤算法筛选出符合要求的待调度任务并将它们送入待调度队列。经过这个action,任务的状态将由pending变为inqueue。
前文提到,runOnce 方法中会调用各个 action 的 Execute 方法。总的来说,Enqueue action 的 Execute 方法定义了三个局部变量 queues、queueSet 和 jobsMap,然后执行了两个 for 循环。其中 queues 是一个以 queue 为 item 的优先队列,queueSet 是一个队列 id list 形成的字符串,jobsMap 是队列 id 到 队列的映射。
// pkg/scheduler/actions/enqueue/enqueue.go
func (enqueue *Action) Execute(ssn *framework.Session) {
// ......
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueSet := sets.NewString()
jobsMap := map[api.QueueID]*util.PriorityQueue{}
for _, job := range ssn.Jobs {
// ......
}
klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
for {
// ......
}
}
第一个 for 循环代码如下。首先遍历 jobs (对应 PodGroup Wrapper)的过程中判断用到了哪些 Volcano Queue,将这些 Queue 保存到 queueSet 和 queues 中;其次将处于 Pending 状态的 jobs 加入到 jobsMap 中。
// pkg/scheduler/actions/enqueue/enqueue.go
// 这个 Job 是 Volcano 自定义资源 Job,不是 K8s 里的 Job;这里开始遍历所有 jobs
for _, job := range ssn.Jobs {
if job.ScheduleStartTimestamp.IsZero() {
ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
Time: time.Now(),
}
}
// 如果 job 中定义的 Queue 在 Session 中存在,那就执行
// queueSet.Insert(string(queue.UID)) 和
// queues.Push(queue);注意这里 Push 进去的是 queue
if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
continue
} else if !queueSet.Has(string(queue.UID)) {
klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
queue.Name, job.Namespace, job.Name)
// 这里构建了一个 queue UID 的 set 和一个 queue 队列(优先级队列,heap 实现)
queueSet.Insert(string(queue.UID))
queues.Push(queue)
}
if job.IsPending() {
// 如果 job 指定的 queue 还没存到 jobsMap 里,则创建一个对应的 PriorityQueue
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
// 将 job 加到指定 queue 中
jobsMap[job.Queue].Push(job)
}
}
第二个 for 循环的代码如下。总的来说就是不断的从优先 queue 中获取最高优先级的 queue,然后根据 queue 的 UID 找到本地 jobsMap 里对应的 jobs 队列(PodGroup 的 Wrapper),最后从 jobs 队列中找到优先级最高的 Job,并将其状态置为 Inqueue。
for {
// 没有队列,退出循环
if queues.Empty() {
break
}
// 从优先级队列 queues 中 Pop 一个高优的队列出来
queue := queues.Pop().(*api.QueueInfo)
// 如果这个高优队列在 jobsMap 里没有保存相应的 jobs,也就是为空,那就继续下一轮循环
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
continue
}
// jobs 也是一个优先级队列,Pop 一个高优 job 出来
job := jobs.Pop().(*api.JobInfo)
if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
ssn.JobEnqueued(job)
// Phase 更新为 "Inqueue"
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
// 将当前 job 加入到 ssn.Jobs map
ssn.Jobs[job.UID] = job
}
// 将前面 Pop 出来的 queue 加回到 queues 中,直到 queue 中没有 job,这样逐步 queues 为空空,上面的 Empty() 方法就会返回 true,然后循环退出。
queues.Push(queue)
}
经典动作 Allocate
Allocate action负责通过一系列的预选和优选算法筛选出最适合的节点。
如下代码所示,Allocate action 的 Execute 方法中定义了 queues 和 jobsMap 变量,然后执行了 pickUpQueuesAndJobs 和 allocateResources 方法。其中 queues 是一个元素为优先级队列的优先级队列;jobsMap 是一个 map,key 为 queue id,value 为优先级队列,也就是一个特定的 queue,queue 中存着 jobs。
// pkg/scheduler/actions/allocate/allocate.go
func (alloc *Action) Execute(ssn *framework.Session) {
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}
alloc.session = ssn
alloc.pickUpQueuesAndJobs(queues, jobsMap)
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
alloc.allocateResources(queues, jobsMap)
}
而 pickUpQueuesAndJobs 方法中核心逻辑只有一个。遍历 jobs,将其按照 queue 不同存到 jobsMap 中。allocateResources 方法的逻辑主要是按照优先级依次给 tasks 寻找最合适的 node,找到后“预占”资源,于是按顺序逐步给所有的 tasks 都找到了最佳节点(这块代码很复杂,后期补上哈)。
未完待续
今天就先到这儿啦。看完后依旧有些费解的地方,后续有空补上哈。