From bad5983b3b8094b292d556cf93ee4769d53dd158 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Tue, 2 Apr 2024 10:58:34 +0200
Subject: [PATCH] Test StrictFIFO metrics misreporting

---
 pkg/queue/cluster_queue_impl.go              | 25 ++++++-
 pkg/queue/cluster_queue_impl_test.go         |  2 +-
 test/integration/scheduler/scheduler_test.go | 71 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go
index dc3662cb6b9..2d7ffe45da7 100644
--- a/pkg/queue/cluster_queue_impl.go
+++ b/pkg/queue/cluster_queue_impl.go
@@ -51,6 +51,8 @@ type clusterQueueBase struct {
 	// of inadmissible workloads while a workload is being scheduled.
 	popCycle int64
 
+	inScheduling *workload.Info
+
 	// queueInadmissibleCycle stores the popId at the time when
 	// QueueInadmissibleWorkloads is called.
 	queueInadmissibleCycle int64
@@ -113,6 +115,7 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
 	c.rwm.Lock()
 	defer c.rwm.Unlock()
 	key := workload.Key(wInfo.Obj)
+	c.cleanInScheduling(key)
 	oldInfo := c.inadmissibleWorkloads[key]
 	if oldInfo != nil {
 		// update in place if the workload was inadmissible and didn't change
@@ -153,6 +156,7 @@ func (c *clusterQueueBase) Delete(w *kueue.Workload) {
 	key := workload.Key(w)
 	delete(c.inadmissibleWorkloads, key)
 	c.heap.Delete(key)
+	c.cleanInScheduling(key)
 }
 
 func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) {
@@ -178,6 +182,7 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b
 	c.rwm.Lock()
 	defer c.rwm.Unlock()
 	key := workload.Key(wInfo.Obj)
+	c.cleanInScheduling(key)
 	if c.backoffWaitingTimeExpired(wInfo) &&
 		(immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors()) {
 		// If the workload was inadmissible, move it back into the queue.
@@ -202,6 +207,12 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b
 	return true
 }
 
+func (c *clusterQueueBase) cleanInScheduling(key string) {
+	if c.inScheduling != nil && workload.Key(c.inScheduling.Obj) == key {
+		c.inScheduling = nil
+	}
+}
+
 // QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
 // If at least one workload is moved, returns true, otherwise returns false.
 func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool {
@@ -235,7 +246,11 @@ func (c *clusterQueueBase) Pending() int {
 }
 
 func (c *clusterQueueBase) PendingActive() int {
-	return c.heap.Len()
+	result := c.heap.Len()
+	if c.inScheduling != nil {
+		result += 1
+	}
+	return result
 }
 
 func (c *clusterQueueBase) PendingInadmissible() int {
@@ -247,10 +262,11 @@ func (c *clusterQueueBase) Pop() *workload.Info {
 	defer c.rwm.Unlock()
 	c.popCycle++
 	if c.heap.Len() == 0 {
+		c.inScheduling = nil
 		return nil
 	}
-
-	return c.heap.Pop()
+	c.inScheduling = c.heap.Pop()
+	return c.inScheduling
 }
 
 func (c *clusterQueueBase) Dump() ([]string, bool) {
@@ -302,6 +318,9 @@ func (c *clusterQueueBase) totalElements() []*workload.Info {
 	for _, e := range c.inadmissibleWorkloads {
 		elements = append(elements, e)
 	}
+	if c.inScheduling != nil {
+		elements = append(elements, c.inScheduling)
+	}
 	return elements
 }
 
diff --git a/pkg/queue/cluster_queue_impl_test.go b/pkg/queue/cluster_queue_impl_test.go
index eb3ab20f3ec..66ef79c40be 100644
--- a/pkg/queue/cluster_queue_impl_test.go
+++ b/pkg/queue/cluster_queue_impl_test.go
@@ -101,7 +101,7 @@ func Test_PushOrUpdate(t *testing.T) {
 			updatedWl := tc.workload.Clone().ResourceVersion("1").Obj()
 			cq.PushOrUpdate(workload.NewInfo(updatedWl))
 			newWl := cq.Pop()
-			if newWl != nil && cq.Pending() != 0 {
+			if newWl != nil && cq.Pending() != 1 {
 				t.Error("failed to update a workload in ClusterQueue")
 			}
 			if diff := cmp.Diff(tc.wantWorkload, newWl, cmpOpts...); len(diff) != 0 {
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index 1c0fc7c202d..9e559f40cd7 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -1723,4 +1723,75 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.FinishWorkloads(ctx, k8sClient, wl1, wl2)
 		})
 	})
+
+	ginkgo.When("Queueing with StrictFIFO", func() {
+		var (
+			cq         *kueue.ClusterQueue
+			localQueue *kueue.LocalQueue
+		)
+
+		ginkgo.BeforeEach(func() {
+			gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed())
+			cq = testing.MakeClusterQueue("cq").
+				QueueingStrategy(kueue.StrictFIFO).
+				ResourceGroup(*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj()).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
+
+			localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
+		})
+
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
+		})
+
+		ginkgo.It("Should report pending workloads properly when blocked", func() {
+			var wl1, wl2, wl3 *kueue.Workload
+			ginkgo.By("Creating workloads", func() {
+				wl1 = testing.MakeWorkload("wl1", ns.Name).Queue(localQueue.
+					Name).Request(corev1.ResourceCPU, "2").Priority(100).Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
+				wl2 = testing.MakeWorkload("wl2", ns.Name).Queue(localQueue.
+					Name).Request(corev1.ResourceCPU, "5").Priority(10).Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Verify wl2 is pending", func() {
+				util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1)
+				util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
+				util.ExpectPendingWorkloadsMetric(cq, 1, 0)
+
+				gomega.Eventually(func() int32 {
+					var clusterQueue kueue.ClusterQueue
+					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &clusterQueue)).To(gomega.Succeed())
+					return clusterQueue.Status.PendingWorkloads
+				}, util.Timeout, util.Interval).Should(gomega.Equal(int32(1)))
+			})
+
+			ginkgo.By("Creating workload wl3", func() {
+				wl3 = testing.MakeWorkload("wl3", ns.Name).Queue(localQueue.
+					Name).Request(corev1.ResourceCPU, "1").Priority(1).Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Verify the wl2 workload blocks admission of the wl3 workload", func() {
+				util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1)
+				util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
+				util.ExpectPendingWorkloadsMetric(cq, 2, 0)
+				gomega.Eventually(func() int32 {
+					var clusterQueue kueue.ClusterQueue
+					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &clusterQueue)).To(gomega.Succeed())
+					return clusterQueue.Status.PendingWorkloads
+				}, util.Timeout, util.Interval).Should(gomega.Equal(int32(2)))
+			})
+
+			ginkgo.By("Mark all workloads as finished", func() {
+				util.FinishWorkloads(ctx, k8sClient, wl1, wl2, wl3)
+			})
+
+		})
+	})
 })