diff --git a/api/v1alpha1/queuedworkload_types.go b/api/v1alpha1/queuedworkload_types.go index 52272bd0d4..2d142ce9dc 100644 --- a/api/v1alpha1/queuedworkload_types.go +++ b/api/v1alpha1/queuedworkload_types.go @@ -39,6 +39,20 @@ type QueuedWorkloadSpec struct { // admission holds the parameters of the admission of the workload by a ClusterQueue. Admission *Admission `json:"admission,omitempty"` + + // If specified, indicates the queuedWorkload's priority. + // "system-node-critical" and "system-cluster-critical" are two special + // keywords which indicate the highest priorities with the former being + // the highest priority. Any other name must be defined by creating a + // PriorityClass object with that name. If not specified, the queuedWorkload + // priority will be default or zero if there is no default. + PriorityClassName string `json:"priorityClassName,omitempty"` + + // Priority determines the order of access to the resources managed by the + // ClusterQueue where the workload is queued. + // The priority value is populated from PriorityClassName. + // The higher the value, the higher the priority. + Priority *int32 `json:"priority,omitempty"` } type Admission struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 0ea177f1e9..de60615a88 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -386,6 +386,11 @@ func (in *QueuedWorkloadSpec) DeepCopyInto(out *QueuedWorkloadSpec) { *out = new(Admission) (*in).DeepCopyInto(*out) } + if in.Priority != nil { + in, out := &in.Priority, &out.Priority + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueuedWorkloadSpec. 
diff --git a/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml b/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml index 1c7854b8e9..1a2d4bd45b 100644 --- a/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml +++ b/config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml @@ -7300,6 +7300,21 @@ spec: x-kubernetes-list-map-keys: - name x-kubernetes-list-type: map + priority: + description: Priority determines the order of access to the resources + managed by the ClusterQueue where the workload is queued. The priority + value is populated from PriorityClassName. The higher the value, + the higher the priority. + format: int32 + type: integer + priorityClassName: + description: If specified, indicates the queuedWorkload's priority. + "system-node-critical" and "system-cluster-critical" are two special + keywords which indicate the highest priorities with the former being + the highest priority. Any other name must be defined by creating + a PriorityClass object with that name. If not specified, the queuedWorkload + priority will be default or zero if there is no default. + type: string queueName: description: queueName is the name of the queue the QueuedWorkload is associated with. diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index e601c66145..9f0b489978 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -30,4 +30,9 @@ const ( // UpdatesBatchPeriod is the batch period to hold queuedworkload updates // before syncing a Queue and CLusterQueue onbjects. UpdatesBatchPeriod = 3 * time.Second + + // DefaultPriority is used to set priority of workloads + // that do not specify any priority class and there is no priority class + // marked as default. 
+ DefaultPriority = 0 ) diff --git a/pkg/controller/workload/job/job_controller.go b/pkg/controller/workload/job/job_controller.go index d57b89548e..805c422e32 100644 --- a/pkg/controller/workload/job/job_controller.go +++ b/pkg/controller/workload/job/job_controller.go @@ -35,6 +35,7 @@ import ( kueue "sigs.k8s.io/kueue/api/v1alpha1" "sigs.k8s.io/kueue/pkg/constants" + utilpriority "sigs.k8s.io/kueue/pkg/util/priority" "sigs.k8s.io/kueue/pkg/workload" ) @@ -294,7 +295,7 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job *batchv } // Create the corresponding workload. - wl, err := ConstructWorkloadFor(job, r.scheme) + wl, err := ConstructWorkloadFor(ctx, r.client, job, r.scheme) if err != nil { return err } @@ -370,7 +371,8 @@ func (r *JobReconciler) ensureAtMostOneWorkload(ctx context.Context, job *batchv return match, nil } -func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) { +func ConstructWorkloadFor(ctx context.Context, client client.Client, + job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) { w := &kueue.QueuedWorkload{ ObjectMeta: metav1.ObjectMeta{ Name: job.Name, @@ -386,6 +388,16 @@ func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.Queu QueueName: queueName(job), }, } + + // Populate priority from priority class. 
+ priorityClassName, p, err := utilpriority.GetPriorityFromPriorityClass( + ctx, client, job.Spec.Template.Spec.PriorityClassName) + if err != nil { + return nil, err + } + w.Spec.Priority = &p + w.Spec.PriorityClassName = priorityClassName + if err := ctrl.SetControllerReference(job, w, scheme); err != nil { return nil, err } diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 441538e08f..d4214f63b8 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -21,6 +21,7 @@ import ( "fmt" kueue "sigs.k8s.io/kueue/api/v1alpha1" + utilpriority "sigs.k8s.io/kueue/pkg/util/priority" "sigs.k8s.io/kueue/pkg/workload" ) @@ -140,6 +141,12 @@ func (cq *ClusterQueue) Pop() *workload.Info { } func creationFIFO(a, b workload.Info) bool { + p1 := utilpriority.Priority(a.Obj) + p2 := utilpriority.Priority(b.Obj) + + if p1 != p2 { + return p1 > p2 + } return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp) } diff --git a/pkg/queue/queue_test.go b/pkg/queue/queue_test.go index 720ee639ee..42186da7f7 100644 --- a/pkg/queue/queue_test.go +++ b/pkg/queue/queue_test.go @@ -21,9 +21,16 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + kueue "sigs.k8s.io/kueue/api/v1alpha1" ) +const ( + lowPriority = 0 + highPriority = 1000 +) + func TestFIFOClusterQueue(t *testing.T) { q := newClusterQueue(&kueue.ClusterQueue{ Spec: kueue.ClusterQueueSpec{ @@ -83,3 +90,98 @@ func TestFIFOClusterQueue(t *testing.T) { t.Errorf("Queue is not empty, poped workload %q", got.Obj.Name) } } + +func TestStrictFIFO(t *testing.T) { + t1 := time.Now() + t2 := t1.Add(time.Second) + for _, tt := range []struct { + name string + w1 *kueue.QueuedWorkload + w2 *kueue.QueuedWorkload + expected string + }{ + { + name: "w1.priority is higher than w2.priority", + w1: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "highPriority", + Priority: 
pointer.Int32(highPriority), + }, + }, + w2: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "lowPriority", + Priority: pointer.Int32(lowPriority), + }, + }, + expected: "w1", + }, + { + name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time", + w1: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, + }, + w2: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, + }, + expected: "w1", + }, + { + name: "p1.priority is lower than p2.priority and w1.create time is earlier than w2.create time", + w1: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w1", + CreationTimestamp: metav1.NewTime(t1), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "lowPriority", + Priority: pointer.Int32(lowPriority), + }, + }, + w2: &kueue.QueuedWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: "w2", + CreationTimestamp: metav1.NewTime(t2), + }, + Spec: kueue.QueuedWorkloadSpec{ + PriorityClassName: "highPriority", + Priority: pointer.Int32(highPriority), + }, + }, + expected: "w2", + }, + } { + t.Run(tt.name, func(t *testing.T) { + q := newClusterQueue(&kueue.ClusterQueue{ + Spec: kueue.ClusterQueueSpec{ + QueueingStrategy: kueue.StrictFIFO, + }, + }) + + q.PushOrUpdate(tt.w1) + q.PushOrUpdate(tt.w2) + + got := q.Pop() + if got == nil { + t.Fatal("Queue is empty") + } + if got.Obj.Name != tt.expected { + t.Errorf("Poped workload %q want %q", got.Obj.Name, tt.expected) + } + }) + } +} diff --git a/pkg/util/priority/priority.go b/pkg/util/priority/priority.go new file mode 100644 index 0000000000..22e732c4d5 --- /dev/null +++ b/pkg/util/priority/priority.go @@ -0,0 +1,87 @@ +/* +Copyright 2022 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priority + +import ( + "context" + + schedulingv1 "k8s.io/api/scheduling/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/api/v1alpha1" + "sigs.k8s.io/kueue/pkg/constants" +) + +// Priority returns priority of the given workload. +func Priority(w *kueue.QueuedWorkload) int32 { + if w.Spec.Priority != nil { + return *w.Spec.Priority + } + // When priority of a running workload is nil, it means it was created at a time + // that there was no global default priority class and the priority class + // name of the pod was empty. So, we resolve to the static default priority. + return constants.DefaultPriority +} + +// GetPriorityFromPriorityClass returns the priority populated from +// priority class. If not specified, priority will be default or +// zero if there is no default. 
+func GetPriorityFromPriorityClass(ctx context.Context, client client.Client, + priorityClass string) (string, int32, error) { + if len(priorityClass) == 0 { + return getDefaultPriority(ctx, client) + } + + pc := &schedulingv1.PriorityClass{} + if err := client.Get(ctx, types.NamespacedName{Name: priorityClass}, pc); err != nil { + return "", 0, err + } + + return pc.Name, pc.Value, nil +} + +func getDefaultPriority(ctx context.Context, client client.Client) (string, int32, error) { + dpc, err := getDefaultPriorityClass(ctx, client) + if err != nil { + return "", 0, err + } + if dpc != nil { + return dpc.Name, dpc.Value, nil + } + return "", int32(constants.DefaultPriority), nil +} + +func getDefaultPriorityClass(ctx context.Context, client client.Client) (*schedulingv1.PriorityClass, error) { + pcs := schedulingv1.PriorityClassList{} + err := client.List(ctx, &pcs) + if err != nil { + return nil, err + } + + // In case more than one global default priority class is added as a result of a race condition, + // we pick the one with the lowest priority value. + var defaultPC *schedulingv1.PriorityClass + for i, pci := range pcs.Items { + if pci.GlobalDefault { + // Take the address of the list element, not of the loop variable + // pci: &pci aliases a single per-loop variable, which both breaks + // the Value comparison and leaves defaultPC pointing at the last + // iterated item after the loop ends. + if defaultPC == nil || defaultPC.Value > pci.Value { + defaultPC = &pcs.Items[i] + } + } + } + return defaultPC, nil +} diff --git a/pkg/util/routine/wrapper.go b/pkg/util/routine/wrapper.go index 356cfa42ce..23b5cce289 100644 --- a/pkg/util/routine/wrapper.go +++ b/pkg/util/routine/wrapper.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + package routine // Wrapper is used to wrap a function that will run in a goroutine. diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index c93b3470e6..33c409c27a 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -21,6 +21,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" @@ -78,6 +79,12 @@ func (j *JobWrapper) Parallelism(p int32) *JobWrapper { return j } +// PriorityClass updates the job's priority class name. +func (j *JobWrapper) PriorityClass(pc string) *JobWrapper { + j.Spec.Template.Spec.PriorityClassName = pc + return j +} + // Queue updates the queue name of the job func (j *JobWrapper) Queue(queue string) *JobWrapper { j.Annotations[constants.QueueAnnotation] = queue @@ -102,6 +109,31 @@ func (j *JobWrapper) Request(r corev1.ResourceName, v string) *JobWrapper { return j } +// PriorityClassWrapper wraps a PriorityClass. +type PriorityClassWrapper struct { + schedulingv1.PriorityClass +} + +// MakePriorityClass creates a wrapper for a PriorityClass. +func MakePriorityClass(name string) *PriorityClassWrapper { + return &PriorityClassWrapper{schedulingv1.PriorityClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }}, + } +} + +// PriorityValue updates the value of the PriorityClass. +func (p *PriorityClassWrapper) PriorityValue(v int32) *PriorityClassWrapper { + p.Value = v + return p +} + +// Obj returns the inner PriorityClass. 
+func (p *PriorityClassWrapper) Obj() *schedulingv1.PriorityClass { + return &p.PriorityClass +} + type QueuedWorkloadWrapper struct{ kueue.QueuedWorkload } // MakeQueuedWorkload creates a wrapper for a QueuedWorkload with a single @@ -154,6 +186,11 @@ func (w *QueuedWorkloadWrapper) Creation(t time.Time) *QueuedWorkloadWrapper { return w } +func (w *QueuedWorkloadWrapper) PriorityClass(priorityClassName string) *QueuedWorkloadWrapper { + w.Spec.PriorityClassName = priorityClassName + return w +} + // AdmissionWrapper wraps an Admission type AdmissionWrapper struct{ kueue.Admission } diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go index bd1eeba8fd..81bd60b363 100644 --- a/test/integration/controller/job/job_controller_test.go +++ b/test/integration/controller/job/job_controller_test.go @@ -37,13 +37,15 @@ import ( ) const ( - parallelism = 4 - jobName = "test-job" - jobNamespace = "default" - jobKey = jobNamespace + "/" + jobName - labelKey = "cloud.provider.com/instance" - flavorOnDemand = "on-demand" - flavorSpot = "spot" + parallelism = 4 + jobName = "test-job" + jobNamespace = "default" + jobKey = jobNamespace + "/" + jobName + labelKey = "cloud.provider.com/instance" + flavorOnDemand = "on-demand" + flavorSpot = "spot" + priorityClassName = "test-priority-class" + priorityValue = 10 ) // +kubebuilder:docs-gen:collapse=Imports @@ -51,7 +53,10 @@ const ( var _ = ginkgo.Describe("Job controller", func() { ginkgo.It("Should reconcile workload and job", func() { ginkgo.By("checking the job gets suspended when created unsuspended") - job := testing.MakeJob(jobName, jobNamespace).Obj() + priorityClass := testing.MakePriorityClass(priorityClassName). 
+ PriorityValue(int32(priorityValue)).Obj() + gomega.Expect(k8sClient.Create(ctx, priorityClass)).Should(gomega.Succeed()) + job := testing.MakeJob(jobName, jobNamespace).PriorityClass(priorityClassName).Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace} createdJob := &batchv1.Job{} @@ -70,6 +75,10 @@ var _ = ginkgo.Describe("Job controller", func() { }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal("")) + ginkgo.By("checking the workload is created with priority and priorityName") + gomega.Expect(createdWorkload.Spec.PriorityClassName).Should(gomega.Equal(priorityClassName)) + gomega.Expect(*createdWorkload.Spec.Priority).Should(gomega.Equal(int32(priorityValue))) + ginkgo.By("checking the workload is updated with queue name when the job does") jobQueueName := "test-queue" createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} @@ -82,7 +91,7 @@ var _ = ginkgo.Describe("Job controller", func() { }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) ginkgo.By("checking a second non-matching workload is deleted") - secondWl, _ := workloadjob.ConstructWorkloadFor(createdJob, scheme.Scheme) + secondWl, _ := workloadjob.ConstructWorkloadFor(ctx, k8sClient, createdJob, scheme.Scheme) secondWl.Name = "second-workload" secondWl.Spec.PodSets[0].Count = parallelism + 1 gomega.Expect(k8sClient.Create(ctx, secondWl)).Should(gomega.Succeed()) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 50e613e9ba..fa5ead8dce 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -269,4 +269,56 @@ var _ = ginkgo.Describe("Scheduler", func() { return createdJob.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false))) }) 
+ + ginkgo.It("Should schedule jobs according to their priorities", func() { + queue := testing.MakeQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj() + + highPriorityClass := testing.MakePriorityClass("high-priority-class").PriorityValue(100).Obj() + gomega.Expect(k8sClient.Create(ctx, highPriorityClass)).Should(gomega.Succeed()) + + lowPriorityClass := testing.MakePriorityClass("low-priority-class").PriorityValue(10).Obj() + gomega.Expect(k8sClient.Create(ctx, lowPriorityClass)).Should(gomega.Succeed()) + + jobLowPriority := testing.MakeJob("job-low-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").PriorityClass(lowPriorityClass.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, jobLowPriority)).Should(gomega.Succeed()) + jobHighPriority := testing.MakeJob("job-high-priority", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "5").PriorityClass(highPriorityClass.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, jobHighPriority)).Should(gomega.Succeed()) + + ginkgo.By("checking that workload1 is created with priority and priorityName") + createdLowPriorityWorkload := &kueue.QueuedWorkload{} + gomega.Eventually(func() bool { + lookupKey := types.NamespacedName{Name: jobLowPriority.Name, Namespace: jobLowPriority.Namespace} + err := k8sClient.Get(ctx, lookupKey, createdLowPriorityWorkload) + return err == nil + }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) + gomega.Expect(createdLowPriorityWorkload.Spec.PriorityClassName).Should(gomega.Equal(lowPriorityClass.Name)) + gomega.Expect(*createdLowPriorityWorkload.Spec.Priority).Should(gomega.Equal(lowPriorityClass.Value)) + + ginkgo.By("checking that workload2 is created with priority and priorityName") + createdHighPriorityWorkload := &kueue.QueuedWorkload{} + gomega.Eventually(func() bool { + lookupKey := types.NamespacedName{Name: jobHighPriority.Name, Namespace: jobHighPriority.Namespace} + err := k8sClient.Get(ctx, lookupKey, createdHighPriorityWorkload) + 
return err == nil + }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) + gomega.Expect(createdHighPriorityWorkload.Spec.PriorityClassName).Should(gomega.Equal(highPriorityClass.Name)) + gomega.Expect(*createdHighPriorityWorkload.Spec.Priority).Should(gomega.Equal(highPriorityClass.Value)) + + // delay creating the queue until after workloads are created. + gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed()) + + ginkgo.By("checking the job with low priority continues to be suspended") + createdJob1 := &batchv1.Job{} + gomega.Consistently(func() bool { + return k8sClient.Get(ctx, types.NamespacedName{Name: jobLowPriority.Name, Namespace: ns.Name}, + createdJob1) == nil && *createdJob1.Spec.Suspend + }, framework.ConsistentDuration, framework.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the job with high priority starts") + createdJob2 := &batchv1.Job{} + gomega.Eventually(func() *bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: jobHighPriority.Name, Namespace: ns.Name}, createdJob2)).Should(gomega.Succeed()) + return createdJob2.Spec.Suspend + }, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false))) + }) })