Add metric for admitted active workloads #291

Merged (1 commit) on Jul 12, 2022
8 changes: 8 additions & 0 deletions pkg/cache/cache.go
@@ -29,6 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/util/pointer"
"sigs.k8s.io/kueue/pkg/workload"
)
@@ -216,6 +217,7 @@ func (c *ClusterQueue) addWorkload(w *kueue.Workload) error {
wi := workload.NewInfo(w)
c.Workloads[k] = wi
c.updateWorkloadUsage(wi, 1)
reportAdmittedActiveWorkloads(wi.ClusterQueue, len(c.Workloads))
return nil
}

@@ -227,6 +229,7 @@ func (c *ClusterQueue) deleteWorkload(w *kueue.Workload) {
}
c.updateWorkloadUsage(wi, -1)
delete(c.Workloads, k)
reportAdmittedActiveWorkloads(wi.ClusterQueue, len(c.Workloads))
}

func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
@@ -398,6 +401,7 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
}
c.deleteClusterQueueFromCohort(cqImpl)
delete(c.clusterQueues, cq.Name)
metrics.AdmittedActiveWorkloads.DeleteLabelValues(cq.Name)
}

func (c *Cache) AddQueue(q *kueue.Queue) error {
@@ -669,3 +673,7 @@ func workloadBelongsToQueue(wl *kueue.Workload, q *kueue.Queue) bool {
func queueKey(q *kueue.Queue) string {
return fmt.Sprintf("%s/%s", q.Namespace, q.Name)
}

func reportAdmittedActiveWorkloads(cqName string, val int) {
metrics.AdmittedActiveWorkloads.WithLabelValues(cqName).Set(float64(val))
}
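For context, a minimal, self-contained sketch of the gauge-vec lifecycle these two call sites rely on. This is not part of the PR: the gauge is a local stand-in for `metrics.AdmittedActiveWorkloads`, and the "team-a" queue name is invented.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Local stand-in for metrics.AdmittedActiveWorkloads.
	admitted := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "admitted_active_workloads",
		Help: "Number of admitted workloads that are active, per cluster_queue.",
	}, []string{"cluster_queue"})

	// addWorkload and deleteWorkload keep the per-queue series equal to
	// len(c.Workloads) by calling Set with the latest count.
	admitted.WithLabelValues("team-a").Set(3)
	admitted.WithLabelValues("team-a").Set(2) // one workload finished
	fmt.Println(testutil.ToFloat64(admitted.WithLabelValues("team-a"))) // 2

	// DeleteClusterQueue drops the whole series, so a deleted queue stops
	// being exported instead of lingering at its last value.
	admitted.DeleteLabelValues("team-a")
}
```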
9 changes: 9 additions & 0 deletions pkg/metrics/metrics.go
@@ -56,6 +56,14 @@ var (
Help: "Number of pending workloads, per cluster_queue.",
}, []string{"cluster_queue"},
)

AdmittedActiveWorkloads = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: subsystemName,
Name: "admitted_active_workloads",
Help: "Number of admitted workloads that are active (unsuspended and not finished), per cluster_queue",
}, []string{"cluster_queue"},
)
)

func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
@@ -68,5 +76,6 @@ func Register() {
admissionAttempts,
admissionAttemptLatency,
PendingWorkloads,
AdmittedActiveWorkloads,
)
}
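Assuming `subsystemName` resolves to "kueue" (consistent with the package's existing metrics such as `kueue_pending_workloads`), the scraped output for this gauge would look roughly like the following; the queue name and value are made-up examples:

```
# HELP kueue_admitted_active_workloads Number of admitted workloads that are active (unsuspended and not finished), per cluster_queue.
# TYPE kueue_admitted_active_workloads gauge
kueue_admitted_active_workloads{cluster_queue="cluster-queue"} 4
```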
9 changes: 7 additions & 2 deletions pkg/workload/workload.go
@@ -34,7 +34,8 @@ type Info struct {
Obj *kueue.Workload
// list of total resources requested by the podsets.
TotalRequests []PodSetResources
// Populated from the queue during admission or from the admission field if
// already admitted.
ClusterQueue string
}

@@ -45,10 +46,14 @@ type PodSetResources struct {
}

func NewInfo(w *kueue.Workload) *Info {
info := &Info{
Obj: w,
TotalRequests: totalRequests(&w.Spec),
}
if w.Spec.Admission != nil {
info.ClusterQueue = string(w.Spec.Admission.ClusterQueue)
Contributor:
was this a bug?

Contributor Author:
No, we were only using this field during scheduling, where the information comes from the queues rather than the Workload.

Now I'm populating the field when adding the workload to the cache, for consistency and convenience (and to avoid potential bugs in the future).
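To illustrate the new behavior, a minimal hypothetical snippet (the queue name is invented; this is not code from the PR):

```go
// An already-admitted Workload now yields an Info with ClusterQueue
// pre-populated from .spec.admission, so callers outside the scheduler
// no longer need to look it up from the queues.
wl := &kueue.Workload{
	Spec: kueue.WorkloadSpec{
		Admission: &kueue.Admission{ClusterQueue: "team-a"},
	},
}
info := workload.NewInfo(wl)
fmt.Println(info.ClusterQueue) // "team-a"
```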

}
return info
}

func (i *Info) Update(wl *kueue.Workload) {
139 changes: 92 additions & 47 deletions pkg/workload/workload_test.go
@@ -133,68 +133,113 @@ func TestPodRequests(t *testing.T) {
}

func TestNewInfo(t *testing.T) {
	cases := map[string]struct {
		workload kueue.Workload
		wantInfo Info
	}{
		"pending": {
			workload: kueue.Workload{
				Spec: kueue.WorkloadSpec{
					PodSets: []kueue.PodSet{
						{
							Name: "driver",
							Spec: corev1.PodSpec{
								Containers: containersForRequests(
									map[corev1.ResourceName]string{
										corev1.ResourceCPU:    "10m",
										corev1.ResourceMemory: "512Ki",
									}),
							},
							Count: 1,
						},
					},
				},
			},
			wantInfo: Info{
				TotalRequests: []PodSetResources{
					{
						Name: "driver",
						Requests: Requests{
							corev1.ResourceCPU:    10,
							corev1.ResourceMemory: 512 * 1024,
						},
					},
				},
			},
		},
		"admitted": {
			workload: kueue.Workload{
				Spec: kueue.WorkloadSpec{
					PodSets: []kueue.PodSet{
						{
							Name: "driver",
							Spec: corev1.PodSpec{
								Containers: containersForRequests(
									map[corev1.ResourceName]string{
										corev1.ResourceCPU:    "10m",
										corev1.ResourceMemory: "512Ki",
									}),
							},
							Count: 1,
						},
						{
							Name: "workers",
							Spec: corev1.PodSpec{
								Containers: containersForRequests(
									map[corev1.ResourceName]string{
										corev1.ResourceCPU:    "5m",
										corev1.ResourceMemory: "1Mi",
										"ex.com/gpu":          "1",
									}),
							},
							Count: 3,
						},
					},
					Admission: &kueue.Admission{
						ClusterQueue: "foo",
						PodSetFlavors: []kueue.PodSetFlavors{
							{
								Name: "driver",
								Flavors: map[corev1.ResourceName]string{
									corev1.ResourceCPU: "on-demand",
								},
							},
						},
					},
				},
			},
			wantInfo: Info{
				ClusterQueue: "foo",
				TotalRequests: []PodSetResources{
					{
						Name: "driver",
						Requests: Requests{
							corev1.ResourceCPU:    10,
							corev1.ResourceMemory: 512 * 1024,
						},
						Flavors: map[corev1.ResourceName]string{
							corev1.ResourceCPU: "on-demand",
						},
					},
					{
						Name: "workers",
						Requests: Requests{
							corev1.ResourceCPU:    15,
							corev1.ResourceMemory: 3 * 1024 * 1024,
							"ex.com/gpu":          3,
						},
					},
				},
			},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			info := NewInfo(&tc.workload)
			if diff := cmp.Diff(info, &tc.wantInfo, cmpopts.IgnoreFields(Info{}, "Obj")); diff != "" {
				t.Errorf("NewInfo(_) = (-want,+got):\n%s", diff)
			}
		})
	}
}

@@ -119,6 +119,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
UsedResources: emptyUsedResources,
}))
framework.ExpectPendingWorkloadsMetric(clusterQueue, 5)
framework.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0)

ginkgo.By("Admitting workloads")
admissions := []*kueue.Admission{
@@ -170,6 +171,7 @@
},
}))
framework.ExpectPendingWorkloadsMetric(clusterQueue, 1)
framework.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 4)

ginkgo.By("Finishing workloads")
for _, w := range workloads {
@@ -191,6 +193,7 @@
UsedResources: emptyUsedResources,
}))
framework.ExpectPendingWorkloadsMetric(clusterQueue, 0)
framework.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0)
})
})
})
10 changes: 9 additions & 1 deletion test/integration/framework/framework.go
@@ -44,7 +44,6 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/workload"
)

type ManagerSetup func(manager.Manager, context.Context)
@@ -256,6 +255,15 @@ func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, v int) {
}, Timeout, Interval).Should(gomega.Equal(v))
}

func ExpectAdmittedActiveWorkloadsMetric(cq *kueue.ClusterQueue, v int) {
metric := metrics.AdmittedActiveWorkloads.WithLabelValues(cq.Name)
gomega.EventuallyWithOffset(1, func() int {
v, err := testutil.GetGaugeMetricValue(metric)
gomega.Expect(err).ToNot(gomega.HaveOccurred())
return int(v)
}, Timeout, Interval).Should(gomega.Equal(v))
}

func UpdateWorkloadStatus(ctx context.Context, k8sClient client.Client, wl *kueue.Workload, update func(*kueue.Workload)) {
gomega.EventuallyWithOffset(1, func() error {
var updatedWl kueue.Workload