From 89b35c380dffa7c809da093832dedf044954f70b Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 16 Mar 2022 15:56:45 +0800 Subject: [PATCH] update --- api/v1alpha1/queuedworkload_types.go | 2 +- .../bases/kueue.x-k8s.io_queuedworkloads.yaml | 4 +- .../core/queuedworkload_controller.go | 7 +- pkg/queue/manager.go | 50 ++++-- pkg/queue/manager_test.go | 23 +-- pkg/queue/queue.go | 50 ++---- pkg/queue/queue_test.go | 157 ++++++++++-------- 7 files changed, 158 insertions(+), 135 deletions(-) diff --git a/api/v1alpha1/queuedworkload_types.go b/api/v1alpha1/queuedworkload_types.go index 63e0b9894c..d56ec0d439 100644 --- a/api/v1alpha1/queuedworkload_types.go +++ b/api/v1alpha1/queuedworkload_types.go @@ -47,7 +47,7 @@ type QueuedWorkloadSpec struct { // PriorityClass object with that name. If not specified, the queuedWorkload // priority will be default or zero if there is no default. // Priority determines the order of access to the resources managed by the - // ClusterQueue the workload is actually queued. + // ClusterQueue where the workload is queued. // The higher the value, the higher the priority. // +optional PriorityClassName string `json:"priorityClassName,omitempty"` diff --git a/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml b/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml index ff0250035e..133a62476a 100644 --- a/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml +++ b/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml @@ -7308,8 +7308,8 @@ spec: a PriorityClass object with that name. If not specified, the queuedWorkload priority will be default or zero if there is no default. Priority determines the order of access to the resources managed by the ClusterQueue - the workload is actually queued. The higher the value, the higher - the priority. + where the workload is queued. The higher the value, the higher the + priority. type: string queueName: description: queueName is the name of the queue the QueuedWorkload diff --git a/pkg/controller/core/queuedworkload_controller.go b/pkg/controller/core/queuedworkload_controller.go index d6a33c865e..2b8fcc8091 100644 --- a/pkg/controller/core/queuedworkload_controller.go +++ b/pkg/controller/core/queuedworkload_controller.go @@ -76,7 +76,7 @@ func (r *QueuedWorkloadReconciler) Create(e event.CreateEvent) bool { } if wl.Spec.Admission == nil { - if !r.queues.AddOrUpdateWorkload(wl.DeepCopy()) { + if !r.queues.AddOrUpdateWorkload(context.Background(), wl.DeepCopy()) { log.V(2).Info("Queue for workload didn't exist; ignored for now") } return false @@ -135,6 +135,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool { } log.V(2).Info("QueuedWorkload update event") + ctx := context.Background() switch { case status == finished: if err := r.cache.DeleteWorkload(oldWl); err != nil && prevStatus == admitted { @@ -143,7 +144,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool { r.queues.DeleteWorkload(oldWl) case prevStatus == pending && status == pending: - if !r.queues.UpdateWorkload(oldWl, wl.DeepCopy()) { + if !r.queues.UpdateWorkload(ctx, oldWl, wl.DeepCopy()) { log.V(2).Info("Queue for updated workload didn't exist; ignoring for now") } @@ -157,7 +158,7 @@ func (r *QueuedWorkloadReconciler) Update(e event.UpdateEvent) bool { if err := r.cache.DeleteWorkload(oldWl); err != nil { log.Error(err, "Failed to delete workload from cache") } - if !r.queues.AddOrUpdateWorkload(wl.DeepCopy()) { + if !r.queues.AddOrUpdateWorkload(ctx, wl.DeepCopy()) { log.V(2).Info("Queue for workload didn't exist; ignored for now") } diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index 8c4c3204f6..2fb7c878db 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -22,8 +22,10 @@ import ( "fmt" "sync" + "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/api/v1alpha1" @@ -44,6 +46,7 @@ var ( type Manager struct { sync.RWMutex cond sync.Cond + log logr.Logger client client.Client clusterQueues map[string]*ClusterQueue @@ -55,6 +58,7 @@ func NewManager(client client.Client) *Manager { client: client, queues: make(map[string]*Queue), clusterQueues: make(map[string]*ClusterQueue), + log: ctrl.Log.WithName("manager"), } m.cond.L = &m.RWMutex return m @@ -67,7 +71,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e if _, ok := m.clusterQueues[cq.Name]; ok { return errClusterQueueAlreadyExists } - cqImpl := newClusterQueue(m.client, cq) + cqImpl := newClusterQueue(cq) m.clusterQueues[cq.Name] = cqImpl // Iterate through existing queues, as queues corresponding to this cluster @@ -77,6 +81,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e return fmt.Errorf("listing queues pointing to the cluster queue: %w", err) } addedWorkloads := false + for _, q := range queues.Items { // Checking clusterQueue name again because the field index is not available in tests. if string(q.Spec.ClusterQueue) != cq.Name { @@ -138,8 +143,16 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error { if w.Spec.QueueName != q.Name || w.Spec.Admission != nil { continue } - qImpl.AddOrUpdate(&w) + + ctx = ctrl.LoggerInto(ctx, m.log) + info := workload.NewInfo(&w) + // Populate priority from priority class before enqueue. + p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName) + info.Priority = &p + + qImpl.AddOrUpdate(info) } + cq := m.clusterQueues[qImpl.ClusterQueue] if cq != nil && cq.AddFromQueue(qImpl) { m.cond.Broadcast() @@ -154,6 +167,7 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error { if !ok { return errQueueDoesNotExist } + if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) { oldCQ := m.clusterQueues[qImpl.ClusterQueue] if oldCQ != nil { @@ -203,24 +217,33 @@ func (m *Manager) Pending(cq *kueue.ClusterQueue) int32 { // AddOrUpdateWorkload adds or updates workload to the corresponding queue. // Returns whether the queue existed. -func (m *Manager) AddOrUpdateWorkload(w *kueue.QueuedWorkload) bool { +func (m *Manager) AddOrUpdateWorkload(ctx context.Context, w *kueue.QueuedWorkload) bool { m.Lock() defer m.Unlock() - return m.addOrUpdateWorkload(w) + + ctx = ctrl.LoggerInto(ctx, m.log) + return m.addOrUpdateWorkload(ctx, w) } -func (m *Manager) addOrUpdateWorkload(w *kueue.QueuedWorkload) bool { +func (m *Manager) addOrUpdateWorkload(ctx context.Context, w *kueue.QueuedWorkload) bool { qKey := queueKeyForWorkload(w) q := m.queues[qKey] if q == nil { return false } - q.AddOrUpdate(w) + + info := workload.NewInfo(w) + ctx = ctrl.LoggerInto(ctx, m.log) + // Populate priority from priority class before enqueue. + p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName) + info.Priority = &p + + q.AddOrUpdate(info) cq := m.clusterQueues[q.ClusterQueue] if cq == nil { return false } - if !cq.PushOrUpdate(w) { + if !cq.PushOrUpdate(info) { return false } m.cond.Broadcast() @@ -276,12 +299,13 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.QueuedWorkload // UpdateWorkload updates the workload to the corresponding queue or adds it if // it didn't exist. Returns whether the queue existed. -func (m *Manager) UpdateWorkload(oldW, w *kueue.QueuedWorkload) bool { +func (m *Manager) UpdateWorkload(ctx context.Context, oldW, w *kueue.QueuedWorkload) bool { m.Lock() defer m.Unlock() + if oldW.Spec.QueueName != w.Spec.QueueName { m.deleteWorkloadFromQueueAndClusterQueue(w, queueKeyForWorkload(oldW)) - return m.addOrUpdateWorkload(w) + return m.addOrUpdateWorkload(ctx, w) } q := m.queues[queueKeyForWorkload(w)] @@ -290,7 +314,13 @@ func (m *Manager) UpdateWorkload(oldW, w *kueue.QueuedWorkload) bool { } cq := m.clusterQueues[q.ClusterQueue] if cq != nil { - return cq.PushOrUpdate(w) + info := workload.NewInfo(w) + ctx = ctrl.LoggerInto(ctx, m.log) + // Populate priority from priority class before enqueue. + p := workload.GetPriorityFromPriorityClass(ctx, m.client, info.Obj.Spec.PriorityClassName) + info.Priority = &p + + return cq.PushOrUpdate(info) } return false } diff --git a/pkg/queue/manager_test.go b/pkg/queue/manager_test.go index 67deb7a1c8..c6bd161ff5 100644 --- a/pkg/queue/manager_test.go +++ b/pkg/queue/manager_test.go @@ -144,7 +144,7 @@ func TestUpdateQueue(t *testing.T) { } } for _, w := range workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w) } // Update cluster queue of first queue. @@ -168,13 +168,14 @@ func TestUpdateQueue(t *testing.T) { } func TestAddWorkload(t *testing.T) { + ctx := context.Background() scheme := runtime.NewScheme() if err := kueue.AddToScheme(scheme); err != nil { t.Fatalf("Failed adding kueue scheme: %s", err) } manager := NewManager(fake.NewClientBuilder().WithScheme(scheme).Build()) cq := utiltesting.MakeClusterQueue("cq").Obj() - if err := manager.AddClusterQueue(context.Background(), cq); err != nil { + if err := manager.AddClusterQueue(ctx, cq); err != nil { t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err) } queues := []*kueue.Queue{ @@ -182,7 +183,7 @@ func TestAddWorkload(t *testing.T) { utiltesting.MakeQueue("bar", "mars").Obj(), } for _, q := range queues { - if err := manager.AddQueue(context.Background(), q); err != nil { + if err := manager.AddQueue(ctx, q); err != nil { t.Fatalf("Failed adding queue %s: %v", q.Name, err) } } @@ -230,7 +231,7 @@ func TestAddWorkload(t *testing.T) { } for _, tc := range cases { t.Run(tc.workload.Name, func(t *testing.T) { - if added := manager.AddOrUpdateWorkload(tc.workload); added != tc.wantAdded { + if added := manager.AddOrUpdateWorkload(ctx, tc.workload); added != tc.wantAdded { t.Errorf("AddWorkload returned %t, want %t", added, tc.wantAdded) } }) @@ -298,7 +299,7 @@ func TestStatus(t *testing.T) { } for _, wl := range workloads { wl := wl - manager.AddOrUpdateWorkload(&wl) + manager.AddOrUpdateWorkload(ctx, &wl) } cases := map[string]struct { @@ -412,7 +413,7 @@ func TestRequeueWorkload(t *testing.T) { } } if tc.inQueue { - _ = manager.AddOrUpdateWorkload(tc.workload) + _ = manager.AddOrUpdateWorkload(ctx, tc.workload) } info := workload.NewInfo(tc.workload) if requeued := manager.RequeueWorkload(ctx, info); requeued != tc.wantRequeued { @@ -567,11 +568,11 @@ func TestUpdateWorkload(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w) } wl := tc.workloads[0].DeepCopy() tc.update(wl) - if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated { + if updated := manager.UpdateWorkload(ctx, tc.workloads[0], wl); updated != tc.wantUpdated { t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated) } queueOrder := make(map[string][]string) @@ -666,7 +667,7 @@ func TestHeads(t *testing.T) { go manager.CleanUpOnContext(ctx) for _, wl := range workloads { wl := wl - manager.AddOrUpdateWorkload(&wl) + manager.AddOrUpdateWorkload(ctx, &wl) } wantHeads := []workload.Info{ { @@ -734,7 +735,7 @@ func TestHeadsAsync(t *testing.T) { if err := mgr.AddQueue(ctx, &q); err != nil { t.Errorf("Failed adding queue: %s", err) } - mgr.AddOrUpdateWorkload(&wl) + mgr.AddOrUpdateWorkload(ctx, &wl) go func() { if err := mgr.AddClusterQueue(ctx, cq); err != nil { t.Errorf("Failed adding clusterQueue: %v", err) @@ -764,7 +765,7 @@ func TestHeadsAsync(t *testing.T) { t.Errorf("Failed adding queue: %s", err) } go func() { - mgr.AddOrUpdateWorkload(&wl) + mgr.AddOrUpdateWorkload(ctx, &wl) }() }, }, diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 5ab7c315c1..3a07bff3f3 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -18,13 +18,8 @@ package queue import ( "container/heap" - "context" "fmt" - "github.com/go-logr/logr" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - kueue "sigs.k8s.io/kueue/api/v1alpha1" "sigs.k8s.io/kueue/pkg/workload" ) @@ -58,35 +53,33 @@ func (q *Queue) update(apiQueue *kueue.Queue) { q.ClusterQueue = string(apiQueue.Spec.ClusterQueue) } -func (q *Queue) AddOrUpdate(w *kueue.QueuedWorkload) { - key := workload.Key(w) - info := q.items[key] - if info != nil { - info.Obj = w +func (q *Queue) AddOrUpdate(info *workload.Info) { + key := workload.Key(info.Obj) + oldInfo := q.items[key] + if oldInfo != nil { + oldInfo.Obj = info.Obj + oldInfo.Priority = info.Priority return } - q.items[key] = workload.NewInfo(w) + q.items[key] = info } // ClusterQueue is the internal implementation of kueue.ClusterQueue that // holds pending workloads. type ClusterQueue struct { - client client.Client // QueueingStrategy indicates the queueing strategy of the workloads // across the queues in this ClusterQueue. QueueingStrategy kueue.QueueingStrategy - log logr.Logger - heap heapImpl + + heap heapImpl } -func newClusterQueue(client client.Client, cq *kueue.ClusterQueue) *ClusterQueue { +func newClusterQueue(cq *kueue.ClusterQueue) *ClusterQueue { cqImpl := &ClusterQueue{ - client: client, heap: heapImpl{ less: creationFIFO, items: make(map[string]*heapItem), }, - log: ctrl.Log.WithName("cluster-queue"), } cqImpl.update(cq) @@ -119,33 +112,18 @@ func (cq *ClusterQueue) PushIfNotPresent(info *workload.Info) bool { return false } - log := cq.log.WithValues("workload", workload.Key(info.Obj)) - ctx := ctrl.LoggerInto(context.Background(), log) - // Populate priority from priority class before enqueue. - p := workload.GetPriorityFromPriorityClass(ctx, cq.client, - info.Obj.Spec.PriorityClassName) - info.Priority = &p - heap.Push(&cq.heap, *info) return true } -func (cq *ClusterQueue) PushOrUpdate(w *kueue.QueuedWorkload) bool { - item := cq.heap.items[workload.Key(w)] - info := *workload.NewInfo(w) - - log := cq.log.WithValues("workload", workload.Key(w)) - ctx := ctrl.LoggerInto(context.Background(), log) - // Populate priority from priority class before enqueue. - p := workload.GetPriorityFromPriorityClass(ctx, cq.client, - info.Obj.Spec.PriorityClassName) - info.Priority = &p +func (cq *ClusterQueue) PushOrUpdate(info *workload.Info) bool { + item := cq.heap.items[workload.Key(info.Obj)] if item == nil { - heap.Push(&cq.heap, info) + heap.Push(&cq.heap, *info) return true } - item.obj = info + item.obj = *info heap.Fix(&cq.heap, item.index) return true } diff --git a/pkg/queue/queue_test.go b/pkg/queue/queue_test.go index 8c2d34afc9..6adceafdca 100644 --- a/pkg/queue/queue_test.go +++ b/pkg/queue/queue_test.go @@ -20,47 +20,51 @@ import ( "testing" "time" - schedulingv1 "k8s.io/api/scheduling/v1" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/kueue/pkg/workload" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kueue "sigs.k8s.io/kueue/api/v1alpha1" - utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +var ( + lowPriority, highPriority = int32(0), int32(1000) ) func TestFIFOClusterQueue(t *testing.T) { - scheme := runtime.NewScheme() - if err := schedulingv1.AddToScheme(scheme); err != nil { - t.Fatalf("Failed adding scheduling v1 scheme: %v", err) - } - cl := fake.NewClientBuilder().WithScheme(scheme).Build() - q := newClusterQueue(cl, &kueue.ClusterQueue{ + q := newClusterQueue(&kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ QueueingStrategy: kueue.StrictFIFO, }, }) now := metav1.Now() - ws := []*kueue.QueuedWorkload{ + + ws := []*workload.Info{ { - ObjectMeta: metav1.ObjectMeta{ - Name: "now", - CreationTimestamp: now, + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "now", + CreationTimestamp: now, + }, }, }, { - ObjectMeta: metav1.ObjectMeta{ - Name: "before", - CreationTimestamp: metav1.NewTime(now.Add(-time.Second)), + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "before", + CreationTimestamp: metav1.NewTime(now.Add(-time.Second)), + }, }, }, { - ObjectMeta: metav1.ObjectMeta{ - Name: "after", - CreationTimestamp: metav1.NewTime(now.Add(time.Second)), + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "after", + CreationTimestamp: metav1.NewTime(now.Add(time.Second)), + }, }, }, } + for _, w := range ws { q.PushOrUpdate(w) } @@ -71,10 +75,12 @@ func TestFIFOClusterQueue(t *testing.T) { if got.Obj.Name != "before" { t.Errorf("Poped workload %q want %q", got.Obj.Name, "before") } - q.PushOrUpdate(&kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "after", - CreationTimestamp: metav1.NewTime(now.Add(-time.Minute)), + q.PushOrUpdate(&workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "after", + CreationTimestamp: metav1.NewTime(now.Add(-time.Minute)), + }, }, }) got = q.Pop() @@ -95,86 +101,93 @@ func TestFIFOClusterQueue(t *testing.T) { } func TestStrictFIFO(t *testing.T) { - scheme := runtime.NewScheme() - if err := schedulingv1.AddToScheme(scheme); err != nil { - t.Fatalf("Failed adding scheduling v1 scheme: %v", err) - } - cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects( - utiltesting.MakePriorityClass("highPriority").PriorityValue(100).Obj(), - utiltesting.MakePriorityClass("lowPriority").PriorityValue(10).Obj(), - ).Build() - t1 := time.Now() t2 := t1.Add(time.Second) for _, tt := range []struct { name string - w1 *kueue.QueuedWorkload - w2 *kueue.QueuedWorkload + w1 *workload.Info + w2 *workload.Info expected string }{ { name: "w1.priority is larger than w2.priority", - w1: &kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "w1", - CreationTimestamp: metav1.NewTime(t1), - }, - Spec: kueue.QueuedWorkloadSpec{ - PriorityClassName: "highPriority", + w1: &workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "highPriority", + }, }, + Priority: &highPriority, }, - w2: &kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "w2", - CreationTimestamp: metav1.NewTime(t2), - }, - Spec: kueue.QueuedWorkloadSpec{ - PriorityClassName: "lowPriority", + w2: &workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "lowPriority", + }, }, + Priority: &lowPriority, }, expected: "w1", }, { name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time", - w1: &kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "w1", - CreationTimestamp: metav1.NewTime(t1), + w1: &workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, }, }, - w2: &kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "w2", - CreationTimestamp: metav1.NewTime(t2), + w2: &workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, }, }, expected: "w1", }, { name: "p1.priority is less than p2.priority and w1.create time is earlier than w2.create time", - w1: &kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "w1", - CreationTimestamp: metav1.NewTime(t1), - }, - Spec: kueue.QueuedWorkloadSpec{ - PriorityClassName: "lowPriority", + w1: &workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "lowPriority", + }, }, + Priority: &lowPriority, }, - w2: &kueue.QueuedWorkload{ - ObjectMeta: metav1.ObjectMeta{ - Name: "w2", - CreationTimestamp: metav1.NewTime(t2), - }, - Spec: kueue.QueuedWorkloadSpec{ - PriorityClassName: "highPriority", + w2: &workload.Info{ + Obj: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "highPriority", + }, }, + Priority: &highPriority, }, expected: "w2", }, } { t.Run(tt.name, func(t *testing.T) { - q := newClusterQueue(cl, &kueue.ClusterQueue{ + q := newClusterQueue(&kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ QueueingStrategy: kueue.StrictFIFO, },