diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index fccedddabc..59591af83a 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -2562,45 +2562,153 @@ func TestEntryOrdering(t *testing.T) { }, }, } + inputForOrderingPreemptedWorkloads := []entry{ + { + Info: workload.Info{ + Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ + Name: "old-mid-recently-preempted-in-queue", + CreationTimestamp: metav1.NewTime(now), + }, Spec: kueue.WorkloadSpec{ + Priority: ptr.To[int32](1), + }, Status: kueue.WorkloadStatus{ + Conditions: []metav1.Condition{ + { + Type: kueue.WorkloadPreempted, + Status: metav1.ConditionTrue, + Reason: kueue.InClusterQueueReason, + LastTransitionTime: metav1.NewTime(now.Add(5 * time.Second)), + }, + }, + }}, + }, + }, + { + Info: workload.Info{ + Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ + Name: "old-mid-recently-reclaimed-while-borrowing", + CreationTimestamp: metav1.NewTime(now), + }, Spec: kueue.WorkloadSpec{ + Priority: ptr.To[int32](1), + }, Status: kueue.WorkloadStatus{ + Conditions: []metav1.Condition{ + { + Type: kueue.WorkloadPreempted, + Status: metav1.ConditionTrue, + Reason: kueue.InCohortReclaimWhileBorrowingReason, + LastTransitionTime: metav1.NewTime(now.Add(6 * time.Second)), + }, + }, + }}, + }, + }, + { + Info: workload.Info{ + Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ + Name: "old-mid-more-recently-reclaimed-while-borrowing", + CreationTimestamp: metav1.NewTime(now), + }, Spec: kueue.WorkloadSpec{ + Priority: ptr.To[int32](1), + }, Status: kueue.WorkloadStatus{ + Conditions: []metav1.Condition{ + { + Type: kueue.WorkloadPreempted, + Status: metav1.ConditionTrue, + Reason: kueue.InCohortReclaimWhileBorrowingReason, + LastTransitionTime: metav1.NewTime(now.Add(7 * time.Second)), + }, + }, + }}, + }, + }, + { + Info: workload.Info{ + Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ + Name: "old-mid-not-preempted-yet", + CreationTimestamp: 
metav1.NewTime(now.Add(time.Second)), + }, Spec: kueue.WorkloadSpec{ + Priority: ptr.To[int32](1), + }}, + }, + }, + { + Info: workload.Info{ + Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ + Name: "preemptor", + CreationTimestamp: metav1.NewTime(now.Add(7 * time.Second)), + }, Spec: kueue.WorkloadSpec{ + Priority: ptr.To[int32](2), + }}, + }, + }, + } for _, tc := range []struct { name string + input []entry prioritySorting bool workloadOrdering workload.Ordering wantOrder []string }{ { name: "Priority sorting is enabled (default) using pods-ready Eviction timestamp (default)", + input: input, prioritySorting: true, workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}, wantOrder: []string{"new_high_pri", "old", "recently_evicted", "new", "high_pri_borrowing", "old_borrowing", "evicted_borrowing", "new_borrowing"}, }, { name: "Priority sorting is enabled (default) using pods-ready Creation timestamp", + input: input, prioritySorting: true, workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.CreationTimestamp}, wantOrder: []string{"new_high_pri", "recently_evicted", "old", "new", "high_pri_borrowing", "old_borrowing", "evicted_borrowing", "new_borrowing"}, }, { name: "Priority sorting is disabled using pods-ready Eviction timestamp", + input: input, prioritySorting: false, workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}, wantOrder: []string{"old", "recently_evicted", "new", "new_high_pri", "old_borrowing", "evicted_borrowing", "high_pri_borrowing", "new_borrowing"}, }, { name: "Priority sorting is disabled using pods-ready Creation timestamp", + input: input, prioritySorting: false, workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.CreationTimestamp}, wantOrder: []string{"recently_evicted", "old", "new", "new_high_pri", "old_borrowing", "evicted_borrowing", "high_pri_borrowing", "new_borrowing"}, }, + { + name: "Some workloads are 
preempted; Priority sorting is disabled", + input: inputForOrderingPreemptedWorkloads, + prioritySorting: false, + wantOrder: []string{ + "old-mid-recently-preempted-in-queue", + "old-mid-not-preempted-yet", + "old-mid-recently-reclaimed-while-borrowing", + "preemptor", + "old-mid-more-recently-reclaimed-while-borrowing", + }, + }, + { + name: "Some workloads are preempted; Priority sorting is enabled", + input: inputForOrderingPreemptedWorkloads, + prioritySorting: true, + wantOrder: []string{ + "preemptor", + "old-mid-recently-preempted-in-queue", + "old-mid-recently-reclaimed-while-borrowing", + "old-mid-more-recently-reclaimed-while-borrowing", + "old-mid-not-preempted-yet", + }, + }, } { t.Run(tc.name, func(t *testing.T) { t.Cleanup(features.SetFeatureGateDuringTest(t, features.PrioritySortingWithinCohort, tc.prioritySorting)) sort.Sort(entryOrdering{ - entries: input, + entries: tc.input, workloadOrdering: tc.workloadOrdering}, ) - order := make([]string, len(input)) - for i, e := range input { + order := make([]string, len(tc.input)) + for i, e := range tc.input { order[i] = e.Obj.Name } if diff := cmp.Diff(tc.wantOrder, order); diff != "" { diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 5e26391b91..3b40a6589d 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -540,6 +540,15 @@ func (o Ordering) GetQueueOrderTimestamp(w *kueue.Workload) *metav1.Time { return &evictedCond.LastTransitionTime } } + if !features.Enabled(features.PrioritySortingWithinCohort) { + if preemptedCond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadPreempted); preemptedCond != nil && + preemptedCond.Status == metav1.ConditionTrue && + preemptedCond.Reason == kueue.InCohortReclaimWhileBorrowingReason { + // We add an epsilon to make sure the timestamp of the preempted + // workload is strictly greater than the preemptor's + return &metav1.Time{Time: preemptedCond.LastTransitionTime.Add(time.Millisecond)} + } + } return 
&w.CreationTimestamp } diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 865edfaf21..1a1d98d34a 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -783,4 +783,121 @@ var _ = ginkgo.Describe("Preemption", func() { testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "alpha", "4").Obj()) }) }) + + ginkgo.Context("When borrowWithinCohort is used and PrioritySortingWithinCohort disabled", func() { + var ( + aCQ, bCQ, cCQ *kueue.ClusterQueue + aLQ, bLQ *kueue.LocalQueue + defaultFlavor *kueue.ResourceFlavor + ) + + ginkgo.BeforeEach(func() { + gomega.Expect(features.SetEnable(features.PrioritySortingWithinCohort, false)).To(gomega.Succeed()) + defaultFlavor = testing.MakeResourceFlavor("default").Obj() + gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).To(gomega.Succeed()) + + aCQ = testing.MakeClusterQueue("a-cq"). + Cohort("all"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "0", "10").Obj(), + ). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.Preempt, + }). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, aCQ)).To(gomega.Succeed()) + aLQ = testing.MakeLocalQueue("a-lq", ns.Name).ClusterQueue(aCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, aLQ)).To(gomega.Succeed()) + + bCQ = testing.MakeClusterQueue("b-cq"). + Cohort("all"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "5", "5").Obj(), + ). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.Preempt, + }). 
+ Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + BorrowWithinCohort: &kueue.BorrowWithinCohort{ + Policy: kueue.BorrowWithinCohortPolicyLowerPriority, + MaxPriorityThreshold: ptr.To(veryHighPriority), + }, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, bCQ)).To(gomega.Succeed()) + bLQ = testing.MakeLocalQueue("b-lq", ns.Name).ClusterQueue(bCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, bLQ)).To(gomega.Succeed()) + + cCQ = testing.MakeClusterQueue("c-cq"). + Cohort("all"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "5").Obj(), + ). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.Preempt, + }). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, cCQ)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(features.SetEnable(features.PrioritySortingWithinCohort, true)).To(gomega.Succeed()) + gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectObjectToBeDeleted(ctx, k8sClient, aCQ, true) + util.ExpectObjectToBeDeleted(ctx, k8sClient, bCQ, true) + util.ExpectObjectToBeDeleted(ctx, k8sClient, cCQ, true) + }) + + ginkgo.It("should allow preempting workloads while borrowing", func() { + var aWl, b1Wl, b2Wl *kueue.Workload + + ginkgo.By("Create a mid priority workload in aCQ and await for admission", func() { + aWl = testing.MakeWorkload("a-low", ns.Name). + Queue(aLQ.Name). + Priority(midPriority). + Request(corev1.ResourceCPU, "4"). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, aWl)).To(gomega.Succeed()) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, aWl) + }) + + ginkgo.By("Create a high priority workload b1 in bCQ and await for admission", func() { + b1Wl = testing.MakeWorkload("b1-high", ns.Name). + Queue(bLQ.Name). + Priority(highPriority). + Request(corev1.ResourceCPU, "4"). + Obj() + gomega.Expect(k8sClient.Create(ctx, b1Wl)).To(gomega.Succeed()) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, b1Wl) + }) + + ginkgo.By("Create a high priority workload b2 in bCQ", func() { + b2Wl = testing.MakeWorkload("b2-high", ns.Name). + Queue(bLQ.Name). + Priority(highPriority). + Request(corev1.ResourceCPU, "4"). + Obj() + gomega.Expect(k8sClient.Create(ctx, b2Wl)).To(gomega.Succeed()) + }) + + ginkgo.By("Await for preemption of the workload in aCQ and admission of b2", func() { + util.FinishEvictionForWorkloads(ctx, k8sClient, aWl) + util.ExpectWorkloadsToBePending(ctx, k8sClient, aWl) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, b2Wl) + }) + }) + }) })