Test StrictFIFO misreporting metrics
mimowo committed Apr 3, 2024
1 parent af116ab commit bad5983
Showing 3 changed files with 94 additions and 4 deletions.
25 changes: 22 additions & 3 deletions pkg/queue/cluster_queue_impl.go
@@ -51,6 +51,8 @@ type clusterQueueBase struct {
    // of inadmissible workloads while a workload is being scheduled.
    popCycle int64

    inScheduling *workload.Info

    // queueInadmissibleCycle stores the popId at the time when
    // QueueInadmissibleWorkloads is called.
    queueInadmissibleCycle int64
@@ -113,6 +115,7 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
    c.rwm.Lock()
    defer c.rwm.Unlock()
    key := workload.Key(wInfo.Obj)
    c.cleanInScheduling(key)
    oldInfo := c.inadmissibleWorkloads[key]
    if oldInfo != nil {
        // update in place if the workload was inadmissible and didn't change
@@ -153,6 +156,7 @@ func (c *clusterQueueBase) Delete(w *kueue.Workload) {
    key := workload.Key(w)
    delete(c.inadmissibleWorkloads, key)
    c.heap.Delete(key)
    c.cleanInScheduling(key)
}

func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) {
@@ -178,6 +182,7 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate bool) bool {
    c.rwm.Lock()
    defer c.rwm.Unlock()
    key := workload.Key(wInfo.Obj)
    c.cleanInScheduling(key)
    if c.backoffWaitingTimeExpired(wInfo) &&
        (immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors()) {
        // If the workload was inadmissible, move it back into the queue.
@@ -202,6 +207,12 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate bool) bool {
    return true
}

func (c *clusterQueueBase) cleanInScheduling(key string) {
    if c.inScheduling != nil && workload.Key(c.inScheduling.Obj) == key {
        c.inScheduling = nil
    }
}

// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
// If at least one workload is moved, returns true, otherwise returns false.
func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool {
@@ -235,7 +246,11 @@ func (c *clusterQueueBase) Pending() int {
}

func (c *clusterQueueBase) PendingActive() int {
-   return c.heap.Len()
+   result := c.heap.Len()
+   if c.inScheduling != nil {
+       result += 1
+   }
+   return result
}

func (c *clusterQueueBase) PendingInadmissible() int {
@@ -247,10 +262,11 @@ func (c *clusterQueueBase) Pop() *workload.Info {
    defer c.rwm.Unlock()
    c.popCycle++
    if c.heap.Len() == 0 {
+       c.inScheduling = nil
        return nil
    }

-   return c.heap.Pop()
+   c.inScheduling = c.heap.Pop()
+   return c.inScheduling
}

func (c *clusterQueueBase) Dump() ([]string, bool) {
@@ -302,6 +318,9 @@ func (c *clusterQueueBase) totalElements() []*workload.Info {
    for _, e := range c.inadmissibleWorkloads {
        elements = append(elements, e)
    }
    if c.inScheduling != nil {
        elements = append(elements, c.inScheduling)
    }
    return elements
}
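
For context (not part of the commit itself), here is a minimal, self-contained sketch of the counting behavior that the inScheduling field restores, using simplified hypothetical types rather than the real Kueue ones: once the head workload is popped for a scheduling cycle, the heap alone under-reports the number of pending workloads by one until that workload is admitted, requeued, or deleted.

package main

import "fmt"

// toyQueue is a simplified stand-in for a cluster queue that pops one
// workload at a time for scheduling (hypothetical types, not the Kueue API).
type toyQueue struct {
    heap         []string // pending workloads, head first
    inScheduling string   // workload currently popped for scheduling; "" if none
}

// Pop removes the head workload and remembers it as in-scheduling.
func (q *toyQueue) Pop() string {
    if len(q.heap) == 0 {
        q.inScheduling = ""
        return ""
    }
    q.inScheduling = q.heap[0]
    q.heap = q.heap[1:]
    return q.inScheduling
}

// PendingActive mirrors the fix above: the in-scheduling workload still
// counts as pending, so the count does not dip while scheduling is in flight.
func (q *toyQueue) PendingActive() int {
    n := len(q.heap)
    if q.inScheduling != "" {
        n++
    }
    return n
}

func main() {
    q := &toyQueue{heap: []string{"wl2", "wl3"}}
    q.Pop()                        // wl2 is popped and handed to the scheduler
    fmt.Println(q.PendingActive()) // 2: wl3 in the heap plus wl2 in scheduling
}

Under StrictFIFO this matters because a blocked head workload can stay in that in-scheduling state indefinitely, which is exactly the misreporting scenario the integration test below exercises.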

2 changes: 1 addition & 1 deletion pkg/queue/cluster_queue_impl_test.go
@@ -101,7 +101,7 @@ func Test_PushOrUpdate(t *testing.T) {
            updatedWl := tc.workload.Clone().ResourceVersion("1").Obj()
            cq.PushOrUpdate(workload.NewInfo(updatedWl))
            newWl := cq.Pop()
-           if newWl != nil && cq.Pending() != 0 {
+           if newWl != nil && cq.Pending() != 1 {
                t.Error("failed to update a workload in ClusterQueue")
            }
            if diff := cmp.Diff(tc.wantWorkload, newWl, cmpOpts...); len(diff) != 0 {
71 changes: 71 additions & 0 deletions test/integration/scheduler/scheduler_test.go
@@ -1723,4 +1723,75 @@ var _ = ginkgo.Describe("Scheduler", func() {
            util.FinishWorkloads(ctx, k8sClient, wl1, wl2)
        })
    })

    ginkgo.When("Queueing with StrictFIFO", func() {
        var (
            cq         *kueue.ClusterQueue
            localQueue *kueue.LocalQueue
        )

        ginkgo.BeforeEach(func() {
            gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed())
            cq = testing.MakeClusterQueue("cq").
                QueueingStrategy(kueue.StrictFIFO).
                ResourceGroup(*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj()).
                Obj()
            gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())

            localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(cq.Name).Obj()
            gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
        })

        ginkgo.AfterEach(func() {
            gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
            util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true)
            util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
        })

        ginkgo.It("Should report pending workloads properly when blocked", func() {
            var wl1, wl2, wl3 *kueue.Workload
            ginkgo.By("Creating workloads", func() {
                wl1 = testing.MakeWorkload("wl1", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "2").Priority(100).Obj()
                gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
                wl2 = testing.MakeWorkload("wl2", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "5").Priority(10).Obj()
                gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
            })

            ginkgo.By("Verify wl2 is pending", func() {
                util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1)
                util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
                util.ExpectPendingWorkloadsMetric(cq, 1, 0)

                gomega.Eventually(func() int32 {
                    var clusterQueue kueue.ClusterQueue
                    gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &clusterQueue)).To(gomega.Succeed())
                    return clusterQueue.Status.PendingWorkloads
                }, util.Timeout, util.Interval).Should(gomega.Equal(int32(1)))
            })

            ginkgo.By("Creating workload wl3", func() {
                wl3 = testing.MakeWorkload("wl3", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "1").Priority(1).Obj()
                gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed())
            })

            ginkgo.By("Verify the wl2 workload blocks admission of the wl3 workload", func() {
                util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1)
                util.ExpectWorkloadsToBePending(ctx, k8sClient, wl2)
                util.ExpectPendingWorkloadsMetric(cq, 2, 0)
                gomega.Eventually(func() int32 {
                    var clusterQueue kueue.ClusterQueue
                    gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &clusterQueue)).To(gomega.Succeed())
                    return clusterQueue.Status.PendingWorkloads
                }, util.Timeout, util.Interval).Should(gomega.Equal(int32(2)))
            })

            ginkgo.By("Mark all workloads as finished", func() {
                util.FinishWorkloads(ctx, k8sClient, wl1, wl2, wl3)
            })
        })
    })
})
