Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Apr 2, 2024
1 parent dc25ca6 commit a829ccf
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ test: gotestsum ## Run tests.
.PHONY: test-integration
test-integration: gomod-download envtest ginkgo mpi-operator-crd ray-operator-crd jobset-operator-crd kf-training-operator-crd cluster-autoscaler-crd ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
$(GINKGO) $(GINKGO_ARGS) --until-it-fails -procs=$(INTEGRATION_NPROCS) --junit-report=junit.xml --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET)
$(GINKGO) $(GINKGO_ARGS) -procs=$(INTEGRATION_NPROCS) --junit-report=junit.xml --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET)

CREATE_KIND_CLUSTER ?= true
.PHONY: test-e2e
Expand Down
28 changes: 25 additions & 3 deletions pkg/queue/cluster_queue_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type clusterQueueBase struct {
// of inadmissible workloads while a workload is being scheduled.
popCycle int64

// keeps the ptr to the element that is currently being scheduled. It is still
// counted as pending.
inScheduling *workload.Info

// queueInadmissibleCycle stores the popId at the time when
// QueueInadmissibleWorkloads is called.
queueInadmissibleCycle int64
Expand Down Expand Up @@ -150,9 +154,12 @@ func (c *clusterQueueBase) backoffWaitingTimeExpired(wInfo *workload.Info) bool
}

func (c *clusterQueueBase) Delete(w *kueue.Workload) {
c.rwm.Lock()
defer c.rwm.Unlock()
key := workload.Key(w)
delete(c.inadmissibleWorkloads, key)
c.heap.Delete(key)
c.cleanInScheduling(key)
}

func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) {
Expand All @@ -178,6 +185,7 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b
c.rwm.Lock()
defer c.rwm.Unlock()
key := workload.Key(wInfo.Obj)
c.cleanInScheduling(key)
if c.backoffWaitingTimeExpired(wInfo) &&
(immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors()) {
// If the workload was inadmissible, move it back into the queue.
Expand All @@ -202,6 +210,12 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b
return true
}

func (c *clusterQueueBase) cleanInScheduling(key string) {
if c.inScheduling != nil && workload.Key(c.inScheduling.Obj) == key {
c.inScheduling = nil
}
}

// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
// If at least one workload is moved, returns true, otherwise returns false.
func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool {
Expand Down Expand Up @@ -235,7 +249,11 @@ func (c *clusterQueueBase) Pending() int {
}

func (c *clusterQueueBase) PendingActive() int {
return c.heap.Len()
result := c.heap.Len()
if c.inScheduling != nil {
result += 1
}
return result
}

func (c *clusterQueueBase) PendingInadmissible() int {
Expand All @@ -247,10 +265,11 @@ func (c *clusterQueueBase) Pop() *workload.Info {
defer c.rwm.Unlock()
c.popCycle++
if c.heap.Len() == 0 {
c.inScheduling = nil
return nil
}

return c.heap.Pop()
c.inScheduling = c.heap.Pop()
return c.inScheduling
}

func (c *clusterQueueBase) Dump() ([]string, bool) {
Expand Down Expand Up @@ -302,6 +321,9 @@ func (c *clusterQueueBase) totalElements() []*workload.Info {
for _, e := range c.inadmissibleWorkloads {
elements = append(elements, e)
}
if c.inScheduling != nil {
elements = append(elements, c.inScheduling)
}
return elements
}

Expand Down
9 changes: 7 additions & 2 deletions test/integration/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
})
})

ginkgo.FWhen("Queueing when mutating queueingStrategy", func() {
ginkgo.FWhen("Queueing with StrictFIFO", func() {
var (
cq *kueue.ClusterQueue
localQueue *kueue.LocalQueue
Expand All @@ -1748,7 +1748,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
})

ginkgo.It("Should schedule workloads by their priority strictly", func() {
ginkgo.It("Should report pending workloads properly when blocked", func() {
var wl1, wl2, wl3 *kueue.Workload
ginkgo.By("Creating workloads", func() {
wl1 = testing.MakeWorkload("wl1", ns.Name).Queue(localQueue.
Expand Down Expand Up @@ -1781,6 +1781,11 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1)
util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
util.ExpectPendingWorkloadsMetric(cq, 2, 0)
gomega.Eventually(func() int32 {
var clusterQueue kueue.ClusterQueue
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &clusterQueue)).To(gomega.Succeed())
return clusterQueue.Status.PendingWorkloads
}, util.Timeout, util.Interval).Should(gomega.Equal(int32(2)))
})
})
})
Expand Down

0 comments on commit a829ccf

Please sign in to comment.