Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track number of active running workloads per queue #295

Merged
merged 2 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 100 additions & 4 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ import (
"sigs.k8s.io/kueue/pkg/workload"
)

const workloadClusterQueueKey = "spec.admission.clusterQueue"
// Field keys passed to client.MatchingFields when listing objects that
// reference a given ClusterQueue by name.
const (
// workloadClusterQueueKey selects Workloads by the ClusterQueue recorded in
// their admission.
workloadClusterQueueKey = "spec.admission.clusterQueue"
// queueClusterQueueKey selects Queues by the ClusterQueue they point to.
queueClusterQueueKey = "spec.clusterQueue"
)

// Sentinel errors used by the cache.
var (
// errQueueAlreadyExists is returned by ClusterQueue.addQueue when a counter
// is already registered under the queue's key.
errQueueAlreadyExists = errors.New("queue already exists")
errCqNotFound = errors.New("cluster queue not found")
errWorkloadNotAdmitted = errors.New("workload not admitted by a ClusterQueue")
)
Expand Down Expand Up @@ -107,6 +111,11 @@ type ClusterQueue struct {
// that can be matched against the flavors.
LabelKeys map[corev1.ResourceName]sets.String
Status ClusterQueueStatus

// The following fields are not populated in a snapshot.

// The number of admitted workloads per queue.
wlCountPerQueue map[string]int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps admittedWorkloadsPerQueue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

// FlavorLimits holds a processed ClusterQueue flavor quota.
Expand All @@ -118,8 +127,9 @@ type FlavorLimits struct {

func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) {
cqImpl := &ClusterQueue{
Name: cq.Name,
Workloads: map[string]*workload.Info{},
Name: cq.Name,
Workloads: map[string]*workload.Info{},
wlCountPerQueue: make(map[string]int),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we unify the way we allocate the two maps, no preference on my side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
if err := cqImpl.update(cq, c.resourceFlavors); err != nil {
return nil, err
Expand Down Expand Up @@ -232,6 +242,32 @@ func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
}
}
}
qKey := workload.QueueKey(wi.Obj)
if _, ok := c.wlCountPerQueue[qKey]; ok {
c.wlCountPerQueue[qKey] += int(m)
}
}

// addQueue starts tracking q in this ClusterQueue's per-queue workload
// counters. It returns errQueueAlreadyExists when a counter is already
// registered under the queue's key.
func (c *ClusterQueue) addQueue(q *kueue.Queue) error {
	key := queueKey(q)
	if _, tracked := c.wlCountPerQueue[key]; tracked {
		return errQueueAlreadyExists
	}
	// Workloads may have been recorded before the queue add event arrived,
	// so seed the counter from the workloads this ClusterQueue already holds.
	count := 0
	for _, info := range c.Workloads {
		if !workloadBelongsToQueue(info.Obj, q) {
			continue
		}
		count++
	}
	c.wlCountPerQueue[key] = count
	return nil
}

// deleteQueue drops the per-queue workload counter kept for q. Deleting a
// queue that has no counter is a no-op.
func (c *ClusterQueue) deleteQueue(q *kueue.Queue) {
	delete(c.wlCountPerQueue, queueKey(q))
}

func (c *ClusterQueue) flavorInUse(flavor string) bool {
Expand Down Expand Up @@ -301,7 +337,18 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err
c.clusterQueues[cq.Name] = cqImpl

// On controller restart, an add ClusterQueue event may come after
// add workload events, and so here we explicitly list and add existing workloads.
// add queue and add workload events, so here we explicitly list and add
// existing queues and workloads.
var queues kueue.QueueList
if err := c.client.List(ctx, &queues, client.MatchingFields{queueClusterQueueKey: cq.Name}); err != nil {
return fmt.Errorf("listing queues that match the clusterQueue: %w", err)
}
for _, q := range queues.Items {
// Checking ClusterQueue name again because the field index is not available in tests.
if string(q.Spec.ClusterQueue) == cq.Name {
cqImpl.wlCountPerQueue[queueKey(&q)] = 0
}
}
var workloads kueue.WorkloadList
if err := c.client.List(ctx, &workloads, client.MatchingFields{workloadClusterQueueKey: cq.Name}); err != nil {
return fmt.Errorf("listing workloads that match the queue: %w", err)
Expand All @@ -312,6 +359,9 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err
continue
}
c.addOrUpdateWorkload(&workloads.Items[i])
if _, ok := cqImpl.wlCountPerQueue[w.Spec.QueueName]; ok {
cqImpl.wlCountPerQueue[w.Spec.QueueName]++
}
}

return nil
Expand Down Expand Up @@ -351,6 +401,43 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
delete(c.clusterQueues, cq.Name)
}

// AddQueue registers q with the cached ClusterQueue named by
// q.Spec.ClusterQueue, holding the cache lock for the duration of the call.
// If no such ClusterQueue is cached, it does nothing and returns nil;
// otherwise it returns the result of the ClusterQueue's addQueue.
func (c *Cache) AddQueue(q *kueue.Queue) error {
	c.Lock()
	defer c.Unlock()

	if cq, found := c.clusterQueues[string(q.Spec.ClusterQueue)]; found {
		return cq.addQueue(q)
	}
	return nil
}

// DeleteQueue removes q from the cached ClusterQueue named by
// q.Spec.ClusterQueue, holding the cache lock for the duration of the call.
// It is a no-op when that ClusterQueue is not cached.
func (c *Cache) DeleteQueue(q *kueue.Queue) {
	c.Lock()
	defer c.Unlock()

	if cq, found := c.clusterQueues[string(q.Spec.ClusterQueue)]; found {
		cq.deleteQueue(q)
	}
}

// UpdateQueue moves a queue between cached ClusterQueues when its
// Spec.ClusterQueue changed from oldQ to newQ. If the reference is unchanged
// it returns nil without taking the cache lock. Otherwise, under the lock, it
// removes oldQ from its previous ClusterQueue (if cached) and adds newQ to the
// new one (if cached), returning the result of that add.
func (c *Cache) UpdateQueue(oldQ, newQ *kueue.Queue) error {
	oldCQ, newCQ := string(oldQ.Spec.ClusterQueue), string(newQ.Spec.ClusterQueue)
	if oldCQ == newCQ {
		return nil
	}

	c.Lock()
	defer c.Unlock()

	if prev, found := c.clusterQueues[oldCQ]; found {
		prev.deleteQueue(oldQ)
	}
	next, found := c.clusterQueues[newCQ]
	if !found {
		return nil
	}
	return next.addQueue(newQ)
}

func (c *Cache) AddOrUpdateWorkload(w *kueue.Workload) bool {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -574,3 +661,12 @@ func SetupIndexes(indexer client.FieldIndexer) error {
return []string{string(wl.Spec.Admission.ClusterQueue)}
})
}

// workloadBelongsToQueue reports whether wl lives in the same namespace as q
// and names q in its Spec.QueueName.
func workloadBelongsToQueue(wl *kueue.Workload, q *kueue.Queue) bool {
	if wl.Namespace != q.Namespace {
		return false
	}
	return wl.Spec.QueueName == q.Name
}

// queueKey returns the "<namespace>/<name>" string under which q is indexed.
func queueKey(q *kueue.Queue) string {
	return q.Namespace + "/" + q.Name
}
Loading