Skip to content

Commit

Permalink
add priority in Workload api
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Wang <wangqingcan1990@gmail.com>
  • Loading branch information
denkensk committed Mar 16, 2022
1 parent 399521d commit edacea0
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 12 deletions.
14 changes: 14 additions & 0 deletions api/v1alpha1/queuedworkload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ type QueuedWorkloadSpec struct {

// admission holds the parameters of the admission of the workload by a ClusterQueue.
Admission *Admission `json:"admission,omitempty"`

// If specified, indicates the queuedWorkload's priority.
// "system-node-critical" and "system-cluster-critical" are two special
// keywords which indicate the highest priorities with the former being
// the highest priority. Any other name must be defined by creating a
// PriorityClass object with that name. If not specified, the queuedWorkload
// priority will be default or zero if there is no default.
PriorityClassName string `json:"priorityClassName,omitempty"`

// Priority determines the order of access to the resources managed by the
// ClusterQueue where the workload is queued.
// The priority value is populated from PriorityClassName.
// The higher the value, the higher the priority.
Priority *int32 `json:"priority,omitempty"`
}

type Admission struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7300,6 +7300,21 @@ spec:
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
priority:
description: Priority determines the order of access to the resources
managed by the ClusterQueue where the workload is queued. The priority
value is populated from PriorityClassName. The higher the value,
the higher the priority.
format: int32
type: integer
priorityClassName:
description: If specified, indicates the queuedWorkload's priority.
"system-node-critical" and "system-cluster-critical" are two special
keywords which indicate the highest priorities with the former being
the highest priority. Any other name must be defined by creating
a PriorityClass object with that name. If not specified, the queuedWorkload
priority will be default or zero if there is no default.
type: string
queueName:
description: queueName is the name of the queue the QueuedWorkload
is associated with.
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ const (
// UpdatesBatchPeriod is the batch period to hold queuedworkload updates
// before syncing Queue and ClusterQueue objects.
UpdatesBatchPeriod = 3 * time.Second

// DefaultPriority is used to set priority of workloads
// that do not specify any priority class and there is no priority class
// marked as default.
DefaultPriority = 0
)
16 changes: 14 additions & 2 deletions pkg/controller/workload/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -294,7 +295,7 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job *batchv
}

// Create the corresponding workload.
wl, err := ConstructWorkloadFor(job, r.scheme)
wl, err := ConstructWorkloadFor(ctx, r.client, job, r.scheme)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +371,8 @@ func (r *JobReconciler) ensureAtMostOneWorkload(ctx context.Context, job *batchv
return match, nil
}

func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) {
func ConstructWorkloadFor(ctx context.Context, client client.Client,
job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) {
w := &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: job.Name,
Expand All @@ -386,6 +388,16 @@ func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.Queu
QueueName: queueName(job),
},
}

priorityClassName := job.Spec.Template.Spec.PriorityClassName
// Populate priority from priority class.
p, err := utilpriority.GetPriorityFromPriorityClass(ctx, client, priorityClassName)
if err != nil {
return nil, err
}
w.Spec.Priority = &p
w.Spec.PriorityClassName = priorityClassName

if err := ctrl.SetControllerReference(job, w, scheme); err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"container/heap"
"fmt"

utilpriority "sigs.k8s.io/kueue/pkg/util/priority"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -140,7 +142,9 @@ func (cq *ClusterQueue) Pop() *workload.Info {
}

// creationFIFO reports whether a must be ordered before b. The workload with
// the higher priority goes first; when both priorities are equal, the one
// with the earlier creation timestamp goes first.
func creationFIFO(a, b workload.Info) bool {
	pa, pb := utilpriority.Priority(a), utilpriority.Priority(b)
	if pa != pb {
		return pa > pb
	}
	return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp)
}

// heap.Interface implementation inspired by
Expand Down
99 changes: 99 additions & 0 deletions pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
kueue "sigs.k8s.io/kueue/api/v1alpha1"
)

// Priority values referenced (by address) from the test workloads below.
var (
	lowPriority  = int32(0)
	highPriority = int32(1000)
)

func TestFIFOClusterQueue(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
Expand Down Expand Up @@ -83,3 +87,98 @@ func TestFIFOClusterQueue(t *testing.T) {
t.Errorf("Queue is not empty, poped workload %q", got.Obj.Name)
}
}

// TestStrictFIFO pushes two workloads into a StrictFIFO ClusterQueue and
// checks which one is popped first. The cases cover a priority difference,
// a priority tie broken by creation time, and a higher priority overriding
// an earlier creation time.
func TestStrictFIFO(t *testing.T) {
	t1 := time.Now()
	t2 := t1.Add(time.Second)
	for _, tt := range []struct {
		name     string
		w1       *kueue.QueuedWorkload
		w2       *kueue.QueuedWorkload
		expected string
	}{
		{
			// w2 must carry the lower priority; otherwise this case only
			// exercises the creation-time tie-break covered by the next case.
			name: "w1.priority is larger than w2.priority",
			w1: &kueue.QueuedWorkload{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "w1",
					CreationTimestamp: metav1.NewTime(t1),
				},
				Spec: kueue.QueuedWorkloadSpec{
					PriorityClassName: "highPriority",
					Priority:          &highPriority,
				},
			},
			w2: &kueue.QueuedWorkload{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "w2",
					CreationTimestamp: metav1.NewTime(t2),
				},
				Spec: kueue.QueuedWorkloadSpec{
					PriorityClassName: "lowPriority",
					Priority:          &lowPriority,
				},
			},
			expected: "w1",
		},
		{
			// Neither workload sets a priority, so both compare as equal
			// and the earlier creation timestamp decides.
			name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time",
			w1: &kueue.QueuedWorkload{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "w1",
					CreationTimestamp: metav1.NewTime(t1),
				},
			},
			w2: &kueue.QueuedWorkload{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "w2",
					CreationTimestamp: metav1.NewTime(t2),
				},
			},
			expected: "w1",
		},
		{
			// The higher priority wins even though w2 was created later.
			name: "w1.priority is less than w2.priority and w1.create time is earlier than w2.create time",
			w1: &kueue.QueuedWorkload{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "w1",
					CreationTimestamp: metav1.NewTime(t1),
				},
				Spec: kueue.QueuedWorkloadSpec{
					PriorityClassName: "lowPriority",
					Priority:          &lowPriority,
				},
			},
			w2: &kueue.QueuedWorkload{
				ObjectMeta: metav1.ObjectMeta{
					Name:              "w2",
					CreationTimestamp: metav1.NewTime(t2),
				},
				Spec: kueue.QueuedWorkloadSpec{
					PriorityClassName: "highPriority",
					Priority:          &highPriority,
				},
			},
			expected: "w2",
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			q := newClusterQueue(&kueue.ClusterQueue{
				Spec: kueue.ClusterQueueSpec{
					QueueingStrategy: kueue.StrictFIFO,
				},
			})

			q.PushOrUpdate(tt.w1)
			q.PushOrUpdate(tt.w2)

			got := q.Pop()
			if got == nil {
				t.Fatal("Queue is empty")
			}
			if got.Obj.Name != tt.expected {
				t.Errorf("Poped workload %q want %q", got.Obj.Name, tt.expected)
			}
		})
	}
}
71 changes: 71 additions & 0 deletions pkg/util/priority/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package priority

import (
"context"

schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/workload"
)

// Priority returns the priority of the given workload: the value stored in
// Spec.Priority when it is set, otherwise constants.DefaultPriority.
func Priority(w workload.Info) int32 {
	p := w.Obj.Spec.Priority
	if p == nil {
		// No priority recorded on the workload; fall back to the static default.
		return constants.DefaultPriority
	}
	return *p
}

// GetPriorityFromPriorityClass resolves a priority class name to its value.
// An empty name is resolved through getDefaultPriority. Otherwise the
// PriorityClass with that name is fetched through the client and its Value
// is returned; a failed Get is returned unchanged with a zero priority.
func GetPriorityFromPriorityClass(ctx context.Context, client client.Client,
	priorityClass string) (int32, error) {
	if priorityClass == "" {
		return getDefaultPriority(ctx, client)
	}

	var pc schedulingv1.PriorityClass
	key := types.NamespacedName{Name: priorityClass}
	err := client.Get(ctx, key, &pc)
	if err != nil {
		return 0, err
	}
	return pc.Value, nil
}

// getDefaultPriority returns the Value of the priority class found by
// getDefaultPriorityClass, or constants.DefaultPriority when none is found.
// An error from the lookup is returned unchanged with a zero priority.
func getDefaultPriority(ctx context.Context, client client.Client) (int32, error) {
	dpc, err := getDefaultPriorityClass(ctx, client)
	switch {
	case err != nil:
		return 0, err
	case dpc == nil:
		return int32(constants.DefaultPriority), nil
	default:
		return dpc.Value, nil
	}
}

// getDefaultPriorityClass lists all PriorityClass objects through the client
// and returns the one marked GlobalDefault. It returns (nil, nil) when no
// class is marked as the global default, and (nil, err) when listing fails.
func getDefaultPriorityClass(ctx context.Context, client client.Client) (*schedulingv1.PriorityClass, error) {
	pcs := schedulingv1.PriorityClassList{}
	if err := client.List(ctx, &pcs); err != nil {
		return nil, err
	}

	// In case more than one global default priority class is added as a result of a race condition,
	// we pick the one with the lowest priority value.
	var defaultPC *schedulingv1.PriorityClass
	// Iterate by index and point at the slice element itself. Taking the
	// address of a range variable would, before Go 1.22, alias a single
	// variable reused by every iteration, so the result would end up
	// referring to the last item in the list instead of the selected one.
	for i := range pcs.Items {
		pci := &pcs.Items[i]
		if !pci.GlobalDefault {
			continue
		}
		if defaultPC == nil || defaultPC.Value > pci.Value {
			defaultPC = pci
		}
	}
	return defaultPC, nil
}
37 changes: 37 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -78,6 +79,12 @@ func (j *JobWrapper) Parallelism(p int32) *JobWrapper {
return j
}

// PriorityClass sets Spec.Template.Spec.PriorityClassName on the wrapped job
// and returns the wrapper so calls can be chained.
func (j *JobWrapper) PriorityClass(pc string) *JobWrapper {
	podSpec := &j.Spec.Template.Spec
	podSpec.PriorityClassName = pc
	return j
}

// Queue updates the queue name of the job
func (j *JobWrapper) Queue(queue string) *JobWrapper {
j.Annotations[constants.QueueAnnotation] = queue
Expand All @@ -102,6 +109,31 @@ func (j *JobWrapper) Request(r corev1.ResourceName, v string) *JobWrapper {
return j
}

// PriorityClassWrapper embeds a PriorityClass so that builder-style helper
// methods can be attached to it.
type PriorityClassWrapper struct{ schedulingv1.PriorityClass }

// MakePriorityClass returns a new PriorityClassWrapper whose PriorityClass
// has only its object name set to the given name.
func MakePriorityClass(name string) *PriorityClassWrapper {
	w := &PriorityClassWrapper{}
	w.ObjectMeta.Name = name
	return w
}

// PriorityValue sets the Value of the wrapped PriorityClass and returns the
// wrapper so calls can be chained.
func (p *PriorityClassWrapper) PriorityValue(v int32) *PriorityClassWrapper {
	p.PriorityClass.Value = v
	return p
}

// Obj returns a pointer to the PriorityClass embedded in the wrapper; the
// result shares storage with the wrapper rather than being a copy.
func (p *PriorityClassWrapper) Obj() *schedulingv1.PriorityClass {
	pc := &p.PriorityClass
	return pc
}

// QueuedWorkloadWrapper embeds a QueuedWorkload so that builder-style helper
// methods can be attached to it.
type QueuedWorkloadWrapper struct {
	kueue.QueuedWorkload
}

// MakeQueuedWorkload creates a wrapper for a QueuedWorkload with a single
Expand Down Expand Up @@ -154,6 +186,11 @@ func (w *QueuedWorkloadWrapper) Creation(t time.Time) *QueuedWorkloadWrapper {
return w
}

// PriorityClass sets Spec.PriorityClassName on the wrapped QueuedWorkload and
// returns the wrapper so calls can be chained.
func (w *QueuedWorkloadWrapper) PriorityClass(priorityClassName string) *QueuedWorkloadWrapper {
	w.QueuedWorkload.Spec.PriorityClassName = priorityClassName
	return w
}

// AdmissionWrapper embeds an Admission so that builder-style helper methods
// can be attached to it.
type AdmissionWrapper struct {
	kueue.Admission
}

Expand Down
Loading

0 comments on commit edacea0

Please sign in to comment.