Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix using PrioritySortingWithinCohort=false and borrowWithinCohort #2807

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 111 additions & 3 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2562,45 +2562,153 @@ func TestEntryOrdering(t *testing.T) {
},
},
}
inputForOrderingPreemptedWorkloads := []entry{
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-recently-preempted-in-queue",
CreationTimestamp: metav1.NewTime(now),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}, Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: kueue.InClusterQueueReason,
LastTransitionTime: metav1.NewTime(now.Add(5 * time.Second)),
},
},
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-recently-reclaimed-while-borrowing",
CreationTimestamp: metav1.NewTime(now),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}, Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: kueue.InCohortReclaimWhileBorrowingReason,
LastTransitionTime: metav1.NewTime(now.Add(6 * time.Second)),
},
},
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-more-recently-reclaimed-while-borrowing",
CreationTimestamp: metav1.NewTime(now),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}, Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: kueue.InCohortReclaimWhileBorrowingReason,
LastTransitionTime: metav1.NewTime(now.Add(7 * time.Second)),
},
},
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-not-preempted-yet",
CreationTimestamp: metav1.NewTime(now.Add(time.Second)),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "preemptor",
CreationTimestamp: metav1.NewTime(now.Add(7 * time.Second)),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](2),
}},
},
},
}
for _, tc := range []struct {
name string
input []entry
prioritySorting bool
workloadOrdering workload.Ordering
wantOrder []string
}{
{
name: "Priority sorting is enabled (default) using pods-ready Eviction timestamp (default)",
input: input,
prioritySorting: true,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp},
wantOrder: []string{"new_high_pri", "old", "recently_evicted", "new", "high_pri_borrowing", "old_borrowing", "evicted_borrowing", "new_borrowing"},
},
{
name: "Priority sorting is enabled (default) using pods-ready Creation timestamp",
input: input,
prioritySorting: true,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.CreationTimestamp},
wantOrder: []string{"new_high_pri", "recently_evicted", "old", "new", "high_pri_borrowing", "old_borrowing", "evicted_borrowing", "new_borrowing"},
},
{
name: "Priority sorting is disabled using pods-ready Eviction timestamp",
input: input,
prioritySorting: false,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp},
wantOrder: []string{"old", "recently_evicted", "new", "new_high_pri", "old_borrowing", "evicted_borrowing", "high_pri_borrowing", "new_borrowing"},
},
{
name: "Priority sorting is disabled using pods-ready Creation timestamp",
input: input,
prioritySorting: false,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.CreationTimestamp},
wantOrder: []string{"recently_evicted", "old", "new", "new_high_pri", "old_borrowing", "evicted_borrowing", "high_pri_borrowing", "new_borrowing"},
},
{
name: "Some workloads are preempted; Priority sorting is disabled",
input: inputForOrderingPreemptedWorkloads,
prioritySorting: false,
wantOrder: []string{
"old-mid-recently-preempted-in-queue",
"old-mid-not-preempted-yet",
"old-mid-recently-reclaimed-while-borrowing",
"preemptor",
"old-mid-more-recently-reclaimed-while-borrowing",
},
},
{
name: "Some workloads are preempted; Priority sorting is enabled",
input: inputForOrderingPreemptedWorkloads,
prioritySorting: true,
wantOrder: []string{
"preemptor",
"old-mid-recently-preempted-in-queue",
"old-mid-recently-reclaimed-while-borrowing",
"old-mid-more-recently-reclaimed-while-borrowing",
"old-mid-not-preempted-yet",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Cleanup(features.SetFeatureGateDuringTest(t, features.PrioritySortingWithinCohort, tc.prioritySorting))
sort.Sort(entryOrdering{
entries: input,
entries: tc.input,
workloadOrdering: tc.workloadOrdering},
)
order := make([]string, len(input))
for i, e := range input {
order := make([]string, len(tc.input))
for i, e := range tc.input {
order[i] = e.Obj.Name
}
if diff := cmp.Diff(tc.wantOrder, order); diff != "" {
Expand Down
9 changes: 9 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,15 @@ func (o Ordering) GetQueueOrderTimestamp(w *kueue.Workload) *metav1.Time {
return &evictedCond.LastTransitionTime
}
}
if !features.Enabled(features.PrioritySortingWithinCohort) {
if preemptedCond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadPreempted); preemptedCond != nil &&
preemptedCond.Status == metav1.ConditionTrue &&
preemptedCond.Reason == kueue.InCohortReclaimWhileBorrowingReason {
// We add an epsilon to make sure the timestamp of the preempted
// workload is strictly greater that the preemptor's
return &metav1.Time{Time: preemptedCond.LastTransitionTime.Add(time.Millisecond)}
}
}
return &w.CreationTimestamp
}

Expand Down
117 changes: 117 additions & 0 deletions test/integration/scheduler/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,4 +783,121 @@ var _ = ginkgo.Describe("Preemption", func() {
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "alpha", "4").Obj())
})
})

ginkgo.Context("When borrowWithinCohort is used and PrioritySortingWithinCohort disabled", func() {
var (
aCQ, bCQ, cCQ *kueue.ClusterQueue
aLQ, bLQ *kueue.LocalQueue
defaultFlavor *kueue.ResourceFlavor
)

ginkgo.BeforeEach(func() {
gomega.Expect(features.SetEnable(features.PrioritySortingWithinCohort, false)).To(gomega.Succeed())
defaultFlavor = testing.MakeResourceFlavor("default").Obj()
gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).To(gomega.Succeed())

aCQ = testing.MakeClusterQueue("a-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "0", "10").Obj(),
).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, aCQ)).To(gomega.Succeed())
aLQ = testing.MakeLocalQueue("a-lq", ns.Name).ClusterQueue(aCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, aLQ)).To(gomega.Succeed())

bCQ = testing.MakeClusterQueue("b-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "5", "5").Obj(),
).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
BorrowWithinCohort: &kueue.BorrowWithinCohort{
Policy: kueue.BorrowWithinCohortPolicyLowerPriority,
MaxPriorityThreshold: ptr.To(veryHighPriority),
},
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, bCQ)).To(gomega.Succeed())
bLQ = testing.MakeLocalQueue("b-lq", ns.Name).ClusterQueue(bCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, bLQ)).To(gomega.Succeed())

cCQ = testing.MakeClusterQueue("c-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "5").Obj(),
).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, cCQ)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(features.SetEnable(features.PrioritySortingWithinCohort, true)).To(gomega.Succeed())
gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectObjectToBeDeleted(ctx, k8sClient, aCQ, true)
util.ExpectObjectToBeDeleted(ctx, k8sClient, bCQ, true)
util.ExpectObjectToBeDeleted(ctx, k8sClient, cCQ, true)
})

ginkgo.It("should allow preempting workloads while borrowing", func() {
var aWl, b1Wl, b2Wl *kueue.Workload

ginkgo.By("Create a mid priority workload in aCQ and await for admission", func() {
aWl = testing.MakeWorkload("a-low", ns.Name).
Queue(aLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, aWl)).To(gomega.Succeed())
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, aWl)
})

ginkgo.By("Create a high priority workload b1 in bCQ and await for admission", func() {
b1Wl = testing.MakeWorkload("b1-high", ns.Name).
Queue(bLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, b1Wl)).To(gomega.Succeed())
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, b1Wl)
})

ginkgo.By("Create a high priority workload b2 in bCQ", func() {
b2Wl = testing.MakeWorkload("b2-high", ns.Name).
Queue(bLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, b2Wl)).To(gomega.Succeed())
})

ginkgo.By("Await for preemption of the workload in aCQ and admission of b2", func() {
util.FinishEvictionForWorkloads(ctx, k8sClient, aWl)
util.ExpectWorkloadsToBePending(ctx, k8sClient, aWl)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, b2Wl)
})
})
})
})