diff --git a/Makefile b/Makefile index e32c8822406..0097425d38c 100644 --- a/Makefile +++ b/Makefile @@ -179,7 +179,7 @@ test: gotestsum ## Run tests. .PHONY: test-integration test-integration: gomod-download envtest ginkgo mpi-operator-crd ray-operator-crd jobset-operator-crd kf-training-operator-crd cluster-autoscaler-crd ## Run tests. KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \ - $(GINKGO) $(GINKGO_ARGS) --until-it-fails -procs=$(INTEGRATION_NPROCS) --junit-report=junit.xml --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET) + $(GINKGO) $(GINKGO_ARGS) -procs=$(INTEGRATION_NPROCS) --junit-report=junit.xml --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET) CREATE_KIND_CLUSTER ?= true .PHONY: test-e2e diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go index dc3662cb6b9..8dacbe13dbb 100644 --- a/pkg/queue/cluster_queue_impl.go +++ b/pkg/queue/cluster_queue_impl.go @@ -51,6 +51,10 @@ type clusterQueueBase struct { // of inadmissible workloads while a workload is being scheduled. popCycle int64 + // inScheduling is the workload handed out by the latest Pop, i.e. the one being + // scheduled, until it is requeued or deleted. It is still counted as pending. + inScheduling *workload.Info + // queueInadmissibleCycle stores the popId at the time when // QueueInadmissibleWorkloads is called. 
queueInadmissibleCycle int64 @@ -150,9 +154,12 @@ func (c *clusterQueueBase) backoffWaitingTimeExpired(wInfo *workload.Info) bool } func (c *clusterQueueBase) Delete(w *kueue.Workload) { + c.rwm.Lock() + defer c.rwm.Unlock() key := workload.Key(w) delete(c.inadmissibleWorkloads, key) c.heap.Delete(key) + c.cleanInScheduling(key) } func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) { @@ -178,6 +185,7 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b c.rwm.Lock() defer c.rwm.Unlock() key := workload.Key(wInfo.Obj) + c.cleanInScheduling(key) if c.backoffWaitingTimeExpired(wInfo) && (immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors()) { // If the workload was inadmissible, move it back into the queue. @@ -202,6 +210,12 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b return true } +func (c *clusterQueueBase) cleanInScheduling(key string) { + if c.inScheduling != nil && workload.Key(c.inScheduling.Obj) == key { + c.inScheduling = nil // the workload identified by key is no longer tracked as being scheduled + } +} + // QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap. // If at least one workload is moved, returns true, otherwise returns false. 
func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool { @@ -235,7 +249,11 @@ func (c *clusterQueueBase) Pending() int { } func (c *clusterQueueBase) PendingActive() int { - return c.heap.Len() + result := c.heap.Len() + if c.inScheduling != nil { + result++ // the workload being scheduled is off the heap but still pending + } + return result } func (c *clusterQueueBase) PendingInadmissible() int { @@ -247,10 +265,11 @@ func (c *clusterQueueBase) Pop() *workload.Info { defer c.rwm.Unlock() c.popCycle++ if c.heap.Len() == 0 { + c.inScheduling = nil return nil } - - return c.heap.Pop() + c.inScheduling = c.heap.Pop() + return c.inScheduling } func (c *clusterQueueBase) Dump() ([]string, bool) { @@ -302,6 +321,9 @@ func (c *clusterQueueBase) totalElements() []*workload.Info { for _, e := range c.inadmissibleWorkloads { elements = append(elements, e) } + if c.inScheduling != nil { + elements = append(elements, c.inScheduling) + } return elements } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index b9f0d817f80..128af321933 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -1724,7 +1724,7 @@ var _ = ginkgo.Describe("Scheduler", func() { }) }) - ginkgo.FWhen("Queueing when mutating queueingStrategy", func() { + ginkgo.When("Queueing with StrictFIFO", func() { var ( cq *kueue.ClusterQueue localQueue *kueue.LocalQueue @@ -1748,7 +1748,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true) }) - ginkgo.It("Should schedule workloads by their priority strictly", func() { + ginkgo.It("Should report pending workloads properly when blocked", func() { var wl1, wl2, wl3 *kueue.Workload ginkgo.By("Creating workloads", func() { wl1 = testing.MakeWorkload("wl1", ns.Name).Queue(localQueue. 
@@ -1781,6 +1781,11 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1) util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2) util.ExpectPendingWorkloadsMetric(cq, 2, 0) + gomega.Eventually(func(g gomega.Gomega) int32 { + var clusterQueue kueue.ClusterQueue + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &clusterQueue)).To(gomega.Succeed()) + return clusterQueue.Status.PendingWorkloads + }, util.Timeout, util.Interval).Should(gomega.Equal(int32(2))) }) }) })