Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce preemptions within ClusterQueue when preemption with borrowing is enabled #2110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ func candidatesOnlyFromQueue(candidates []*workload.Info, clusterQueue string) [
return result
}

func candidatesFromCQOrUnderThreshold(candidates []*workload.Info, clusterQueue string, threshold int32) []*workload.Info {
result := make([]*workload.Info, 0, len(candidates))
for _, wi := range candidates {
if wi.ClusterQueue == clusterQueue || priority.Priority(wi.Obj) < threshold {
result = append(result, wi)
}
}
return result
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
resPerFlv := resourcesRequiringPreemption(assignment)
Expand Down Expand Up @@ -115,6 +125,10 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
// have lower priority, and so they will not preempt the preemptor when
// requeued.
if borrowWithinCohort {
if !queueUnderNominalInAllRequestedResources(wlReq, cq) {
// It can only preempt workloads from another CQ if they are strictly under allowBorrowingBelowPriority.
candidates = candidatesFromCQOrUnderThreshold(candidates, wl.ClusterQueue, *thresholdPrio)
}
return minimalPreemptions(wlReq, cq, snapshot, resPerFlv, candidates, true, thresholdPrio)
}

Expand Down
76 changes: 74 additions & 2 deletions pkg/scheduler/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func TestPreemption(t *testing.T) {
Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyNever,
ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
BorrowWithinCohort: &kueue.BorrowWithinCohort{
Policy: kueue.BorrowWithinCohortPolicyLowerPriority,
MaxPriorityThreshold: ptr.To[int32](0),
Expand Down Expand Up @@ -1006,6 +1006,78 @@ func TestPreemption(t *testing.T) {
},
}),
},
"use BorrowWithinCohort; only preempt from CQ if no workloads below threshold and already above nominal": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a_standard_1", "").
Priority(1).
Request(corev1.ResourceCPU, "10").
ReserveQuota(utiltesting.MakeAdmission("a_standard").Assignment(corev1.ResourceCPU, "default", "10").Obj()).
Obj(),
*utiltesting.MakeWorkload("a_standard_2", "").
Priority(1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b_standard_1", "").
Priority(1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b_standard_2", "").
Priority(2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Priority(3).
Request(corev1.ResourceCPU, "1").
Obj(),
targetCQ: "b_standard",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/b_standard_1"),
},
"use BorrowWithinCohort; preempt from CQ and from other CQs with workloads below threshold": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b_standard_high", "").
Priority(2).
Request(corev1.ResourceCPU, "10").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "10").Obj()).
Obj(),
*utiltesting.MakeWorkload("b_standard_mid", "").
Priority(1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b_standard").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a_best_effort_low", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a_best_effort").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a_best_effort_lower", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a_best_effort").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Priority(2).
Request(corev1.ResourceCPU, "2").
Obj(),
targetCQ: "b_standard",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/b_standard_mid", "/a_best_effort_lower"),
},
"reclaim quota from lender": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("lend1-low", "").
Expand Down