volcano 的原文地址在 xiaorui.cc, 后面对 volcano 的架构及技术实现原理会持续补充.
- [controller 控制器实现](#volcano controller 的实现)
- [QueueController 控制器](#QueueController 控制器)
- [启动入口 Run](#启动入口 Run)
- processNextWorkItem
- handleQueue
- [state 状态处理](#state 状态处理)
- [三种 queue 处理方法](#三种 queue 处理方法)
- [JobController 控制器](#JobController 控制器)
- [启动入口 Run](#启动入口 Run)
- worker.processNextReq
- SyncJob
- KillJob
- State
- [PodGroupController 控制器](#PodGroupController 控制器)
- [启动入口 Run](#启动入口 Run)
- processNextReq
- [QueueController 控制器](#QueueController 控制器)
// Run the controller.
func Run(opt *options.ServerOption) error {
// 获取启动 controllers 的方法
run := startControllers(config, opt)
// 创建选举客户端
leaderElectionClient, err := kubeclientset.NewForConfig(rest.AddUserAgent(config, "leader-election"))
if err != nil {
return err
}
// ...
// 创建选举锁对象
rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
opt.LockObjectNamespace,
"vc-controller-manager",
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: eventRecorder,
})
if err != nil {
return fmt.Errorf("couldn't create resource lock: %v", err)
}
// 进行选举,如果拿到锁,则执行控制器逻辑,没拿到则等待。
// 如果在拿到锁后,发生异常导致续锁失败,导致被抢占,则退出进程。
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
},
})
return fmt.Errorf("lost lease")
}
func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
controllerOpt := &framework.ControllerOption{}
controllerOpt.SchedulerName = opt.SchedulerName
controllerOpt.WorkerNum = opt.WorkerThreads
controllerOpt.MaxRequeueNum = opt.MaxRequeueNum
controllerOpt.KubeClient = kubeclientset.NewForConfigOrDie(config)
controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)
return func(ctx context.Context) {
framework.ForeachController(func(c framework.Controller) {
if err := c.Initialize(controllerOpt); err != nil {
return
}
go c.Run(ctx.Done())
})
<-ctx.Done()
}
}
// 启动时 queue、job、pg 会注册到这里。
var controllers = map[string]Controller{}
func ForeachController(fn func(controller Controller)) {
// 遍历执行各个控制器。
for _, ctrl := range controllers {
fn(ctrl)
}
}
启动 queue、pg、cmd informer,在等待同步完成后,异步调用 worker 和 commandWorker 协程。
func (c *queuecontroller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer c.commandQueue.ShutDown()
klog.Infof("Starting queue controller.")
defer klog.Infof("Shutting down queue controller.")
go c.queueInformer.Informer().Run(stopCh)
go c.pgInformer.Informer().Run(stopCh)
go c.cmdInformer.Informer().Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.queueSynced, c.pgSynced, c.cmdSynced) {
klog.Errorf("unable to sync caches for queue controller.")
return
}
go wait.Until(c.worker, 0, stopCh)
go wait.Until(c.commandWorker, 0, stopCh)
<-stopCh
}
监听 queue 队列的数据,该队列的数据由 queueInformer eventHandler 来产生。
func (c *queuecontroller) worker() {
for c.processNextWorkItem() {
}
}
func (c *queuecontroller) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(obj)
req, ok := obj.(*apis.Request)
if !ok {
klog.Errorf("%v is not a valid queue request struct.", obj)
return true
}
// 这里是 handleQueue
err := c.syncHandler(req)
c.handleQueueErr(err, obj)
return true
}
根据 queue 状态返回不同的状态处理方法,然后变更状态。
func (c *queuecontroller) handleQueue(req *apis.Request) error {
// 获取 queue 对象
queue, err := c.queueLister.Get(req.QueueName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("get queue %s failed for %v", req.QueueName, err)
}
// 根据 queue 状态返回不同的状态处理方法
queueState := queuestate.NewState(queue)
if queueState == nil {
return fmt.Errorf("queue %s state %s is invalid", queue.Name, queue.Status.State)
}
// 处理状态,本质都是更新 queue 对象状态.
if err := queueState.Execute(req.Action); err != nil {
return err
}
return nil
}
queue 有各种各样的状态处理方法,这里拿 openState 举例说明.
type openState struct {
queue *v1beta1.Queue
}
func (os *openState) Execute(action v1alpha1.Action) error {
switch action {
case v1alpha1.OpenQueueAction:
// open 状态,进行 syncQueue 调和.
return SyncQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {
status.State = v1beta1.QueueStateOpen
})
case v1alpha1.CloseQueueAction:
// close 状态,进行 queue 收尾操作.
return CloseQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {
if len(podGroupList) == 0 {
status.State = v1beta1.QueueStateClosed
return
}
status.State = v1beta1.QueueStateClosing
})
default:
// 其他状态,则调用 syncQueue 调和.
return SyncQueue(os.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {
specState := os.queue.Status.State
if len(specState) == 0 || specState == v1beta1.QueueStateOpen {
status.State = v1beta1.QueueStateOpen
return
}
if specState == v1beta1.QueueStateClosed {
if len(podGroupList) == 0 {
status.State = v1beta1.QueueStateClosed
return
}
status.State = v1beta1.QueueStateClosing
return
}
status.State = v1beta1.QueueStateUnknown
})
}
}
- 获取 queue 对应的 podGroup 集合
- 根据各个 podGroup 的状态,累计 queue status 指标
- 更新 queue 对象
- 从 client 获取最新的 queue 对象
- 更新 queue 的状态
- 从 client 获取最新的 queue 对象
- 更新 queue 的状态
根据 job 获取和构建 pods 信息,然后按照 action 决定创建和销毁 pods.
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
// 启动 informer
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
go cc.pgInformer.Informer().Run(stopCh)
go cc.svcInformer.Informer().Run(stopCh)
go cc.cmdInformer.Informer().Run(stopCh)
go cc.pcInformer.Informer().Run(stopCh)
go cc.queueInformer.Informer().Run(stopCh)
// 等待 informer 同步完成
cache.WaitForCacheSync(stopCh, cc.jobSynced, cc.podSynced, cc.pgSynced,
cc.svcSynced, cc.cmdSynced, cc.pvcSynced, cc.pcSynced, cc.queueSynced)
// 监听处理 commands 信号.
go wait.Until(cc.handleCommands, 0, stopCh)
// 并发启动 worker
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
}
// jobcache 清理
go cc.cache.Run(stopCh)
// 处理异常的 task
go wait.Until(cc.processResyncTask, 0, stopCh)
klog.Infof("JobController is running ...... ")
}
volcano 为了提高处理性能,在 jobcontroller 内部抽象了队列数组,启动多个 worker 协程,每个协程绑定一个消费的 queue,而入队时会根据 fnv_hash(namespace/job) 取摸的方式来获取对应的 queue。
通过多队列多 worker 模式来提高处理性能,而且更重要的是可保证 job 级别的有序处理。
func (cc *jobcontroller) worker(i uint32) {
klog.Infof("worker %d start ...... ", i)
for cc.processNextReq(i) {
}
}
func (cc *jobcontroller) processNextReq(count uint32) bool {
// 获取 worker 对应的 queue 对象.
queue := cc.queueList[count]
obj, shutdown := queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}
req := obj.(apis.Request)
defer queue.Done(req)
// 拼装 ns/job-name
key := jobcache.JobKeyByReq(&req)
// 如果入队异常,则重新选择入队.
if !cc.belongsToThisRoutine(key, count) {
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}
// 从缓存中获取 jobinfo,cache 的数据是由 informer eventhandler 来操作的.
jobInfo, err := cc.cache.Get(key)
if err != nil {
return true
}
// 根据 job 状态获取不同的状态处理方法.
st := state.NewState(jobInfo)
if st == nil {
return true
}
// 应用 job 中指定的策略
action := applyPolicies(jobInfo.Job, &req)
if err := st.Execute(action); err != nil {
// ...
}
queue.Forget(req)
return true
}
pkg/controllers/job/job_controller_actions.go
初始化 job 相关配置.
- 根据 job 对象从 queue informer lister 里获取 queue 对象.
- initiateJob
- initJobStatus, init job status
- createJobIOIfNotExist, 如果 job 没有绑定 pvc,为其分配 pvc
- createOrUpdatePodGroup, 为 pg 创建 podgroup.
- 遍历 job.spec.tasks 根据当前 jobinfo 状态构建需要创建和删除的 pods 集合.
- createJobPod 在构建 pod 对象时会赋值 SchedulerName 自定义调度器,默认为
volcano
- 判断 jobInfo 里是否有对应的 pod
- 如果不存在,则进行创建.
- 如果存在,则在 pods 字段里删掉, 然后删掉一些缩容后多余的 pods.
- createJobPod 在构建 pod 对象时会赋值 SchedulerName 自定义调度器,默认为
- 更新 job 的状态
- 更新内部的 cache 组件
pkg/controllers/job/job_controller_actions.go
- 遍历 jobinfo 的 pods 集合, 调用
deleteJobPod
删除 pod 对象. - 更新 job 状态
- 删除 job 关联的 podgroup 对象.
job state 用来处理不同 job 状态下的动作行为.
NewState
根据 job 状态构建不同的方法.
// NewState gets the state from the volcano job Phase.
func NewState(jobInfo *apis.JobInfo) State {
job := jobInfo.Job
switch job.Status.State.Phase {
case vcbatch.Pending:
return &pendingState{job: jobInfo}
case vcbatch.Running:
return &runningState{job: jobInfo}
case vcbatch.Restarting:
return &restartingState{job: jobInfo}
case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
return &finishedState{job: jobInfo}
case vcbatch.Terminating:
return &terminatingState{job: jobInfo}
case vcbatch.Aborting:
return &abortingState{job: jobInfo}
case vcbatch.Aborted:
return &abortedState{job: jobInfo}
case vcbatch.Completing:
return &completingState{job: jobInfo}
}
return &pendingState{job: jobInfo}
}
拿 pending state 待处理方法举例说明,异常状态都走 killjob
, 而正常状态则走 syncjob
.
其他 state 实现类似,根据不同的 action 走 killjob 或 syncjob。
type pendingState struct {
job *apis.JobInfo
}
func (ps *pendingState) Execute(action v1alpha1.Action) error {
switch action {
case v1alpha1.RestartJobAction:
// 如果需要重启, 则在 killjob 里判断 restart 次数,满足阈值则进行 pod 删除.
return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool {
status.RetryCount++
status.State.Phase = vcbatch.Restarting
return true
})
case v1alpha1.AbortJobAction:
// 终止,则直接进行 pod 删除.
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Aborting
return true
})
case v1alpha1.CompleteJobAction:
// 当任务已完成,则进行收尾删除.
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Completing
return true
})
case v1alpha1.TerminateJobAction:
// 任务终止,也需要进行删除。
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
status.State.Phase = vcbatch.Terminating
return true
})
default:
// 其他 action,都走 syncjob,该逻辑主要是 reconcile 调和.
return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed {
status.State.Phase = vcbatch.Running
return true
}
return false
})
}
}
pg 的逻辑比较简单,就是维护 podgroup pod.
启动 pod 和 pg informer,等待着两个 informer 数据同步完毕后,则启动一个协程执行 worker.
func (pg *pgcontroller) Run(stopCh <-chan struct{}) {
go pg.podInformer.Informer().Run(stopCh)
go pg.pgInformer.Informer().Run(stopCh)
cache.WaitForCacheSync(stopCh, pg.podSynced, pg.pgSynced)
go wait.Until(pg.worker, 0, stopCh)
klog.Infof("PodgroupController is running ...... ")
}
func (pg *pgcontroller) worker() {
for pg.processNextReq() {
}
}
func (pg *pgcontroller) processNextReq() bool {
obj, shutdown := pg.queue.Get()
if shutdown {
return false
}
req := obj.(podRequest)
defer pg.queue.Done(req)
// 从 pod informer lister 获取 pg 关联的 pod 对象.
pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)
if err != nil {
return true
}
// 如果 pg 不存在, 则进行创建
if err := pg.createNormalPodPGIfNotExist(pod); err != nil {
pg.queue.AddRateLimited(req)
return true
}
// If no error, forget it.
pg.queue.Forget(req)
return true
}
volcano 社区没以前活跃了,几个月前提交的代码,现在都没有合并。😅