diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index d830fada88..d50f5a173b 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -53,8 +53,8 @@ var (
 		prometheus.GaugeOpts{
 			Subsystem: subsystemName,
 			Name:      "pending_workloads",
-			Help:      "Number of pending workloads, per queue and cluster_queue.",
-		}, []string{"cluster_queue", "queue"})
+			Help:      "Number of pending workloads, per cluster_queue.",
+		}, []string{"cluster_queue"})
 )
 
 func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go
index 8d282e9cd7..a716da90ed 100644
--- a/pkg/queue/manager.go
+++ b/pkg/queue/manager.go
@@ -28,6 +28,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	"sigs.k8s.io/kueue/pkg/metrics"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -106,6 +107,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
 	}
 
 	queued := m.queueAllInadmissibleWorkloadsInCohort(cqImpl)
+	reportPendingWorkloads(cq.Name, cqImpl.Pending())
 	if queued || addedWorkloads {
 		m.Broadcast()
 	}
@@ -144,6 +146,7 @@ func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue) {
 		return
 	}
 	delete(m.clusterQueues, cq.Name)
+	metrics.PendingWorkloads.DeleteLabelValues(cq.Name)
 
 	cohort := cq.Spec.Cohort
 	m.deleteCohort(cohort, cq.Name)
@@ -177,7 +180,6 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error {
 	if cq != nil && cq.AddFromQueue(qImpl) {
 		m.Broadcast()
 	}
-	qImpl.reportPendingWorkloads()
 	return nil
 }
 
@@ -189,7 +191,6 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
 		return errQueueDoesNotExist
 	}
 	if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) {
-		qImpl.resetPendingWorkloads()
 		oldCQ := m.clusterQueues[qImpl.ClusterQueue]
 		if oldCQ != nil {
 			oldCQ.DeleteFromQueue(qImpl)
@@ -200,7 +201,6 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
 		}
 	}
 	qImpl.update(q)
-	qImpl.reportPendingWorkloads()
 	return nil
 }
 
@@ -217,7 +217,6 @@ func (m *Manager) DeleteQueue(q *kueue.Queue) {
 		cq.DeleteFromQueue(qImpl)
 	}
 	delete(m.queues, key)
-	qImpl.resetPendingWorkloads()
 }
 
 func (m *Manager) PendingWorkloads(q *kueue.Queue) (int32, error) {
@@ -276,12 +275,12 @@ func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool {
 	}
 	wInfo := workload.NewInfo(w)
 	q.AddOrUpdate(wInfo)
-	q.reportPendingWorkloads()
 	cq := m.clusterQueues[q.ClusterQueue]
 	if cq == nil {
 		return false
 	}
 	cq.PushOrUpdate(wInfo)
+	reportPendingWorkloads(q.ClusterQueue, cq.Pending())
 	m.Broadcast()
 	return true
 }
@@ -307,13 +306,13 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, imme
 	}
 	info.Update(&w)
 	q.AddOrUpdate(info)
-	q.reportPendingWorkloads()
 	cq := m.clusterQueues[q.ClusterQueue]
 	if cq == nil {
 		return false
 	}
 
 	added := cq.RequeueIfNotPresent(info, immediate)
+	reportPendingWorkloads(q.ClusterQueue, cq.Pending())
 	if added {
 		m.Broadcast()
 	}
@@ -332,10 +331,10 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
 		return
 	}
 	delete(q.items, workload.Key(w))
-	q.reportPendingWorkloads()
 	cq := m.clusterQueues[q.ClusterQueue]
 	if cq != nil {
 		cq.Delete(w)
+		reportPendingWorkloads(q.ClusterQueue, cq.Pending())
 	}
 }
 
@@ -483,12 +482,12 @@ func (m *Manager) heads() []workload.Info {
 		if wl == nil {
 			continue
 		}
+		reportPendingWorkloads(cqName, cq.Pending())
 		wlCopy := *wl
 		wlCopy.ClusterQueue = cqName
 		workloads = append(workloads, wlCopy)
 		q := m.queues[queueKeyForWorkload(wl.Obj)]
 		delete(q.items, workload.Key(wl.Obj))
-		q.reportPendingWorkloads()
 	}
 	return workloads
 }
@@ -538,3 +537,7 @@ func SetupIndexes(indexer client.FieldIndexer) error {
 	}
 	return nil
 }
+
+func reportPendingWorkloads(cqName string, val int32) {
+	metrics.PendingWorkloads.WithLabelValues(cqName).Set(float64(val))
+}
diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go
index de9acdd79e..0582cf7509 100644
--- a/pkg/queue/queue.go
+++ b/pkg/queue/queue.go
@@ -20,7 +20,6 @@ import (
 	"fmt"
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
-	"sigs.k8s.io/kueue/pkg/metrics"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -74,11 +73,3 @@ func (q *Queue) AddIfNotPresent(w *workload.Info) bool {
 	}
 	return false
 }
-
-func (q *Queue) reportPendingWorkloads() {
-	metrics.PendingWorkloads.WithLabelValues(q.ClusterQueue, q.Key).Set(float64(len(q.items)))
-}
-
-func (q *Queue) resetPendingWorkloads() {
-	metrics.PendingWorkloads.DeleteLabelValues(q.ClusterQueue, q.Key)
-}
diff --git a/test/integration/controller/core/clusterqueue_controller_test.go b/test/integration/controller/core/clusterqueue_controller_test.go
index c7d91d9ee3..fec231e6fd 100644
--- a/test/integration/controller/core/clusterqueue_controller_test.go
+++ b/test/integration/controller/core/clusterqueue_controller_test.go
@@ -118,6 +118,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 				PendingWorkloads: 5,
 				UsedResources:    emptyUsedResources,
 			}))
+			framework.ExpectPendingWorkloadsMetric(clusterQueue, 5)
 
 			ginkgo.By("Admitting workloads")
 			admissions := []*kueue.Admission{
@@ -168,6 +169,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 					},
 				},
 			}))
+			framework.ExpectPendingWorkloadsMetric(clusterQueue, 1)
 
 			ginkgo.By("Finishing workloads")
 			for _, w := range workloads {
@@ -188,6 +190,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 			}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.ClusterQueueStatus{
 				UsedResources: emptyUsedResources,
 			}))
+			framework.ExpectPendingWorkloadsMetric(clusterQueue, 0)
 		})
 	})
 })
diff --git a/test/integration/controller/core/queue_controller_test.go b/test/integration/controller/core/queue_controller_test.go
index 5d54363cd1..75ebbe75ef 100644
--- a/test/integration/controller/core/queue_controller_test.go
+++ b/test/integration/controller/core/queue_controller_test.go
@@ -82,7 +82,6 @@ var _ = ginkgo.Describe("Queue controller", func() {
 			gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
 			return updatedQueue.Status
 		}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.QueueStatus{PendingWorkloads: 3}))
-		framework.ExpectPendingWorkloadsMetric(queue, 3)
 
 		ginkgo.By("Admitting workloads")
 		for _, w := range workloads {
@@ -99,7 +98,6 @@ var _ = ginkgo.Describe("Queue controller", func() {
 			gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
 			return updatedQueue.Status
 		}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.QueueStatus{PendingWorkloads: 0}))
-		framework.ExpectPendingWorkloadsMetric(queue, 0)
 
 		ginkgo.By("Finishing workloads")
 		for _, w := range workloads {
diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go
index 8ceaad20ce..673b9046a1 100644
--- a/test/integration/framework/framework.go
+++ b/test/integration/framework/framework.go
@@ -43,7 +43,6 @@ import (
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	"sigs.k8s.io/kueue/pkg/metrics"
-	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/workload"
 	// +kubebuilder:scaffold:imports
 )
@@ -248,8 +247,8 @@ func ExpectWorkloadsToBeFrozen(ctx context.Context, k8sClient client.Client, cq
 	}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are frozen")
 }
 
-func ExpectPendingWorkloadsMetric(q *kueue.Queue, v int) {
-	metric := metrics.PendingWorkloads.WithLabelValues(string(q.Spec.ClusterQueue), queue.Key(q))
+func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, v int) {
+	metric := metrics.PendingWorkloads.WithLabelValues(cq.Name)
 	gomega.EventuallyWithOffset(1, func() int {
 		v, err := testutil.GetGaugeMetricValue(metric)
 		gomega.Expect(err).ToNot(gomega.HaveOccurred())
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index 2a4c771fbe..11a71b46ee 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -134,7 +134,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdProdJob1.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdProdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(prodQueue, 0)
+			framework.ExpectPendingWorkloadsMetric(prodClusterQ, 0)
 
 			ginkgo.By("checking a second no-fit prod job does not start")
 			prodJob2 := testing.MakeJob("prod-job2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj()
@@ -145,7 +145,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, lookupKey2, createdProdJob2)).Should(gomega.Succeed())
 				return createdProdJob2.Spec.Suspend
 			}, framework.ConsistentDuration, framework.Interval).Should(gomega.Equal(pointer.Bool(true)))
-			framework.ExpectPendingWorkloadsMetric(prodQueue, 1)
+			framework.ExpectPendingWorkloadsMetric(prodClusterQ, 1)
 
 			ginkgo.By("checking a dev job starts")
 			devJob := testing.MakeJob("dev-job", ns.Name).Queue(devQueue.Name).Request(corev1.ResourceCPU, "5").Obj()
@@ -157,7 +157,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdDevJob.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdDevJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(devQueue, 0)
+			framework.ExpectPendingWorkloadsMetric(devClusterQ, 0)
 
 			ginkgo.By("checking the second prod job starts when the first finishes")
 			createdProdJob1.Status.Conditions = append(createdProdJob1.Status.Conditions,
@@ -173,7 +173,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdProdJob2.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdProdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(prodQueue, 0)
+			framework.ExpectPendingWorkloadsMetric(prodClusterQ, 0)
 		})
 
 		ginkgo.It("Should schedule jobs according to their priorities", func() {
@@ -208,6 +208,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(createdHighPriorityWorkload.Spec.PriorityClassName).Should(gomega.Equal(highPriorityClass.Name))
 			gomega.Expect(*createdHighPriorityWorkload.Spec.Priority).Should(gomega.Equal(highPriorityClass.Value))
+			framework.ExpectPendingWorkloadsMetric(prodClusterQ, 0)
 
 			// delay creating the queue until after workloads are created.
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
@@ -226,7 +227,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob2.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(prodClusterQ, 1)
 		})
 	})
@@ -269,7 +270,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob1.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 
 			job2 := testing.MakeJob("on-demand-job2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "4").Obj()
 			gomega.Expect(k8sClient.Create(ctx, job2)).Should(gomega.Succeed())
@@ -279,7 +280,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob2)).Should(gomega.Succeed())
 				return createdJob2.Spec.Suspend
 			}, framework.ConsistentDuration, framework.Interval).Should(gomega.Equal(pointer.Bool(true)))
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			job3 := testing.MakeJob("on-demand-job3", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "1").Obj()
 			gomega.Expect(k8sClient.Create(ctx, job3)).Should(gomega.Succeed())
@@ -290,7 +291,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob3.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob3.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			ginkgo.By("deleting job1")
 			gomega.Expect(k8sClient.Delete(ctx, job1, client.PropagationPolicy(metav1.DeletePropagationBackground))).Should(gomega.Succeed())
@@ -306,22 +307,22 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob2.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 
 		ginkgo.It("Should re-enqueue by the delete event of workload belonging to the same Cohort", func() {
-			fooCq := testing.MakeClusterQueue("foo-clusterqueue").
+			fooCQ := testing.MakeClusterQueue("foo-clusterqueue").
 				Cohort(cq.Spec.Cohort).
 				Resource(testing.MakeResource(corev1.ResourceCPU).
 					Flavor(testing.MakeFlavor(onDemandFlavor.Name, "5").Obj()).
 					Obj()).
 				Obj()
-			gomega.Expect(k8sClient.Create(ctx, fooCq)).Should(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, fooCQ)).Should(gomega.Succeed())
 			defer func() {
-				gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, fooCq)).Should(gomega.Succeed())
+				gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, fooCQ)).Should(gomega.Succeed())
 			}()
-			fooQ := testing.MakeQueue("foo-queue", ns.Name).ClusterQueue(fooCq.Name).Obj()
+			fooQ := testing.MakeQueue("foo-queue", ns.Name).ClusterQueue(fooCQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, fooQ)).Should(gomega.Succeed())
 
 			job1 := testing.MakeJob("on-demand-job1", ns.Name).Queue(fooQ.Name).Request(corev1.ResourceCPU, "8").Obj()
@@ -334,7 +335,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob1.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(fooQ, 0)
+			framework.ExpectPendingWorkloadsMetric(fooCQ, 0)
 
 			job2 := testing.MakeJob("on-demand-job2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "8").Obj()
@@ -345,7 +346,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob2)).Should(gomega.Succeed())
 				return createdJob2.Spec.Suspend
 			}, framework.ConsistentDuration, framework.Interval).Should(gomega.Equal(pointer.Bool(true)))
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			job3 := testing.MakeJob("on-demand-job3", ns.Name).Queue(fooQ.Name).Request(corev1.ResourceCPU, "2").Obj()
 			gomega.Expect(k8sClient.Create(ctx, job3)).Should(gomega.Succeed())
@@ -356,7 +357,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob3.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob3.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(fooQ, 0)
+			framework.ExpectPendingWorkloadsMetric(fooCQ, 0)
 
 			ginkgo.By("deleting job1")
 			gomega.Expect(k8sClient.Delete(ctx, job1, client.PropagationPolicy(metav1.DeletePropagationBackground))).Should(gomega.Succeed())
@@ -373,7 +374,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob2.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 	})
@@ -409,7 +410,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob2)).Should(gomega.Succeed())
 				return createdJob2.Spec.Suspend
 			}, framework.ConsistentDuration, framework.Interval).Should(gomega.Equal(pointer.Bool(true)))
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			ginkgo.By("updating ClusterQueue")
 			updatedCq := &kueue.ClusterQueue{}
@@ -425,7 +426,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob2.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 	})
@@ -471,7 +472,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).Should(gomega.Succeed())
 				return createdJob.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(true)), "Job should be suspended")
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			ginkgo.By("checking the job starts after updating namespace labels to match QC selector")
 			ns.Labels = map[string]string{"dep": "eng"}
@@ -480,37 +481,38 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).Should(gomega.Succeed())
 				return createdJob.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)), "Job should be unsuspended")
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 	})
 
 	ginkgo.When("Referencing resourceFlavors in clusterQueue", func() {
 		var (
-			fooCq *kueue.ClusterQueue
+			fooCQ *kueue.ClusterQueue
 			fooQ  *kueue.Queue
 		)
 
 		ginkgo.BeforeEach(func() {
-			fooCq = testing.MakeClusterQueue("foo-cq").
+			fooCQ = testing.MakeClusterQueue("foo-cq").
 				QueueingStrategy(kueue.BestEffortFIFO).
 				Resource(testing.MakeResource(corev1.ResourceCPU).
 					Flavor(testing.MakeFlavor("foo-flavor", "15").Obj()).
 					Obj()).
 				Obj()
-			gomega.Expect(k8sClient.Create(ctx, fooCq)).Should(gomega.Succeed())
-			fooQ = testing.MakeQueue("foo-queue", ns.Name).ClusterQueue(fooCq.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, fooCQ)).Should(gomega.Succeed())
+			fooQ = testing.MakeQueue("foo-queue", ns.Name).ClusterQueue(fooCQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, fooQ)).Should(gomega.Succeed())
 		})
 
 		ginkgo.AfterEach(func() {
-			gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, fooCq)).To(gomega.Succeed())
+			gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, fooCQ)).To(gomega.Succeed())
 		})
 
 		ginkgo.It("Should be inactive until the flavor is created", func() {
 			ginkgo.By("Creating one workload")
 			wl := testing.MakeWorkload("workload", ns.Name).Queue(fooQ.Name).Request(corev1.ResourceCPU, "1").Obj()
 			gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
-			framework.ExpectWorkloadsToBeFrozen(ctx, k8sClient, fooCq.Name, wl)
+			framework.ExpectWorkloadsToBeFrozen(ctx, k8sClient, fooCQ.Name, wl)
+			framework.ExpectPendingWorkloadsMetric(fooCQ, 1)
 
 			ginkgo.By("Creating foo flavor")
 			fooFlavor := testing.MakeResourceFlavor("foo-flavor").Obj()
@@ -518,7 +520,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(framework.DeleteResourceFlavor(ctx, k8sClient, fooFlavor)).To(gomega.Succeed())
 			}()
 			gomega.Expect(k8sClient.Create(ctx, fooFlavor)).Should(gomega.Succeed())
-			framework.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, fooCq.Name, wl)
+			framework.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, fooCQ.Name, wl)
+			framework.ExpectPendingWorkloadsMetric(fooCQ, 0)
 		})
 	})
@@ -562,13 +565,13 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl1), wlCopy)).To(gomega.Succeed())
 				return wlCopy.Spec.Admission
 			}, framework.Timeout, framework.Interval).Should(testing.Equal(expectAdmission))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 
 			ginkgo.By("checking a second workload without toleration doesn't start")
 			wl2 := testing.MakeWorkload("on-demand-wl2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").Obj()
 			gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
 			framework.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			ginkgo.By("checking a third workload with toleration starts")
 			wl3 := testing.MakeWorkload("on-demand-wl3", ns.Name).Queue(queue.Name).Toleration(spotToleration).Request(corev1.ResourceCPU, "5").Obj()
@@ -579,7 +582,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl3), wlCopy)).To(gomega.Succeed())
 				return wlCopy.Spec.Admission
 			}, framework.Timeout, framework.Interval).Should(testing.Equal(expectAdmission))
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 		})
 	})
@@ -622,7 +625,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob1.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 
 			ginkgo.By("checking a second job with affinity to on-demand")
 			job2 := testing.MakeJob("affinity-job", ns.Name).Queue(queue.Name).
@@ -637,7 +640,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			}, framework.Timeout, framework.Interval).Should(gomega.BeTrue())
 			gomega.Expect(len(createdJob2.Spec.Template.Spec.NodeSelector)).Should(gomega.Equal(2))
 			gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 	})
@@ -678,7 +681,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
 				return createdJob.Spec.Suspend
 			}, framework.ConsistentDuration, framework.Interval).Should(gomega.Equal(pointer.Bool(true)))
-			framework.ExpectPendingWorkloadsMetric(queue, 1)
+			framework.ExpectPendingWorkloadsMetric(cq, 1)
 
 			ginkgo.By("checking the job starts when a fallback ClusterQueue gets added")
 			fallbackClusterQueue := testing.MakeClusterQueue("fallback-cq").
@@ -698,7 +701,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				return createdJob.Spec.Suspend
 			}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))
 			gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 
 		ginkgo.It("Should schedule workloads borrowing quota from ClusterQueues in the same Cohort", func() {
@@ -736,8 +739,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
 			gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
 			framework.ExpectWorkloadsToBePending(ctx, k8sClient, wl1, wl2)
-			framework.ExpectPendingWorkloadsMetric(prodBEQueue, 1)
-			framework.ExpectPendingWorkloadsMetric(devBEQueue, 1)
+			framework.ExpectPendingWorkloadsMetric(prodBEClusterQ, 1)
+			framework.ExpectPendingWorkloadsMetric(devBEClusterQ, 1)
 
 			// Make sure workloads are in the same scheduling cycle.
 			testBEClusterQ := testing.MakeClusterQueue("test-be-cq").
@@ -754,8 +757,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
 
 			framework.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, prodBEClusterQ.Name, wl1)
 			framework.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, devBEClusterQ.Name, wl2)
-			framework.ExpectPendingWorkloadsMetric(prodBEQueue, 0)
-			framework.ExpectPendingWorkloadsMetric(devBEQueue, 0)
+			framework.ExpectPendingWorkloadsMetric(prodBEClusterQ, 0)
+			framework.ExpectPendingWorkloadsMetric(devBEClusterQ, 0)
 		})
 	})
@@ -789,7 +792,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, bigWl)).Should(gomega.Succeed())
 
 			framework.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, bigWl)
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 
 			smallWl1 := testing.MakeWorkload("small-wl-1", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2.5").Obj()
 			smallWl2 := testing.MakeWorkload("small-wl-2", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "2.5").Obj()
@@ -798,7 +801,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			gomega.Expect(k8sClient.Create(ctx, smallWl2)).Should(gomega.Succeed())
 
 			framework.ExpectWorkloadsToBePending(ctx, k8sClient, smallWl1, smallWl2)
-			framework.ExpectPendingWorkloadsMetric(queue, 2)
+			framework.ExpectPendingWorkloadsMetric(cq, 2)
 
 			ginkgo.By("Marking the big workload as finished")
 			framework.UpdateWorkloadStatus(ctx, k8sClient, bigWl, func(wl *kueue.Workload) {
@@ -809,7 +812,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			})
 
 			framework.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, smallWl1, smallWl2)
-			framework.ExpectPendingWorkloadsMetric(queue, 0)
+			framework.ExpectPendingWorkloadsMetric(cq, 0)
 		})
 	})
@@ -856,6 +859,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(k8sClient.Get(ctx, lookupKey, wl3)).Should(gomega.Succeed())
 				return wl3.Spec.Admission == nil
 			}, framework.ConsistentDuration, framework.Interval).Should(gomega.Equal(true))
+			framework.ExpectPendingWorkloadsMetric(strictFIFOClusterQ, 2)
 		})
 	})
 })