Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add prebind for scheduler #3621

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
}
}

// before allocating to a node, run prebind fn
if err := ssn.PreBindFn(task, bestNode.Name); err != nil {
klog.V(3).Infof("PreBind for task %s/%s failed for: %v", task.Namespace, task.Name, err)
// If prebind failed, we need to unallocate the task
err = stmt.Unallocate(task)
if err != nil {
klog.V(3).Infof("Unallocate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}
continue
}

// Allocate idle resource to the task.
if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, bestNode.Name)
Expand Down
12 changes: 12 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (backfill *Action) Initialize() {}
func (backfill *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Backfill ...")
defer klog.V(5).Infof("Leaving Backfill ...")
stmt := framework.NewStatement(ssn)

predicateFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
var statusSets api.StatusSets
Expand Down Expand Up @@ -91,6 +92,17 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}
}

// before allocating to a node, run prebind fn
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will preBind be used for? And why not put it in ssn.Allocate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue detail: #3618

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs @Monokaix to help explain.
I think it is a method to expand to adapt to k8s plugins.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because k8s is always changing, I'm not sure if this incremental adaptation is a good design approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As described in the issue, without preBind the plugin's logic is scattered across the framework. For the volumeBinding plugin, both the framework and the plugin hold a cache, which leads to an inconsistent cache and a memory leak; that is also unacceptable.
If you guys have better thoughts, any suggestions are welcome :)

if err := ssn.PreBindFn(task, node.Name); err != nil {
klog.V(3).Infof("PreBind for task %s/%s failed for: %v", task.Namespace, task.Name, err)
// If prebind failed, we need to unallocate the task
err = stmt.Unallocate(task)
if err != nil {
klog.V(3).Infof("Unallocate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}
continue
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,6 @@ type VictimTasksFn func([]*TaskInfo) []*TaskInfo

// AllocatableFn is the func declaration used to check whether the task can be allocated
type AllocatableFn func(*QueueInfo, *TaskInfo) bool

// PreBindFn is the func declaration used to do pre-bind operations.
// It receives the task and the name of a node, and returns a non-nil error
// when the pre-bind operation fails.
type PreBindFn func(task *TaskInfo, nodeName string) error
1 change: 1 addition & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Session struct {
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
preBindFns map[string]api.PreBindFn
}

func openSession(cache cache.Cache) *Session {
Expand Down
23 changes: 23 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) {
ssn.jobStarvingFns[name] = fn
}

// AddPreBindFns registers fn as the pre-bind function of the plugin called
// name, replacing any function previously registered under that name.
//
// The preBindFns map is created on first use: assigning to an entry of a nil
// map panics in Go, so registration must not rely on the map having been
// allocated when the session was constructed.
func (ssn *Session) AddPreBindFns(name string, fn api.PreBindFn) {
	if ssn.preBindFns == nil {
		ssn.preBindFns = make(map[string]api.PreBindFn)
	}
	ssn.preBindFns[name] = fn
}

// Reclaimable invoke reclaimable function of the plugins
func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
Expand Down Expand Up @@ -807,3 +812,21 @@ func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.Pri
}
return victimsQueue
}

// PreBindFn runs the registered pre-bind functions for task against the node
// named node. Plugins are visited in the order of ssn.Tiers and of each tier's
// Plugins slice; a plugin with no registered pre-bind function is skipped.
// The first non-nil error stops the walk and is returned as-is; nil is
// returned when every invoked function succeeds.
func (ssn *Session) PreBindFn(task *api.TaskInfo, node string) error {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			if preBind, ok := ssn.preBindFns[plugin.Name]; ok {
				if err := preBind(task, node); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
5 changes: 5 additions & 0 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ func (s *Statement) allocate(task *api.TaskInfo) error {
return nil
}

// Unallocate is the exported entry point to unallocate: it passes task to
// s.unallocate and returns that call's error unchanged. It is meant to be
// called when the prebind function fails.
func (s *Statement) Unallocate(task *api.TaskInfo) error {
return s.unallocate(task)
}

// unallocate the pod for task
func (s *Statement) unallocate(task *api.TaskInfo) error {
s.ssn.cache.RevertVolumes(task, task.PodVolumes)
Expand Down
Loading