Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
denkensk committed Mar 16, 2022
1 parent a55dd1c commit 89b35c3
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 135 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/queuedworkload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type QueuedWorkloadSpec struct {
// PriorityClass object with that name. If not specified, the queuedWorkload
// priority will be default or zero if there is no default.
// Priority determines the order of access to the resources managed by the
// ClusterQueue the workload is actually queued.
// ClusterQueue where the workload is queued.
// The higher the value, the higher the priority.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7308,8 +7308,8 @@ spec:
a PriorityClass object with that name. If not specified, the queuedWorkload
priority will be default or zero if there is no default. Priority
determines the order of access to the resources managed by the ClusterQueue
the workload is actually queued. The higher the value, the higher
the priority.
where the workload is queued. The higher the value, the higher the
priority.
type: string
queueName:
description: queueName is the name of the queue the QueuedWorkload
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/core/queuedworkload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (r *QueuedWorkloadReconciler) Create(e event.CreateEvent) bool {
}

if wl.Spec.Admission == nil {
if !r.queues.AddOrUpdateWorkload(wl.DeepCopy()) {
if !r.queues.AddOrUpdateWorkload(context.Background(), wl.DeepCopy()) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
return false
Expand Down Expand Up @@ -135,6 +135,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool {
}
log.V(2).Info("QueuedWorkload update event")

ctx := context.Background()
switch {
case status == finished:
if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
Expand All @@ -143,7 +144,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool {
r.queues.DeleteWorkload(oldWl)

case prevStatus == pending && status == pending:
if !r.queues.UpdateWorkload(oldWl, wl.DeepCopy()) {
if !r.queues.UpdateWorkload(ctx, oldWl, wl.DeepCopy()) {
log.V(2).Info("Queue for updated workload didn't exist; ignoring for now")
}

Expand All @@ -157,7 +158,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool {
if err := r.cache.DeleteWorkload(oldWl); err != nil {
log.Error(err, "Failed to delete workload from cache")
}
if !r.queues.AddOrUpdateWorkload(wl.DeepCopy()) {
if !r.queues.AddOrUpdateWorkload(ctx, wl.DeepCopy()) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}

Expand Down
50 changes: 40 additions & 10 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"fmt"
"sync"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
Expand All @@ -44,6 +46,7 @@ var (
type Manager struct {
sync.RWMutex
cond sync.Cond
log logr.Logger

client client.Client
clusterQueues map[string]*ClusterQueue
Expand All @@ -55,6 +58,7 @@ func NewManager(client client.Client) *Manager {
client: client,
queues: make(map[string]*Queue),
clusterQueues: make(map[string]*ClusterQueue),
log: ctrl.Log.WithName("manager"),
}
m.cond.L = &m.RWMutex
return m
Expand All @@ -67,7 +71,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
if _, ok := m.clusterQueues[cq.Name]; ok {
return errClusterQueueAlreadyExists
}
cqImpl := newClusterQueue(m.client, cq)
cqImpl := newClusterQueue(cq)
m.clusterQueues[cq.Name] = cqImpl

// Iterate through existing queues, as queues corresponding to this cluster
Expand All @@ -77,6 +81,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
return fmt.Errorf("listing queues pointing to the cluster queue: %w", err)
}
addedWorkloads := false

for _, q := range queues.Items {
// Checking clusterQueue name again because the field index is not available in tests.
if string(q.Spec.ClusterQueue) != cq.Name {
Expand Down Expand Up @@ -138,8 +143,16 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error {
if w.Spec.QueueName != q.Name || w.Spec.Admission != nil {
continue
}
qImpl.AddOrUpdate(&w)

ctx = ctrl.LoggerInto(ctx, m.log)
info := workload.NewInfo(&w)
// Populate priority from priority class before enqueue.
p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName)
info.Priority = &p

qImpl.AddOrUpdate(info)
}

cq := m.clusterQueues[qImpl.ClusterQueue]
if cq != nil && cq.AddFromQueue(qImpl) {
m.cond.Broadcast()
Expand All @@ -154,6 +167,7 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error {
if !ok {
return errQueueDoesNotExist
}

if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) {
oldCQ := m.clusterQueues[qImpl.ClusterQueue]
if oldCQ != nil {
Expand Down Expand Up @@ -203,24 +217,33 @@ func (m *Manager) Pending(cq *kueue.ClusterQueue) int32 {

// AddOrUpdateWorkload adds or updates workload to the corresponding queue.
// Returns whether the queue existed.
func (m *Manager) AddOrUpdateWorkload(w *kueue.QueuedWorkload) bool {
func (m *Manager) AddOrUpdateWorkload(ctx context.Context, w *kueue.QueuedWorkload) bool {
m.Lock()
defer m.Unlock()
return m.addOrUpdateWorkload(w)

ctx = ctrl.LoggerInto(ctx, m.log)
return m.addOrUpdateWorkload(ctx, w)
}

func (m *Manager) addOrUpdateWorkload(w *kueue.QueuedWorkload) bool {
func (m *Manager) addOrUpdateWorkload(ctx context.Context, w *kueue.QueuedWorkload) bool {
qKey := queueKeyForWorkload(w)
q := m.queues[qKey]
if q == nil {
return false
}
q.AddOrUpdate(w)

info := workload.NewInfo(w)
ctx = ctrl.LoggerInto(ctx, m.log)
// Populate priority from priority class before enqueue.
p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName)
info.Priority = &p

q.AddOrUpdate(info)
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
}
if !cq.PushOrUpdate(w) {
if !cq.PushOrUpdate(info) {
return false
}
m.cond.Broadcast()
Expand Down Expand Up @@ -276,12 +299,13 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.QueuedWorkload

// UpdateWorkload updates the workload to the corresponding queue or adds it if
// it didn't exist. Returns whether the queue existed.
func (m *Manager) UpdateWorkload(oldW, w *kueue.QueuedWorkload) bool {
func (m *Manager) UpdateWorkload(ctx context.Context, oldW, w *kueue.QueuedWorkload) bool {
m.Lock()
defer m.Unlock()

if oldW.Spec.QueueName != w.Spec.QueueName {
m.deleteWorkloadFromQueueAndClusterQueue(w, queueKeyForWorkload(oldW))
return m.addOrUpdateWorkload(w)
return m.addOrUpdateWorkload(ctx, w)
}

q := m.queues[queueKeyForWorkload(w)]
Expand All @@ -290,7 +314,13 @@ func (m *Manager) UpdateWorkload(oldW, w *kueue.QueuedWorkload) bool {
}
cq := m.clusterQueues[q.ClusterQueue]
if cq != nil {
return cq.PushOrUpdate(w)
info := workload.NewInfo(w)
ctx = ctrl.LoggerInto(ctx, m.log)
// Populate priority from priority class before enqueue.
p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName)
info.Priority = &p

return cq.PushOrUpdate(info)
}
return false
}
Expand Down
23 changes: 12 additions & 11 deletions pkg/queue/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestUpdateQueue(t *testing.T) {
}
}
for _, w := range workloads {
manager.AddOrUpdateWorkload(w)
manager.AddOrUpdateWorkload(ctx, w)
}

// Update cluster queue of first queue.
Expand All @@ -168,21 +168,22 @@ func TestUpdateQueue(t *testing.T) {
}

func TestAddWorkload(t *testing.T) {
ctx := context.Background()
scheme := runtime.NewScheme()
if err := kueue.AddToScheme(scheme); err != nil {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
manager := NewManager(fake.NewClientBuilder().WithScheme(scheme).Build())
cq := utiltesting.MakeClusterQueue("cq").Obj()
if err := manager.AddClusterQueue(context.Background(), cq); err != nil {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
}
queues := []*kueue.Queue{
utiltesting.MakeQueue("foo", "earth").ClusterQueue("cq").Obj(),
utiltesting.MakeQueue("bar", "mars").Obj(),
}
for _, q := range queues {
if err := manager.AddQueue(context.Background(), q); err != nil {
if err := manager.AddQueue(ctx, q); err != nil {
t.Fatalf("Failed adding queue %s: %v", q.Name, err)
}
}
Expand Down Expand Up @@ -230,7 +231,7 @@ func TestAddWorkload(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.workload.Name, func(t *testing.T) {
if added := manager.AddOrUpdateWorkload(tc.workload); added != tc.wantAdded {
if added := manager.AddOrUpdateWorkload(ctx, tc.workload); added != tc.wantAdded {
t.Errorf("AddWorkload returned %t, want %t", added, tc.wantAdded)
}
})
Expand Down Expand Up @@ -298,7 +299,7 @@ func TestStatus(t *testing.T) {
}
for _, wl := range workloads {
wl := wl
manager.AddOrUpdateWorkload(&wl)
manager.AddOrUpdateWorkload(ctx, &wl)
}

cases := map[string]struct {
Expand Down Expand Up @@ -412,7 +413,7 @@ func TestRequeueWorkload(t *testing.T) {
}
}
if tc.inQueue {
_ = manager.AddOrUpdateWorkload(tc.workload)
_ = manager.AddOrUpdateWorkload(ctx, tc.workload)
}
info := workload.NewInfo(tc.workload)
if requeued := manager.RequeueWorkload(ctx, info); requeued != tc.wantRequeued {
Expand Down Expand Up @@ -567,11 +568,11 @@ func TestUpdateWorkload(t *testing.T) {
}
}
for _, w := range tc.workloads {
manager.AddOrUpdateWorkload(w)
manager.AddOrUpdateWorkload(ctx, w)
}
wl := tc.workloads[0].DeepCopy()
tc.update(wl)
if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated {
if updated := manager.UpdateWorkload(ctx, tc.workloads[0], wl); updated != tc.wantUpdated {
t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated)
}
queueOrder := make(map[string][]string)
Expand Down Expand Up @@ -666,7 +667,7 @@ func TestHeads(t *testing.T) {
go manager.CleanUpOnContext(ctx)
for _, wl := range workloads {
wl := wl
manager.AddOrUpdateWorkload(&wl)
manager.AddOrUpdateWorkload(ctx, &wl)
}
wantHeads := []workload.Info{
{
Expand Down Expand Up @@ -734,7 +735,7 @@ func TestHeadsAsync(t *testing.T) {
if err := mgr.AddQueue(ctx, &q); err != nil {
t.Errorf("Failed adding queue: %s", err)
}
mgr.AddOrUpdateWorkload(&wl)
mgr.AddOrUpdateWorkload(ctx, &wl)
go func() {
if err := mgr.AddClusterQueue(ctx, cq); err != nil {
t.Errorf("Failed adding clusterQueue: %v", err)
Expand Down Expand Up @@ -764,7 +765,7 @@ func TestHeadsAsync(t *testing.T) {
t.Errorf("Failed adding queue: %s", err)
}
go func() {
mgr.AddOrUpdateWorkload(&wl)
mgr.AddOrUpdateWorkload(ctx, &wl)
}()
},
},
Expand Down
50 changes: 14 additions & 36 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@ package queue

import (
"container/heap"
"context"
"fmt"

"github.com/go-logr/logr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -58,35 +53,33 @@ func (q *Queue) update(apiQueue *kueue.Queue) {
q.ClusterQueue = string(apiQueue.Spec.ClusterQueue)
}

func (q *Queue) AddOrUpdate(w *kueue.QueuedWorkload) {
key := workload.Key(w)
info := q.items[key]
if info != nil {
info.Obj = w
func (q *Queue) AddOrUpdate(info *workload.Info) {
key := workload.Key(info.Obj)
oldInfo := q.items[key]
if oldInfo != nil {
oldInfo.Obj = info.Obj
oldInfo.Priority = info.Priority
return
}
q.items[key] = workload.NewInfo(w)
q.items[key] = info
}

// ClusterQueue is the internal implementation of kueue.ClusterQueue that
// holds pending workloads.
type ClusterQueue struct {
client client.Client
// QueueingStrategy indicates the queueing strategy of the workloads
// across the queues in this ClusterQueue.
QueueingStrategy kueue.QueueingStrategy
log logr.Logger
heap heapImpl

heap heapImpl
}

func newClusterQueue(client client.Client, cq *kueue.ClusterQueue) *ClusterQueue {
func newClusterQueue(cq *kueue.ClusterQueue) *ClusterQueue {
cqImpl := &ClusterQueue{
client: client,
heap: heapImpl{
less: creationFIFO,
items: make(map[string]*heapItem),
},
log: ctrl.Log.WithName("cluster-queue"),
}

cqImpl.update(cq)
Expand Down Expand Up @@ -119,33 +112,18 @@ func (cq *ClusterQueue) PushIfNotPresent(info *workload.Info) bool {
return false
}

log := cq.log.WithValues("workload", workload.Key(info.Obj))
ctx := ctrl.LoggerInto(context.Background(), log)
// Populate priority from priority class before enqueue.
p := workload.GetPriorityFromPriorityClass(ctx, cq.client,
info.Obj.Spec.PriorityClassName)
info.Priority = &p

heap.Push(&cq.heap, *info)
return true
}

func (cq *ClusterQueue) PushOrUpdate(w *kueue.QueuedWorkload) bool {
item := cq.heap.items[workload.Key(w)]
info := *workload.NewInfo(w)

log := cq.log.WithValues("workload", workload.Key(w))
ctx := ctrl.LoggerInto(context.Background(), log)
// Populate priority from priority class before enqueue.
p := workload.GetPriorityFromPriorityClass(ctx, cq.client,
info.Obj.Spec.PriorityClassName)
info.Priority = &p
func (cq *ClusterQueue) PushOrUpdate(info *workload.Info) bool {
item := cq.heap.items[workload.Key(info.Obj)]

if item == nil {
heap.Push(&cq.heap, info)
heap.Push(&cq.heap, *info)
return true
}
item.obj = info
item.obj = *info
heap.Fix(&cq.heap, item.index)
return true
}
Expand Down
Loading

0 comments on commit 89b35c3

Please sign in to comment.