Skip to content

Commit

Permalink
Fix when PrioritySortingWithingCohort is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Aug 12, 2024
1 parent 57f8a72 commit f09db5f
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 3 deletions.
114 changes: 111 additions & 3 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2562,45 +2562,153 @@ func TestEntryOrdering(t *testing.T) {
},
},
}
inputForOrderingPreemptedWorkloads := []entry{
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-recently-preempted-in-queue",
CreationTimestamp: metav1.NewTime(now),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}, Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: kueue.InClusterQueueReason,
LastTransitionTime: metav1.NewTime(now.Add(5 * time.Second)),
},
},
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-recently-reclaimed-while-borrowing",
CreationTimestamp: metav1.NewTime(now),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}, Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: kueue.InCohortReclaimWhileBorrowingReason,
LastTransitionTime: metav1.NewTime(now.Add(6 * time.Second)),
},
},
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-more-recently-reclaimed-while-borrowing",
CreationTimestamp: metav1.NewTime(now),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}, Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadPreempted,
Status: metav1.ConditionTrue,
Reason: kueue.InCohortReclaimWhileBorrowingReason,
LastTransitionTime: metav1.NewTime(now.Add(7 * time.Second)),
},
},
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "old-mid-not-preempted-yet",
CreationTimestamp: metav1.NewTime(now.Add(time.Second)),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](1),
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "preemptor",
CreationTimestamp: metav1.NewTime(now.Add(7 * time.Second)),
}, Spec: kueue.WorkloadSpec{
Priority: ptr.To[int32](2),
}},
},
},
}
for _, tc := range []struct {
name string
input []entry
prioritySorting bool
workloadOrdering workload.Ordering
wantOrder []string
}{
{
name: "Priority sorting is enabled (default) using pods-ready Eviction timestamp (default)",
input: input,
prioritySorting: true,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp},
wantOrder: []string{"new_high_pri", "old", "recently_evicted", "new", "high_pri_borrowing", "old_borrowing", "evicted_borrowing", "new_borrowing"},
},
{
name: "Priority sorting is enabled (default) using pods-ready Creation timestamp",
input: input,
prioritySorting: true,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.CreationTimestamp},
wantOrder: []string{"new_high_pri", "recently_evicted", "old", "new", "high_pri_borrowing", "old_borrowing", "evicted_borrowing", "new_borrowing"},
},
{
name: "Priority sorting is disabled using pods-ready Eviction timestamp",
input: input,
prioritySorting: false,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp},
wantOrder: []string{"old", "recently_evicted", "new", "new_high_pri", "old_borrowing", "evicted_borrowing", "high_pri_borrowing", "new_borrowing"},
},
{
name: "Priority sorting is disabled using pods-ready Creation timestamp",
input: input,
prioritySorting: false,
workloadOrdering: workload.Ordering{PodsReadyRequeuingTimestamp: config.CreationTimestamp},
wantOrder: []string{"recently_evicted", "old", "new", "new_high_pri", "old_borrowing", "evicted_borrowing", "high_pri_borrowing", "new_borrowing"},
},
{
name: "Some workloads are preempted; Priority sorting is disabled",
input: inputForOrderingPreemptedWorkloads,
prioritySorting: false,
wantOrder: []string{
"old-mid-recently-preempted-in-queue",
"old-mid-not-preempted-yet",
"old-mid-recently-reclaimed-while-borrowing",
"preemptor",
"old-mid-more-recently-reclaimed-while-borrowing",
},
},
{
name: "Some workloads are preempted; Priority sorting is enabled",
input: inputForOrderingPreemptedWorkloads,
prioritySorting: true,
wantOrder: []string{
"preemptor",
"old-mid-recently-preempted-in-queue",
"old-mid-recently-reclaimed-while-borrowing",
"old-mid-more-recently-reclaimed-while-borrowing",
"old-mid-not-preempted-yet",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Cleanup(features.SetFeatureGateDuringTest(t, features.PrioritySortingWithinCohort, tc.prioritySorting))
sort.Sort(entryOrdering{
entries: input,
entries: tc.input,
workloadOrdering: tc.workloadOrdering},
)
order := make([]string, len(input))
for i, e := range input {
order := make([]string, len(tc.input))
for i, e := range tc.input {
order[i] = e.Obj.Name
}
if diff := cmp.Diff(tc.wantOrder, order); diff != "" {
Expand Down
9 changes: 9 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,15 @@ func (o Ordering) GetQueueOrderTimestamp(w *kueue.Workload) *metav1.Time {
return &evictedCond.LastTransitionTime
}
}
if !features.Enabled(features.PrioritySortingWithinCohort) {
if preemptedCond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadPreempted); preemptedCond != nil &&
preemptedCond.Status == metav1.ConditionTrue &&
preemptedCond.Reason == kueue.InCohortReclaimWhileBorrowingReason {
// We add an epsilon to make sure the timestamp of the preempted
// workload is strictly greater that the preemptor's
return &metav1.Time{Time: preemptedCond.LastTransitionTime.Add(time.Millisecond)}
}
}
return &w.CreationTimestamp
}

Expand Down
117 changes: 117 additions & 0 deletions test/integration/scheduler/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,4 +783,121 @@ var _ = ginkgo.Describe("Preemption", func() {
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "alpha", "4").Obj())
})
})

ginkgo.Context("When borrowWithinCohort is used and PrioritySortingWithinCohort disabled", func() {
var (
aCQ, bCQ, cCQ *kueue.ClusterQueue
aLQ, bLQ *kueue.LocalQueue
defaultFlavor *kueue.ResourceFlavor
)

ginkgo.BeforeEach(func() {
gomega.Expect(features.SetEnable(features.PrioritySortingWithinCohort, false)).To(gomega.Succeed())
defaultFlavor = testing.MakeResourceFlavor("default").Obj()
gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).To(gomega.Succeed())

aCQ = testing.MakeClusterQueue("a-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "0", "10").Obj(),
).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, aCQ)).To(gomega.Succeed())
aLQ = testing.MakeLocalQueue("a-lq", ns.Name).ClusterQueue(aCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, aLQ)).To(gomega.Succeed())

bCQ = testing.MakeClusterQueue("b-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "5", "5").Obj(),
).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
BorrowWithinCohort: &kueue.BorrowWithinCohort{
Policy: kueue.BorrowWithinCohortPolicyLowerPriority,
MaxPriorityThreshold: ptr.To(veryHighPriority),
},
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, bCQ)).To(gomega.Succeed())
bLQ = testing.MakeLocalQueue("b-lq", ns.Name).ClusterQueue(bCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, bLQ)).To(gomega.Succeed())

cCQ = testing.MakeClusterQueue("c-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "5").Obj(),
).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, cCQ)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(features.SetEnable(features.PrioritySortingWithinCohort, true)).To(gomega.Succeed())
gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectObjectToBeDeleted(ctx, k8sClient, aCQ, true)
util.ExpectObjectToBeDeleted(ctx, k8sClient, bCQ, true)
util.ExpectObjectToBeDeleted(ctx, k8sClient, cCQ, true)
})

ginkgo.It("should allow preempting workloads while borrowing", func() {
var aWl, b1Wl, b2Wl *kueue.Workload

ginkgo.By("Create a mid priority workload in aCQ and await for admission", func() {
aWl = testing.MakeWorkload("a-low", ns.Name).
Queue(aLQ.Name).
Priority(midPriority).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, aWl)).To(gomega.Succeed())
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, aWl)
})

ginkgo.By("Create a high priority workload b1 in bCQ and await for admission", func() {
b1Wl = testing.MakeWorkload("b1-high", ns.Name).
Queue(bLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, b1Wl)).To(gomega.Succeed())
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, b1Wl)
})

ginkgo.By("Create a high priority workload b2 in bCQ", func() {
b2Wl = testing.MakeWorkload("b2-high", ns.Name).
Queue(bLQ.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, b2Wl)).To(gomega.Succeed())
})

ginkgo.By("Await for preemption of the workload in aCQ and admission of b2", func() {
util.FinishEvictionForWorkloads(ctx, k8sClient, aWl)
util.ExpectWorkloadsToBePending(ctx, k8sClient, aWl)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, b2Wl)
})
})
})
})

0 comments on commit f09db5f

Please sign in to comment.