diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index e606b17349..9c12b69ef3 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -260,6 +260,17 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a } } + // before allocating to a node, run prebind fn + if err := ssn.PreBindFn(task, bestNode.Name); err != nil { + klog.V(3).Infof("PreBind for task %s/%s failed for: %v", task.Namespace, task.Name, err) + // If prebind failed, we need to unallocate the task + err = stmt.Unallocate(task) + if err != nil { + klog.V(3).Infof("Unallocate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + } + continue + } + // Allocate idle resource to the task. if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) { klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, bestNode.Name) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 8daa720031..b5cd9afa65 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -43,6 +43,7 @@ func (backfill *Action) Initialize() {} func (backfill *Action) Execute(ssn *framework.Session) { klog.V(5).Infof("Enter Backfill ...") defer klog.V(5).Infof("Leaving Backfill ...") + stmt := framework.NewStatement(ssn) predicateFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { var statusSets api.StatusSets @@ -91,6 +92,17 @@ func (backfill *Action) Execute(ssn *framework.Session) { } } + // before allocating to a node, run prebind fn + if err := ssn.PreBindFn(task, node.Name); err != nil { + klog.V(3).Infof("PreBind for task %s/%s failed for: %v", task.Namespace, task.Name, err) + // If prebind failed, we need to unallocate the task + err = stmt.Unallocate(task) + if err != nil { + klog.V(3).Infof("Unallocate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + } + continue + } + klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := ssn.Allocate(task, node); err != nil { klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 7c52481ce5..c5ee45a746 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -277,3 +277,6 @@ type VictimTasksFn func([]*TaskInfo) []*TaskInfo // AllocatableFn is the func declaration used to check whether the task can be allocated type AllocatableFn func(*QueueInfo, *TaskInfo) bool + +// PreBindFn is the func declaration used to do pre-bind operations. +type PreBindFn func(task *TaskInfo, nodeName string) error diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 34ada7c541..971ace30d4 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -101,6 +101,7 @@ type Session struct { reservedNodesFns map[string]api.ReservedNodesFn victimTasksFns map[string][]api.VictimTasksFn jobStarvingFns map[string]api.ValidateFn + preBindFns map[string]api.PreBindFn } func openSession(cache cache.Cache) *Session { diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index a95fc80b28..694584dec5 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -150,6 +150,11 @@ func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) { ssn.jobStarvingFns[name] = fn } +// AddPreBindFns add preBindFns function +func (ssn *Session) AddPreBindFns(name string, fn api.PreBindFn) { + ssn.preBindFns[name] = fn +} + // Reclaimable invoke reclaimable function of the plugins func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo @@ -807,3 +812,21 @@ func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.Pri } return victimsQueue } + +// PreBindFn invoke pre-bind function of the plugins +// is called before binding a task to a node. +func (ssn *Session) PreBindFn(task *api.TaskInfo, node string) error { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + pfn, found := ssn.preBindFns[plugin.Name] + if !found { + continue + } + err := pfn(task, node) + if err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index 22bd7ebb92..31d490e7ec 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -311,6 +311,11 @@ func (s *Statement) allocate(task *api.TaskInfo) error { return nil } +// Unallocate is called when the prebind function fails +func (s *Statement) Unallocate(task *api.TaskInfo) error { + return s.unallocate(task) +} + // unallocate the pod for task func (s *Statement) unallocate(task *api.TaskInfo) error { s.ssn.cache.RevertVolumes(task, task.PodVolumes)