Skip to content

Commit

Permalink
Track number of active running workloads per queue (kubernetes-sigs#295)
Browse files Browse the repository at this point in the history
* Track number of active running workloads per queue

Change-Id: Iac98b009808de37b70b93ccf7f1b77fcd03a21fc

* Address review comments

Change-Id: If91541b4b91defff2ac1ff1699ca6537b798e6db
  • Loading branch information
alculquicondor authored and ahg-g committed Aug 10, 2022
1 parent 8000cbc commit a3edce7
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 31 deletions.
103 changes: 99 additions & 4 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ import (
"sigs.k8s.io/kueue/pkg/workload"
)

const workloadClusterQueueKey = "spec.admission.clusterQueue"
const (
workloadClusterQueueKey = "spec.admission.clusterQueue"
queueClusterQueueKey = "spec.clusterQueue"
)

// Sentinel errors used by the cache.
var (
// errQueueAlreadyExists is returned by ClusterQueue.addQueue when the queue
// already has an entry in the per-queue admitted workload counter.
errQueueAlreadyExists = errors.New("queue already exists")
// errCqNotFound signals that a ClusterQueue could not be found.
errCqNotFound = errors.New("cluster queue not found")
// errWorkloadNotAdmitted signals that a workload has not been admitted by a
// ClusterQueue.
errWorkloadNotAdmitted = errors.New("workload not admitted by a ClusterQueue")
)
Expand Down Expand Up @@ -107,6 +111,10 @@ type ClusterQueue struct {
// that can be matched against the flavors.
LabelKeys map[corev1.ResourceName]sets.String
Status ClusterQueueStatus

// The following fields are not populated in a snapshot.

admittedWorkloadsPerQueue map[string]int
}

// FlavorLimits holds a processed ClusterQueue flavor quota.
Expand All @@ -118,8 +126,9 @@ type FlavorLimits struct {

func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) {
cqImpl := &ClusterQueue{
Name: cq.Name,
Workloads: map[string]*workload.Info{},
Name: cq.Name,
Workloads: make(map[string]*workload.Info),
admittedWorkloadsPerQueue: make(map[string]int),
}
if err := cqImpl.update(cq, c.resourceFlavors); err != nil {
return nil, err
Expand Down Expand Up @@ -232,6 +241,32 @@ func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
}
}
}
qKey := workload.QueueKey(wi.Obj)
if _, ok := c.admittedWorkloadsPerQueue[qKey]; ok {
c.admittedWorkloadsPerQueue[qKey] += int(m)
}
}

// addQueue starts tracking q in this ClusterQueue's per-queue counter of
// admitted workloads. It returns errQueueAlreadyExists when q already has an
// entry in the counter.
func (c *ClusterQueue) addQueue(q *kueue.Queue) error {
	key := queueKey(q)
	if _, tracked := c.admittedWorkloadsPerQueue[key]; tracked {
		return errQueueAlreadyExists
	}
	// Workloads may have reached the cache before the queue add event did, so
	// the counter is seeded from the workloads this ClusterQueue already holds.
	admitted := 0
	for _, info := range c.Workloads {
		if !workloadBelongsToQueue(info.Obj, q) {
			continue
		}
		admitted++
	}
	c.admittedWorkloadsPerQueue[key] = admitted
	return nil
}

// deleteQueue drops q's entry from the per-queue admitted workload counter.
// Removing a queue that has no entry is a no-op.
func (c *ClusterQueue) deleteQueue(q *kueue.Queue) {
	delete(c.admittedWorkloadsPerQueue, queueKey(q))
}

func (c *ClusterQueue) flavorInUse(flavor string) bool {
Expand Down Expand Up @@ -301,7 +336,18 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err
c.clusterQueues[cq.Name] = cqImpl

// On controller restart, an add ClusterQueue event may come after
// add workload events, and so here we explicitly list and add existing workloads.
// add queue and workload, so here we explicitly list and add existing queues
// and workloads.
var queues kueue.QueueList
if err := c.client.List(ctx, &queues, client.MatchingFields{queueClusterQueueKey: cq.Name}); err != nil {
return fmt.Errorf("listing queues that match the clusterQueue: %w", err)
}
for _, q := range queues.Items {
// Checking ClusterQueue name again because the field index is not available in tests.
if string(q.Spec.ClusterQueue) == cq.Name {
cqImpl.admittedWorkloadsPerQueue[queueKey(&q)] = 0
}
}
var workloads kueue.WorkloadList
if err := c.client.List(ctx, &workloads, client.MatchingFields{workloadClusterQueueKey: cq.Name}); err != nil {
return fmt.Errorf("listing workloads that match the queue: %w", err)
Expand All @@ -312,6 +358,9 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err
continue
}
c.addOrUpdateWorkload(&workloads.Items[i])
if _, ok := cqImpl.admittedWorkloadsPerQueue[w.Spec.QueueName]; ok {
cqImpl.admittedWorkloadsPerQueue[w.Spec.QueueName]++
}
}

return nil
Expand Down Expand Up @@ -351,6 +400,43 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
delete(c.clusterQueues, cq.Name)
}

// AddQueue registers q with the cached ClusterQueue named by
// q.Spec.ClusterQueue while holding the cache lock. If that ClusterQueue is
// not in the cache, nothing is done and nil is returned; otherwise the result
// of the ClusterQueue's addQueue is returned.
func (c *Cache) AddQueue(q *kueue.Queue) error {
	c.Lock()
	defer c.Unlock()
	if target, found := c.clusterQueues[string(q.Spec.ClusterQueue)]; found {
		return target.addQueue(q)
	}
	return nil
}

// DeleteQueue stops tracking q in the cached ClusterQueue named by
// q.Spec.ClusterQueue while holding the cache lock. It does nothing when that
// ClusterQueue is not in the cache.
func (c *Cache) DeleteQueue(q *kueue.Queue) {
	c.Lock()
	defer c.Unlock()
	if target, found := c.clusterQueues[string(q.Spec.ClusterQueue)]; found {
		target.deleteQueue(q)
	}
}

// UpdateQueue moves a queue between ClusterQueues when its
// Spec.ClusterQueue reference changed. If the reference is unchanged it
// returns nil without taking the lock. Otherwise, under the cache lock, it
// removes oldQ from the previously referenced ClusterQueue (if cached) and
// adds newQ to the newly referenced one (if cached), returning the result of
// that add.
func (c *Cache) UpdateQueue(oldQ, newQ *kueue.Queue) error {
	if oldQ.Spec.ClusterQueue == newQ.Spec.ClusterQueue {
		return nil
	}

	c.Lock()
	defer c.Unlock()

	if prev, found := c.clusterQueues[string(oldQ.Spec.ClusterQueue)]; found {
		prev.deleteQueue(oldQ)
	}
	next, found := c.clusterQueues[string(newQ.Spec.ClusterQueue)]
	if !found {
		return nil
	}
	return next.addQueue(newQ)
}

func (c *Cache) AddOrUpdateWorkload(w *kueue.Workload) bool {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -574,3 +660,12 @@ func SetupIndexes(indexer client.FieldIndexer) error {
return []string{string(wl.Spec.Admission.ClusterQueue)}
})
}

// workloadBelongsToQueue reports whether wl is in the same namespace as q and
// names q in its Spec.QueueName.
func workloadBelongsToQueue(wl *kueue.Workload, q *kueue.Queue) bool {
	if wl.Namespace != q.Namespace {
		return false
	}
	return wl.Spec.QueueName == q.Name
}

// queueKey returns the key used to index q, in the form "<namespace>/<name>".
func queueKey(q *kueue.Queue) string {
	return q.Namespace + "/" + q.Name
}
Loading

0 comments on commit a3edce7

Please sign in to comment.