Skip to content

Commit

Permalink
Reduce cardinality of pending_workloads metric
Browse files Browse the repository at this point in the history
  • Loading branch information
alculquicondor authored and ahg-g committed Aug 10, 2022
1 parent 3d3222b commit 8000cbc
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 66 deletions.
4 changes: 2 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ var (
prometheus.GaugeOpts{
Subsystem: subsystemName,
Name: "pending_workloads",
Help: "Number of pending workloads, per queue and cluster_queue.",
}, []string{"cluster_queue", "queue"})
Help: "Number of pending workloads, per cluster_queue.",
}, []string{"cluster_queue"})
)

func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
Expand Down
19 changes: 11 additions & 8 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -106,6 +107,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
}

queued := m.queueAllInadmissibleWorkloadsInCohort(cqImpl)
reportPendingWorkloads(cq.Name, cqImpl.Pending())
if queued || addedWorkloads {
m.Broadcast()
}
Expand Down Expand Up @@ -144,6 +146,7 @@ func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue) {
return
}
delete(m.clusterQueues, cq.Name)
metrics.PendingWorkloads.DeleteLabelValues(cq.Name)

cohort := cq.Spec.Cohort
m.deleteCohort(cohort, cq.Name)
Expand Down Expand Up @@ -177,7 +180,6 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error {
if cq != nil && cq.AddFromQueue(qImpl) {
m.Broadcast()
}
qImpl.reportPendingWorkloads()
return nil
}

Expand All @@ -189,7 +191,6 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
return errQueueDoesNotExist
}
if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) {
qImpl.resetPendingWorkloads()
oldCQ := m.clusterQueues[qImpl.ClusterQueue]
if oldCQ != nil {
oldCQ.DeleteFromQueue(qImpl)
Expand All @@ -200,7 +201,6 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
}
}
qImpl.update(q)
qImpl.reportPendingWorkloads()
return nil
}

Expand All @@ -217,7 +217,6 @@ func (m *Manager) DeleteQueue(q *kueue.Queue) {
cq.DeleteFromQueue(qImpl)
}
delete(m.queues, key)
qImpl.resetPendingWorkloads()
}

func (m *Manager) PendingWorkloads(q *kueue.Queue) (int32, error) {
Expand Down Expand Up @@ -276,12 +275,12 @@ func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool {
}
wInfo := workload.NewInfo(w)
q.AddOrUpdate(wInfo)
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
}
cq.PushOrUpdate(wInfo)
reportPendingWorkloads(q.ClusterQueue, cq.Pending())
m.Broadcast()
return true
}
Expand All @@ -307,13 +306,13 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, imme
}
info.Update(&w)
q.AddOrUpdate(info)
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
}

added := cq.RequeueIfNotPresent(info, immediate)
reportPendingWorkloads(q.ClusterQueue, cq.Pending())
if added {
m.Broadcast()
}
Expand All @@ -332,10 +331,10 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
return
}
delete(q.items, workload.Key(w))
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq != nil {
cq.Delete(w)
reportPendingWorkloads(q.ClusterQueue, cq.Pending())
}
}

Expand Down Expand Up @@ -483,12 +482,12 @@ func (m *Manager) heads() []workload.Info {
if wl == nil {
continue
}
reportPendingWorkloads(cqName, cq.Pending())
wlCopy := *wl
wlCopy.ClusterQueue = cqName
workloads = append(workloads, wlCopy)
q := m.queues[queueKeyForWorkload(wl.Obj)]
delete(q.items, workload.Key(wl.Obj))
q.reportPendingWorkloads()
}
return workloads
}
Expand Down Expand Up @@ -538,3 +537,7 @@ func SetupIndexes(indexer client.FieldIndexer) error {
}
return nil
}

// reportPendingWorkloads sets the metrics.PendingWorkloads gauge for the
// series labelled with cqName to val.
func reportPendingWorkloads(cqName string, val int32) {
	gauge := metrics.PendingWorkloads.WithLabelValues(cqName)
	gauge.Set(float64(val))
}
9 changes: 0 additions & 9 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -74,11 +73,3 @@ func (q *Queue) AddIfNotPresent(w *workload.Info) bool {
}
return false
}

// reportPendingWorkloads sets the metrics.PendingWorkloads gauge for the
// series labelled with q.ClusterQueue and q.Key to the current number of
// entries in q.items.
func (q *Queue) reportPendingWorkloads() {
	gauge := metrics.PendingWorkloads.WithLabelValues(q.ClusterQueue, q.Key)
	gauge.Set(float64(len(q.items)))
}

// resetPendingWorkloads deletes the metrics.PendingWorkloads series labelled
// with q.ClusterQueue and q.Key.
func (q *Queue) resetPendingWorkloads() {
	cqName, key := q.ClusterQueue, q.Key
	metrics.PendingWorkloads.DeleteLabelValues(cqName, key)
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
PendingWorkloads: 5,
UsedResources: emptyUsedResources,
}))
framework.ExpectPendingWorkloadsMetric(clusterQueue, 5)

ginkgo.By("Admitting workloads")
admissions := []*kueue.Admission{
Expand Down Expand Up @@ -168,6 +169,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
},
},
}))
framework.ExpectPendingWorkloadsMetric(clusterQueue, 1)

ginkgo.By("Finishing workloads")
for _, w := range workloads {
Expand All @@ -188,6 +190,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.ClusterQueueStatus{
UsedResources: emptyUsedResources,
}))
framework.ExpectPendingWorkloadsMetric(clusterQueue, 0)
})
})
})
2 changes: 0 additions & 2 deletions test/integration/controller/core/queue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ var _ = ginkgo.Describe("Queue controller", func() {
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.QueueStatus{PendingWorkloads: 3}))
framework.ExpectPendingWorkloadsMetric(queue, 3)

ginkgo.By("Admitting workloads")
for _, w := range workloads {
Expand All @@ -99,7 +98,6 @@ var _ = ginkgo.Describe("Queue controller", func() {
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
return updatedQueue.Status
}, framework.Timeout, framework.Interval).Should(testing.Equal(kueue.QueueStatus{PendingWorkloads: 0}))
framework.ExpectPendingWorkloadsMetric(queue, 0)

ginkgo.By("Finishing workloads")
for _, w := range workloads {
Expand Down
5 changes: 2 additions & 3 deletions test/integration/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/workload"
// +kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -248,8 +247,8 @@ func ExpectWorkloadsToBeFrozen(ctx context.Context, k8sClient client.Client, cq
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are frozen")
}

func ExpectPendingWorkloadsMetric(q *kueue.Queue, v int) {
metric := metrics.PendingWorkloads.WithLabelValues(string(q.Spec.ClusterQueue), queue.Key(q))
func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.PendingWorkloads.WithLabelValues(cq.Name)
gomega.EventuallyWithOffset(1, func() int {
v, err := testutil.GetGaugeMetricValue(metric)
gomega.Expect(err).ToNot(gomega.HaveOccurred())
Expand Down
Loading

0 comments on commit 8000cbc

Please sign in to comment.