From 2285aad0736cb81ff4d430296613eb407f61f34b Mon Sep 17 00:00:00 2001 From: Constantin Macaria Date: Wed, 5 Apr 2023 11:04:15 +0300 Subject: [PATCH] Add evicted condition --- apis/kueue/v1beta1/workload_types.go | 13 ++++ pkg/controller/core/workload_controller.go | 26 +++++-- pkg/queue/cluster_queue_best_effort_fifo.go | 2 +- pkg/queue/cluster_queue_impl.go | 5 +- pkg/queue/cluster_queue_impl_test.go | 16 ++--- pkg/queue/cluster_queue_strict_fifo.go | 11 +-- pkg/queue/cluster_queue_strict_fifo_test.go | 28 ++++++++ pkg/scheduler/preemption/preemption.go | 7 ++ pkg/scheduler/scheduler.go | 4 +- pkg/scheduler/scheduler_test.go | 51 ++++++++++++- pkg/workload/workload.go | 13 ++++ pkg/workload/workload_test.go | 49 +++++++++++++ .../scheduler/podsready/scheduler_test.go | 72 +++++++++++++++++++ 13 files changed, 274 insertions(+), 23 deletions(-) diff --git a/apis/kueue/v1beta1/workload_types.go b/apis/kueue/v1beta1/workload_types.go index b24c1e1cdc2..52821f1014f 100644 --- a/apis/kueue/v1beta1/workload_types.go +++ b/apis/kueue/v1beta1/workload_types.go @@ -137,6 +137,19 @@ const ( // WorkloadPodsReady means that at least `.spec.podSets[*].count` Pods are // ready or have succeeded. WorkloadPodsReady = "PodsReady" + + // WorkloadEvicted means that the Workload was evicted by a ClusterQueue + WorkloadEvicted = "Evicted" +) + +const ( + // WorkloadEvictedByPreemption indicates that the workload was evicted + // in order to free resources for a workload with a higher priority. + WorkloadEvictedByPreemption = "Preempted" + + // WorkloadEvictedByPodsReadyTimeout indicates that the eviction took + // place due to a PodsReady timeout. + WorkloadEvictedByPodsReadyTimeout = "PodsReadyTimeout" ) // +kubebuilder:object:root=true diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 60049315ee4..adc1d5ec28f 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -127,6 +127,21 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c ctx = ctrl.LoggerInto(ctx, log) log.V(2).Info("Reconciling Workload") + if eviction := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted); eviction != nil && eviction.Status == metav1.ConditionTrue && + eviction.Reason == kueue.WorkloadEvictedByPodsReadyTimeout { + if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) { + log.V(2).Info("Cancelling admission of the workload due to exceeding the PodsReady timeout") + err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl, "Evicted", fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String())) + return ctrl.Result{}, client.IgnoreNotFound(err) + } else { + // finish up the eviction + log.V(2).Info("Reset eviction condition") + err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadEvicted, metav1.ConditionFalse, + kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()), "evict") + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } + if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) { return ctrl.Result{}, nil } @@ -159,17 +174,18 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req ctrl.Request, wl *kueue.Workload) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) countingTowardsTimeout, recheckAfter := 
r.admittedNotReadyWorkload(wl, realClock) if !countingTowardsTimeout { return ctrl.Result{}, nil } if recheckAfter > 0 { - klog.V(4).InfoS("Workload not yet ready and did not exceed its timeout", "workload", req.NamespacedName.String(), "recheckAfter", recheckAfter) + log.V(4).Info("Workload not yet ready and did not exceed its timeout", "recheckAfter", recheckAfter) return ctrl.Result{RequeueAfter: recheckAfter}, nil } else { - klog.V(2).InfoS("Cancelling admission of the workload due to exceeding the PodsReady timeout", "workload", req.NamespacedName.String()) - err := workload.UnsetAdmissionWithCondition(ctx, r.client, wl, - "Evicted", fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String())) + log.V(2).Info("Set evicted condition") + err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadEvicted, metav1.ConditionTrue, + kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()), "evict") return ctrl.Result{}, client.IgnoreNotFound(err) } } @@ -520,7 +536,7 @@ func (h *resourceUpdatesHandler) handle(ctx context.Context, obj client.Object, } } -func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, q workqueue.RateLimitingInterface, opts ...client.ListOption) { +func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ workqueue.RateLimitingInterface, opts ...client.ListOption) { log := ctrl.LoggerFrom(ctx) lst := kueue.WorkloadList{} opts = append(opts, client.MatchingFields{indexer.WorkloadAdmittedKey: string(metav1.ConditionFalse)}) diff --git a/pkg/queue/cluster_queue_best_effort_fifo.go b/pkg/queue/cluster_queue_best_effort_fifo.go index cf24c8218d0..cc73471c66e 100644 --- a/pkg/queue/cluster_queue_best_effort_fifo.go +++ b/pkg/queue/cluster_queue_best_effort_fifo.go @@ -30,7 +30,7 @@ type ClusterQueueBestEffortFIFO struct { var _ ClusterQueue = &ClusterQueueBestEffortFIFO{} func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) { - cqImpl := newClusterQueueImpl(keyFunc, byCreationTime) + cqImpl := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) cqBE := &ClusterQueueBestEffortFIFO{ clusterQueueBase: cqImpl, } diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go index e1d001bacf1..a6867e14343 100644 --- a/pkg/queue/cluster_queue_impl.go +++ b/pkg/queue/cluster_queue_impl.go @@ -21,6 +21,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -90,7 +91,9 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) { if oldInfo != nil { // update in place if the workload was inadmissible and didn't change // to potentially become admissible. 
- if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) { + if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) && + equality.Semantic.DeepEqual(apimeta.FindStatusCondition(oldInfo.Obj.Status.Conditions, kueue.WorkloadEvicted), + apimeta.FindStatusCondition(wInfo.Obj.Status.Conditions, kueue.WorkloadEvicted)) { c.inadmissibleWorkloads[key] = wInfo return } diff --git a/pkg/queue/cluster_queue_impl_test.go b/pkg/queue/cluster_queue_impl_test.go index e17d2f065c8..cae6db5198b 100644 --- a/pkg/queue/cluster_queue_impl_test.go +++ b/pkg/queue/cluster_queue_impl_test.go @@ -37,7 +37,7 @@ const ( ) func Test_PushOrUpdate(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() if cq.Pending() != 0 { t.Error("ClusterQueue should be empty") @@ -57,7 +57,7 @@ func Test_PushOrUpdate(t *testing.T) { } func Test_Pop(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) now := time.Now() wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj()) wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj()) @@ -80,7 +80,7 @@ func Test_Pop(t *testing.T) { } func Test_Delete(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj() cq.PushOrUpdate(workload.NewInfo(wl1)) @@ -101,7 +101,7 @@ func Test_Delete(t *testing.T) { } func Test_Info(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() if info := cq.Info(keyFunc(workload.NewInfo(wl))); info != nil { t.Error("workload doesn't exist") @@ -113,7 +113,7 @@ func Test_Info(t *testing.T) { } func Test_AddFromLocalQueue(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() queue := &LocalQueue{ items: map[string]*workload.Info{ @@ -131,7 +131,7 @@ func Test_AddFromLocalQueue(t *testing.T) { } func Test_DeleteFromLocalQueue(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj() qImpl := newLocalQueue(q) wl1 := utiltesting.MakeWorkload("wl1", "").Queue(q.Name).Obj() @@ -261,7 +261,7 @@ func TestClusterQueueImpl(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) err := cq.Update(utiltesting.MakeClusterQueue("cq"). 
NamespaceSelector(&metav1.LabelSelector{ @@ -315,7 +315,7 @@ func TestClusterQueueImpl(t *testing.T) { } func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) { - cq := newClusterQueueImpl(keyFunc, byCreationTime) + cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) cq.namespaceSelector = labels.Everything() wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() cl := utiltesting.NewFakeClient( diff --git a/pkg/queue/cluster_queue_strict_fifo.go b/pkg/queue/cluster_queue_strict_fifo.go index c1772375b96..15963ba6095 100644 --- a/pkg/queue/cluster_queue_strict_fifo.go +++ b/pkg/queue/cluster_queue_strict_fifo.go @@ -31,7 +31,7 @@ type ClusterQueueStrictFIFO struct { var _ ClusterQueue = &ClusterQueueStrictFIFO{} func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) { - cqImpl := newClusterQueueImpl(keyFunc, byCreationTime) + cqImpl := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime) cqStrict := &ClusterQueueStrictFIFO{ clusterQueueBase: cqImpl, } @@ -40,10 +40,10 @@ func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) { return cqStrict, err } -// byCreationTime is the function used by the clusterQueue heap algorithm to sort +// byCreationOrEvictionTime is the function used by the clusterQueue heap algorithm to sort // workloads. It sorts workloads based on their priority. // When priorities are equal, it uses workloads.creationTimestamp. -func byCreationTime(a, b interface{}) bool { +func byCreationOrEvictionTime(a, b interface{}) bool { objA := a.(*workload.Info) objB := b.(*workload.Info) p1 := utilpriority.Priority(objA.Obj) @@ -52,7 +52,10 @@ func byCreationTime(a, b interface{}) bool { if p1 != p2 { return p1 > p2 } - return objA.Obj.CreationTimestamp.Before(&objB.Obj.CreationTimestamp) + + tA := workload.GetSchedulingTimestamp(objA.Obj) + tB := workload.GetSchedulingTimestamp(objB.Obj) + return !tB.Before(tA) } // RequeueIfNotPresent requeues if the workload is not present. 
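
[Reviewer note, not part of the patch] The byCreationOrEvictionTime change above is the core of the requeueing behavior: workloads are still ordered by priority first, but ties are now broken by the timestamp returned from workload.GetSchedulingTimestamp, so a workload evicted on a PodsReady timeout re-enters the queue behind workloads created after it. Below is a minimal standalone sketch of that ordering, using plain structs in place of Kueue's workload.Info and the utilpriority helpers; the names in it are illustrative, not Kueue APIs.

package main

import (
	"fmt"
	"sort"
	"time"
)

type fakeWorkload struct {
	name     string
	priority int32
	created  time.Time
	// evictedAt is non-zero when the workload carries an Evicted=True
	// condition with reason PodsReadyTimeout.
	evictedAt time.Time
}

// queueOrderTimestamp mirrors GetSchedulingTimestamp: the eviction time,
// when present, takes precedence over the creation time.
func queueOrderTimestamp(w fakeWorkload) time.Time {
	if !w.evictedAt.IsZero() {
		return w.evictedAt
	}
	return w.created
}

// less mirrors byCreationOrEvictionTime: higher priority first, then the
// earlier creation-or-eviction timestamp first.
func less(a, b fakeWorkload) bool {
	if a.priority != b.priority {
		return a.priority > b.priority
	}
	tA, tB := queueOrderTimestamp(a), queueOrderTimestamp(b)
	return !tB.Before(tA)
}

func main() {
	now := time.Now()
	wls := []fakeWorkload{
		{name: "old-but-evicted", created: now, evictedAt: now.Add(3 * time.Second)},
		{name: "newer", created: now.Add(time.Second)},
		{name: "high-priority", priority: 100, created: now.Add(2 * time.Second)},
	}
	sort.Slice(wls, func(i, j int) bool { return less(wls[i], wls[j]) })
	for _, w := range wls {
		fmt.Println(w.name)
	}
	// Output order: high-priority, newer, old-but-evicted; the evicted
	// workload is requeued behind a workload that was created after it.
}
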
diff --git a/pkg/queue/cluster_queue_strict_fifo_test.go b/pkg/queue/cluster_queue_strict_fifo_test.go index 504ba3c555e..d495d5dc62d 100644 --- a/pkg/queue/cluster_queue_strict_fifo_test.go +++ b/pkg/queue/cluster_queue_strict_fifo_test.go @@ -100,6 +100,7 @@ func TestFIFOClusterQueue(t *testing.T) { func TestStrictFIFO(t *testing.T) { t1 := time.Now() t2 := t1.Add(time.Second) + t3 := t2.Add(time.Second) for _, tt := range []struct { name string w1 *kueue.Workload @@ -146,6 +147,33 @@ func TestStrictFIFO(t *testing.T) { }, expected: "w1", }, + { + name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time but w1 was evicted", + w1: &kueue.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, + Status: kueue.WorkloadStatus{ + Conditions: []metav1.Condition{ + { + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(t3), + Reason: kueue.WorkloadEvictedByPodsReadyTimeout, + Message: "by test", + }, + }, + }, + }, + w2: &kueue.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, + }, + expected: "w2", + }, { name: "p1.priority is lower than p2.priority and w1.create time is earlier than w2.create time", w1: &kueue.Workload{ diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index fa3895e7faf..02e492c16cc 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -141,6 +141,13 @@ func (p *Preemptor) issuePreemptions(ctx context.Context, targets []*workload.In errCh.SendErrorWithCancel(err, cancel) return } + + err = workload.UpdateStatus(ctx, p.client, target.Obj, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "Preempted to accommodate a higher priority Workload", "evict") + if err != nil { + errCh.SendErrorWithCancel(err, cancel) + return + } + origin := "ClusterQueue" if cq.Name != target.ClusterQueue { origin = "cohort" diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 983fcaf9034..93a45f4e2e8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -337,7 +337,9 @@ func (e entryOrdering) Less(i, j int) bool { return !aBorrows } // 2. FIFO. 
- return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp) + aComparisonTimestamp := workload.GetSchedulingTimestamp(a.Obj) + bComparisonTimestamp := workload.GetSchedulingTimestamp(b.Obj) + return aComparisonTimestamp.Before(bComparisonTimestamp) } func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e entry) { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 19c915935ca..73d3a49640e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -737,7 +737,7 @@ func TestEntryOrdering(t *testing.T) { Info: workload.Info{ Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ Name: "gamma", - CreationTimestamp: metav1.NewTime(now.Add(2 * time.Second)), + CreationTimestamp: metav1.NewTime(now.Add(3 * time.Second)), }}, }, }, @@ -745,7 +745,7 @@ func TestEntryOrdering(t *testing.T) { Info: workload.Info{ Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{ Name: "delta", - CreationTimestamp: metav1.NewTime(now.Add(time.Second)), + CreationTimestamp: metav1.NewTime(now.Add(3 * time.Second)), }}, }, assignment: flavorassigner.Assignment{ @@ -754,13 +754,58 @@ func TestEntryOrdering(t *testing.T) { }, }, }, + { + Info: workload.Info{ + Obj: &kueue.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "epsilon", + CreationTimestamp: metav1.NewTime(now), + }, + Status: kueue.WorkloadStatus{ + Conditions: []metav1.Condition{ + { + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now.Add(2 * time.Second)), + Reason: kueue.WorkloadEvictedByPodsReadyTimeout, + }, + }, + }, + }, + }, + assignment: flavorassigner.Assignment{ + TotalBorrow: cache.FlavorResourceQuantities{ + "flavor": {}, + }, + }, + }, + { + Info: workload.Info{ + Obj: &kueue.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "zeta", + CreationTimestamp: metav1.NewTime(now.Add(2 * time.Second)), + }, + Status: kueue.WorkloadStatus{ + Conditions: []metav1.Condition{ + { + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(now.Add(2 * time.Second)), + Reason: kueue.WorkloadEvictedByPodsReadyTimeout, + }, + }, + }, + }, + }, + }, } sort.Sort(entryOrdering(input)) order := make([]string, len(input)) for i, e := range input { order[i] = e.Obj.Name } - wantOrder := []string{"beta", "gamma", "alpha", "delta"} + wantOrder := []string{"beta", "zeta", "gamma", "alpha", "epsilon", "delta"} if diff := cmp.Diff(wantOrder, order); diff != "" { t.Errorf("Unexpected order (-want,+got):\n%s", diff) } diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index cad03ab1c18..f7fc32f61b4 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -294,3 +294,16 @@ func AdmissionPatch(w *kueue.Workload) *kueue.Workload { wlCopy.Status.Admission = w.Status.Admission.DeepCopy() return wlCopy } + +// GetSchedulingTimestamp return the timestamp to be used by the scheduler. It could +// be the workload creation time or the last time a PodsReady timeout has occurred. 
+func GetSchedulingTimestamp(w *kueue.Workload) *metav1.Time { + evictedConditionPosition := FindConditionIndex(&w.Status, kueue.WorkloadEvicted) + if evictedConditionPosition != -1 { + evictedCondition := w.Status.Conditions[evictedConditionPosition] + if evictedCondition.Reason == kueue.WorkloadEvictedByPodsReadyTimeout { + return &w.Status.Conditions[evictedConditionPosition].LastTransitionTime + } + } + return &w.CreationTimestamp +} diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go index 939366d5936..59323eb8a10 100644 --- a/pkg/workload/workload_test.go +++ b/pkg/workload/workload_test.go @@ -19,6 +19,7 @@ package workload import ( "context" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -310,3 +311,51 @@ func containersForRequests(requests ...map[corev1.ResourceName]string) []corev1. } return containers } + +func TestSchedulingTimestamp(t *testing.T) { + creationTime := metav1.Now() + conditionTime := metav1.NewTime(time.Now().Add(time.Hour)) + cases := map[string]struct { + wl *kueue.Workload + want metav1.Time + }{ + "no condition": { + wl: utiltesting.MakeWorkload("name", "ns"). + Creation(creationTime.Time). + Obj(), + want: creationTime, + }, + "evicted by preemption": { + wl: utiltesting.MakeWorkload("name", "ns"). + Creation(creationTime.Time). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: conditionTime, + Reason: kueue.WorkloadEvictedByPreemption, + }). + Obj(), + want: creationTime, + }, + "evicted by PodsReady timeout": { + wl: utiltesting.MakeWorkload("name", "ns"). + Creation(creationTime.Time). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: conditionTime, + Reason: kueue.WorkloadEvictedByPodsReadyTimeout, + }). 
+ Obj(), + want: conditionTime, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + gotTime := GetSchedulingTimestamp(tc.wl) + if diff := cmp.Diff(*gotTime, tc.want); diff != "" { + t.Errorf("Unexpected time (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/test/integration/scheduler/podsready/scheduler_test.go b/test/integration/scheduler/podsready/scheduler_test.go index 61441804d31..0d2e916b43c 100644 --- a/test/integration/scheduler/podsready/scheduler_test.go +++ b/test/integration/scheduler/podsready/scheduler_test.go @@ -18,6 +18,7 @@ package podsready import ( "context" + "fmt" "path/filepath" "time" @@ -301,4 +302,75 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() { }) }) + var _ = ginkgo.Context("Short PodsReady timeout", func() { + + ginkgo.BeforeEach(func() { + podsReadyTimeout = 5 * time.Second + }) + + ginkgo.It("Should move the evicted workload at the end of the queue", func() { + // the workloads are created with a 5 cpu resource requirement to ensure only one can fit at a given time, + // letting them all to time out, we should see a circular buffer admission pattern + workloads := []*kueue.Workload{ + testing.MakeWorkload("prod1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj(), + testing.MakeWorkload("prod2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj(), + testing.MakeWorkload("prod3", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj(), + testing.MakeWorkload("prod4", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj(), + } + + tkr := time.NewTicker(time.Second) + for _, wl := range workloads { + ginkgo.By(fmt.Sprintf("create the '%s' workload", wl.Name), func() { + gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).Should(gomega.Succeed()) + ginkgo.GinkgoLogr.Info(wl.Name+" created", "at", wl.CreationTimestamp) + // since metav1.Time has only second resolution, wait one second to avoid + // any potential creation time collision + <-tkr.C + }) + } + + getAdmittedWorkloads := func() []int { + ret := make([]int, 0, len(workloads)) + for i, wl := range workloads { + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), wl)).Should(gomega.Succeed()) + if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) { + ret = append(ret, i) + } + } + return ret + } + + prev := -1 + gomega.Eventually(func() bool { + admitted := getAdmittedWorkloads() + if len(admitted) > 0 { + prev = admitted[0] + } + return prev != -1 + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + for i := 0; i < 2*len(workloads); i++ { + current := -1 + // wait for a change + ginkgo.By(fmt.Sprintf("checking transition %d, from %d", i, prev), func() { + gomega.Eventually(func() bool { + admitted := getAdmittedWorkloads() + if len(admitted) != 1 { + return false + } + current = admitted[0] + return current != prev + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + if prev < (len(workloads) - 1) { + gomega.Expect(current).To(gomega.Equal(prev + 1)) + } else { + gomega.Expect(current).To(gomega.Equal(0)) + } + }) + prev = current + } + + }) + }) })
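
[Reviewer note, not part of the patch] For consumers of the new API surface, the patch adds the condition type "Evicted" with the reasons "Preempted" and "PodsReadyTimeout" (constants WorkloadEvicted, WorkloadEvictedByPreemption and WorkloadEvictedByPodsReadyTimeout in workload_types.go). The following standalone sketch shows how a client or controller could inspect that condition with the same apimachinery meta helpers the patch uses; the string constants and the sample message below simply restate values from the patch and are not additional Kueue APIs.

package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Values copied from the constants added in apis/kueue/v1beta1/workload_types.go.
const (
	workloadEvicted                   = "Evicted"
	workloadEvictedByPreemption       = "Preempted"
	workloadEvictedByPodsReadyTimeout = "PodsReadyTimeout"
)

// describeEviction reports whether, and why, a Workload was evicted based on
// its status conditions.
func describeEviction(conditions []metav1.Condition) string {
	cond := apimeta.FindStatusCondition(conditions, workloadEvicted)
	if cond == nil || cond.Status != metav1.ConditionTrue {
		return "workload is not evicted"
	}
	switch cond.Reason {
	case workloadEvictedByPreemption:
		return "evicted to free resources for a higher-priority workload"
	case workloadEvictedByPodsReadyTimeout:
		return "evicted after the PodsReady timeout; it will be requeued using the eviction timestamp"
	default:
		return "evicted: " + cond.Message
	}
}

func main() {
	// Illustrative condition, shaped like the one set in reconcileNotReadyTimeout.
	conditions := []metav1.Condition{{
		Type:               workloadEvicted,
		Status:             metav1.ConditionTrue,
		Reason:             workloadEvictedByPodsReadyTimeout,
		Message:            "Exceeded the PodsReady timeout default/sample",
		LastTransitionTime: metav1.Now(),
	}}
	fmt.Println(describeEviction(conditions))
}
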