
Commit

update
denkensk committed Mar 16, 2022
1 parent a55dd1c commit 6b14068
Showing 8 changed files with 163 additions and 138 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/queuedworkload_types.go
@@ -47,7 +47,7 @@ type QueuedWorkloadSpec struct {
// PriorityClass object with that name. If not specified, the queuedWorkload
// priority will be default or zero if there is no default.
// Priority determines the order of access to the resources managed by the
-// ClusterQueue the workload is actually queued.
+// ClusterQueue where the workload is queued.
// The higher the value, the higher the priority.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty"`
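For context, this is the field whose comment is being reworded. Below is a minimal sketch of a QueuedWorkload that opts into priority-based ordering; the object and queue names are made up, and fields not shown in this diff (such as PodSets) are omitted. The numeric priority is resolved from the named PriorityClass at enqueue time, as the manager.go changes further down show.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    kueue "sigs.k8s.io/kueue/api/v1alpha1"
)

func main() {
    // Illustrative only: a QueuedWorkload that asks for priority-based ordering
    // by naming a PriorityClass. The numeric priority is not set by the user;
    // it is resolved from the PriorityClass when the workload is enqueued.
    wl := &kueue.QueuedWorkload{
        ObjectMeta: metav1.ObjectMeta{Name: "sample", Namespace: "default"},
        Spec: kueue.QueuedWorkloadSpec{
            QueueName:         "team-a-queue",  // namespaced Queue the workload is submitted to
            PriorityClassName: "high-priority", // existing PriorityClass; higher value, higher priority
        },
    }
    fmt.Println(wl.Spec.PriorityClassName)
}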
4 changes: 2 additions & 2 deletions config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml
@@ -7308,8 +7308,8 @@ spec:
a PriorityClass object with that name. If not specified, the queuedWorkload
priority will be default or zero if there is no default. Priority
determines the order of access to the resources managed by the ClusterQueue
-the workload is actually queued. The higher the value, the higher
-the priority.
+where the workload is queued. The higher the value, the higher the
+priority.
type: string
queueName:
description: queueName is the name of the queue the QueuedWorkload
2 changes: 1 addition & 1 deletion pkg/controller/core/queue_controller.go
@@ -116,7 +116,7 @@ func (r *QueueReconciler) Update(e event.UpdateEvent) bool {
}
log := r.log.WithValues("queue", klog.KObj(q))
log.V(2).Info("Queue update event")
-if err := r.queues.UpdateQueue(q); err != nil {
+if err := r.queues.UpdateQueue(context.Background(), q); err != nil {
log.Error(err, "Failed to update queue in system")
}
return true
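The reconciler's event handlers receive only the event (Update takes an event.UpdateEvent), not a context, which is why context.Background() is passed to the queue manager here and in queuedworkload_controller.go below. The manager then stamps its own logger into that context with ctrl.LoggerInto before making client calls. A small sketch of that controller-runtime pattern follows; resolvePriority is a hypothetical helper, while LoggerInto and LoggerFrom are the real controller-runtime accessors.

package main

import (
    "context"

    ctrl "sigs.k8s.io/controller-runtime"
)

// resolvePriority is a hypothetical helper that needs both logging and (in the
// real code) API access, but only receives a context. It pulls the logger back
// out of the context that the caller stamped in with ctrl.LoggerInto.
func resolvePriority(ctx context.Context, className string) int32 {
    log := ctrl.LoggerFrom(ctx)
    log.V(2).Info("Resolving priority", "priorityClassName", className)
    return 0 // placeholder; the real lookup reads the PriorityClass object
}

func main() {
    log := ctrl.Log.WithName("manager")
    // Same call the Manager uses: attach the logger to a background context so
    // helpers further down can log without taking a logger parameter.
    ctx := ctrl.LoggerInto(context.Background(), log)
    _ = resolvePriority(ctx, "high-priority")
}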
7 changes: 4 additions & 3 deletions pkg/controller/core/queuedworkload_controller.go
@@ -76,7 +76,7 @@ func (r *QueuedWorkloadReconciler) Create(e event.CreateEvent) bool {
}

if wl.Spec.Admission == nil {
-if !r.queues.AddOrUpdateWorkload(wl.DeepCopy()) {
+if !r.queues.AddOrUpdateWorkload(context.Background(), wl.DeepCopy()) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
return false
@@ -135,6 +135,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool {
}
log.V(2).Info("QueuedWorkload update event")

+ctx := context.Background()
switch {
case status == finished:
if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted {
@@ -143,7 +144,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool {
r.queues.DeleteWorkload(oldWl)

case prevStatus == pending && status == pending:
-if !r.queues.UpdateWorkload(oldWl, wl.DeepCopy()) {
+if !r.queues.UpdateWorkload(ctx, oldWl, wl.DeepCopy()) {
log.V(2).Info("Queue for updated workload didn't exist; ignoring for now")
}

@@ -157,7 +158,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool {
if err := r.cache.DeleteWorkload(oldWl); err != nil {
log.Error(err, "Failed to delete workload from cache")
}
-if !r.queues.AddOrUpdateWorkload(wl.DeepCopy()) {
+if !r.queues.AddOrUpdateWorkload(ctx, wl.DeepCopy()) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}

54 changes: 43 additions & 11 deletions pkg/queue/manager.go
@@ -22,8 +22,10 @@ import (
"fmt"
"sync"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
@@ -44,6 +46,7 @@ var (
type Manager struct {
sync.RWMutex
cond sync.Cond
+log logr.Logger

client client.Client
clusterQueues map[string]*ClusterQueue
@@ -55,6 +58,7 @@ func NewManager(client client.Client) *Manager {
client: client,
queues: make(map[string]*Queue),
clusterQueues: make(map[string]*ClusterQueue),
+log: ctrl.Log.WithName("manager"),
}
m.cond.L = &m.RWMutex
return m
@@ -67,7 +71,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
if _, ok := m.clusterQueues[cq.Name]; ok {
return errClusterQueueAlreadyExists
}
-cqImpl := newClusterQueue(m.client, cq)
+cqImpl := newClusterQueue(cq)
m.clusterQueues[cq.Name] = cqImpl

// Iterate through existing queues, as queues corresponding to this cluster
@@ -77,6 +81,8 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
return fmt.Errorf("listing queues pointing to the cluster queue: %w", err)
}
addedWorkloads := false

+ctx = ctrl.LoggerInto(ctx, m.log)
for _, q := range queues.Items {
// Checking clusterQueue name again because the field index is not available in tests.
if string(q.Spec.ClusterQueue) != cq.Name {
@@ -138,22 +144,32 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error {
if w.Spec.QueueName != q.Name || w.Spec.Admission != nil {
continue
}
qImpl.AddOrUpdate(&w)

ctx = ctrl.LoggerInto(ctx, m.log)
info := workload.NewInfo(&w)
// Populate priority from priority class before enqueue.
p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName)
info.Priority = &p

qImpl.AddOrUpdate(info)
}

cq := m.clusterQueues[qImpl.ClusterQueue]
if cq != nil && cq.AddFromQueue(qImpl) {
m.cond.Broadcast()
}
return nil
}

-func (m *Manager) UpdateQueue(q *kueue.Queue) error {
+func (m *Manager) UpdateQueue(ctx context.Context, q *kueue.Queue) error {
m.Lock()
defer m.Unlock()
qImpl, ok := m.queues[Key(q)]
if !ok {
return errQueueDoesNotExist
}

+ctx = ctrl.LoggerInto(ctx, m.log)
if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) {
oldCQ := m.clusterQueues[qImpl.ClusterQueue]
if oldCQ != nil {
@@ -203,24 +219,33 @@ func (m *Manager) Pending(cq *kueue.ClusterQueue) int32 {

// AddOrUpdateWorkload adds or updates workload to the corresponding queue.
// Returns whether the queue existed.
-func (m *Manager) AddOrUpdateWorkload(w *kueue.QueuedWorkload) bool {
+func (m *Manager) AddOrUpdateWorkload(ctx context.Context, w *kueue.QueuedWorkload) bool {
m.Lock()
defer m.Unlock()
-return m.addOrUpdateWorkload(w)
+
+ctx = ctrl.LoggerInto(ctx, m.log)
+return m.addOrUpdateWorkload(ctx, w)
}

-func (m *Manager) addOrUpdateWorkload(w *kueue.QueuedWorkload) bool {
+func (m *Manager) addOrUpdateWorkload(ctx context.Context, w *kueue.QueuedWorkload) bool {
qKey := queueKeyForWorkload(w)
q := m.queues[qKey]
if q == nil {
return false
}
-q.AddOrUpdate(w)
+
+info := workload.NewInfo(w)
+ctx = ctrl.LoggerInto(ctx, m.log)
+// Populate priority from priority class before enqueue.
+p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName)
+info.Priority = &p
+
+q.AddOrUpdate(info)
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
}
-if !cq.PushOrUpdate(w) {
+if !cq.PushOrUpdate(info) {
return false
}
m.cond.Broadcast()
@@ -276,12 +301,13 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.QueuedWorkload

// UpdateWorkload updates the workload to the corresponding queue or adds it if
// it didn't exist. Returns whether the queue existed.
-func (m *Manager) UpdateWorkload(oldW, w *kueue.QueuedWorkload) bool {
+func (m *Manager) UpdateWorkload(ctx context.Context, oldW, w *kueue.QueuedWorkload) bool {
m.Lock()
defer m.Unlock()

if oldW.Spec.QueueName != w.Spec.QueueName {
m.deleteWorkloadFromQueueAndClusterQueue(w, queueKeyForWorkload(oldW))
-return m.addOrUpdateWorkload(w)
+return m.addOrUpdateWorkload(ctx, w)
}

q := m.queues[queueKeyForWorkload(w)]
@@ -290,7 +316,13 @@ func (m *Manager) UpdateWorkload(oldW, w *kueue.QueuedWorkload) bool {
}
cq := m.clusterQueues[q.ClusterQueue]
if cq != nil {
-return cq.PushOrUpdate(w)
+info := workload.NewInfo(w)
+ctx = ctrl.LoggerInto(ctx, m.log)
+// Populate priority from priority class before enqueue.
+p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName)
+info.Priority = &p
+
+return cq.PushOrUpdate(info)
}
return false
}
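The same populate-priority block (wrap the workload in a workload.Info, resolve the priority, push the Info) now appears in AddQueue, addOrUpdateWorkload, and UpdateWorkload; folding it into a small helper would keep those call sites to one line. The resolver itself, workload.GetPriorityFromPriorityClass, is not part of this diff, so the following is only a sketch of what such a lookup could look like; the int32 return type, the zero fallback, and the error handling are assumptions.

package workload

import (
    "context"

    schedulingv1 "k8s.io/api/scheduling/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// getPriorityFromPriorityClass is a stand-in for the helper this diff calls:
// resolve the named PriorityClass via the client and fall back to zero when no
// name is given or the object cannot be fetched. The real helper may also
// honor the cluster's default PriorityClass.
func getPriorityFromPriorityClass(ctx context.Context, c client.Client, priorityClassName string) int32 {
    if priorityClassName == "" {
        return 0
    }
    var pc schedulingv1.PriorityClass
    if err := c.Get(ctx, client.ObjectKey{Name: priorityClassName}, &pc); err != nil {
        ctrl.LoggerFrom(ctx).Error(err, "Fetching PriorityClass", "priorityClassName", priorityClassName)
        return 0
    }
    return pc.Value
}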
25 changes: 13 additions & 12 deletions pkg/queue/manager_test.go
@@ -144,12 +144,12 @@ func TestUpdateQueue(t *testing.T) {
}
}
for _, w := range workloads {
-manager.AddOrUpdateWorkload(w)
+manager.AddOrUpdateWorkload(ctx, w)
}

// Update cluster queue of first queue.
queues[0].Spec.ClusterQueue = "cq2"
-if err := manager.UpdateQueue(queues[0]); err != nil {
+if err := manager.UpdateQueue(ctx, queues[0]); err != nil {
t.Fatalf("Failed updating queue: %v", err)
}

@@ -168,21 +168,22 @@ func TestUpdateQueue(t *testing.T) {
}

func TestAddWorkload(t *testing.T) {
+ctx := context.Background()
scheme := runtime.NewScheme()
if err := kueue.AddToScheme(scheme); err != nil {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
manager := NewManager(fake.NewClientBuilder().WithScheme(scheme).Build())
cq := utiltesting.MakeClusterQueue("cq").Obj()
-if err := manager.AddClusterQueue(context.Background(), cq); err != nil {
+if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
}
queues := []*kueue.Queue{
utiltesting.MakeQueue("foo", "earth").ClusterQueue("cq").Obj(),
utiltesting.MakeQueue("bar", "mars").Obj(),
}
for _, q := range queues {
-if err := manager.AddQueue(context.Background(), q); err != nil {
+if err := manager.AddQueue(ctx, q); err != nil {
t.Fatalf("Failed adding queue %s: %v", q.Name, err)
}
}
@@ -230,7 +231,7 @@ func TestAddWorkload(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.workload.Name, func(t *testing.T) {
-if added := manager.AddOrUpdateWorkload(tc.workload); added != tc.wantAdded {
+if added := manager.AddOrUpdateWorkload(ctx, tc.workload); added != tc.wantAdded {
t.Errorf("AddWorkload returned %t, want %t", added, tc.wantAdded)
}
})
@@ -298,7 +299,7 @@ func TestStatus(t *testing.T) {
}
for _, wl := range workloads {
wl := wl
manager.AddOrUpdateWorkload(&wl)
manager.AddOrUpdateWorkload(ctx, &wl)
}

cases := map[string]struct {
@@ -412,7 +413,7 @@ func TestRequeueWorkload(t *testing.T) {
}
}
if tc.inQueue {
-_ = manager.AddOrUpdateWorkload(tc.workload)
+_ = manager.AddOrUpdateWorkload(ctx, tc.workload)
}
info := workload.NewInfo(tc.workload)
if requeued := manager.RequeueWorkload(ctx, info); requeued != tc.wantRequeued {
@@ -567,11 +568,11 @@ func TestUpdateWorkload(t *testing.T) {
}
}
for _, w := range tc.workloads {
-manager.AddOrUpdateWorkload(w)
+manager.AddOrUpdateWorkload(ctx, w)
}
wl := tc.workloads[0].DeepCopy()
tc.update(wl)
-if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated {
+if updated := manager.UpdateWorkload(ctx, tc.workloads[0], wl); updated != tc.wantUpdated {
t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated)
}
queueOrder := make(map[string][]string)
@@ -666,7 +667,7 @@ func TestHeads(t *testing.T) {
go manager.CleanUpOnContext(ctx)
for _, wl := range workloads {
wl := wl
manager.AddOrUpdateWorkload(&wl)
manager.AddOrUpdateWorkload(ctx, &wl)
}
wantHeads := []workload.Info{
{
@@ -734,7 +735,7 @@ func TestHeadsAsync(t *testing.T) {
if err := mgr.AddQueue(ctx, &q); err != nil {
t.Errorf("Failed adding queue: %s", err)
}
mgr.AddOrUpdateWorkload(&wl)
mgr.AddOrUpdateWorkload(ctx, &wl)
go func() {
if err := mgr.AddClusterQueue(ctx, cq); err != nil {
t.Errorf("Failed adding clusterQueue: %v", err)
@@ -764,7 +765,7 @@
t.Errorf("Failed adding queue: %s", err)
}
go func() {
mgr.AddOrUpdateWorkload(&wl)
mgr.AddOrUpdateWorkload(ctx, &wl)
}()
},
},
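None of the updated tests exercise the new priority lookup; they only thread ctx through the changed signatures. A sketch of how a test alongside manager_test.go could seed a PriorityClass into the fake client so the lookup resolves a real value; the scheduling/v1 scheme registration, the test name, and the final assertion are assumptions, not part of this commit.

package queue

import (
    "testing"

    schedulingv1 "k8s.io/api/scheduling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    kueue "sigs.k8s.io/kueue/api/v1alpha1"
)

func TestPriorityFromPriorityClass(t *testing.T) {
    scheme := runtime.NewScheme()
    if err := kueue.AddToScheme(scheme); err != nil {
        t.Fatalf("Failed adding kueue scheme: %s", err)
    }
    if err := schedulingv1.AddToScheme(scheme); err != nil {
        t.Fatalf("Failed adding scheduling scheme: %s", err)
    }
    pc := &schedulingv1.PriorityClass{
        ObjectMeta: metav1.ObjectMeta{Name: "high"},
        Value:      1000,
    }
    // Seed the PriorityClass so the enqueue-time lookup can resolve it to 1000.
    cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(pc).Build()
    manager := NewManager(cl)
    _ = manager // next: add a queue, enqueue a workload with PriorityClassName "high", assert on head ordering
}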
(Diffs for the remaining changed files were not loaded in this view.)
