Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PriorityClass in Workload api #104

Merged
merged 2 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/v1alpha1/queuedworkload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ type QueuedWorkloadSpec struct {

// admission holds the parameters of the admission of the workload by a ClusterQueue.
Admission *Admission `json:"admission,omitempty"`

// If specified, indicates the queuedWorkload's priority.
// "system-node-critical" and "system-cluster-critical" are two special
// keywords which indicate the highest priorities with the former being
// the highest priority. Any other name must be defined by creating a
// PriorityClass object with that name. If not specified, the queuedWorkload
// priority will be default or zero if there is no default.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this default defined?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the constant

DefaultPriority = 0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it wouldn't change by external factors, you should just say that the default priority is zero.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, I'm referring to If not specified, the queuedWorkload priority will be default

Is this referring to the default priority class defined for the cluster?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It's the default priority class defined for the cluster. But if there is no default. It will be 0.
Keep the comments as
https://github.com/kubernetes/kubernetes/blob/ca2cd3b18ef145c34311ba7fd9d389fe8233fae8/pkg/apis/core/types.go#L2902

PriorityClassName string `json:"priorityClassName,omitempty"`

// Priority determines the order of access to the resources managed by the
// ClusterQueue where the workload is queued.
// The priority value is populated from PriorityClassName.
// The higher the value, the higher the priority.
Priority *int32 `json:"priority,omitempty"`
}

type Admission struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7300,6 +7300,21 @@ spec:
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
priority:
description: Priority determines the order of access to the resources
managed by the ClusterQueue where the workload is queued. The priority
value is populated from PriorityClassName. The higher the value,
the higher the priority.
format: int32
type: integer
priorityClassName:
description: If specified, indicates the queuedWorkload's priority.
"system-node-critical" and "system-cluster-critical" are two special
keywords which indicate the highest priorities with the former being
the highest priority. Any other name must be defined by creating
a PriorityClass object with that name. If not specified, the queuedWorkload
priority will be default or zero if there is no default.
type: string
queueName:
description: queueName is the name of the queue the QueuedWorkload
is associated with.
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ const (
// UpdatesBatchPeriod is the batch period to hold queuedworkload updates
// before syncing a Queue and CLusterQueue onbjects.
UpdatesBatchPeriod = 3 * time.Second

// DefaultPriority is used to set priority of workloads
// that do not specify any priority class and there is no priority class
// marked as default.
DefaultPriority = 0
)
16 changes: 14 additions & 2 deletions pkg/controller/workload/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove the alias. Use just priority

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

utilpriority is auto add by auto import. I suggest keeping as utilpriority.

"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -294,7 +295,7 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job *batchv
}

// Create the corresponding workload.
wl, err := ConstructWorkloadFor(job, r.scheme)
wl, err := ConstructWorkloadFor(ctx, r.client, job, r.scheme)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +371,8 @@ func (r *JobReconciler) ensureAtMostOneWorkload(ctx context.Context, job *batchv
return match, nil
}

func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) {
func ConstructWorkloadFor(ctx context.Context, client client.Client,
job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) {
w := &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: job.Name,
Expand All @@ -386,6 +388,16 @@ func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.Queu
QueueName: queueName(job),
},
}

// Populate priority from priority class.
priorityClassName, p, err := utilpriority.GetPriorityFromPriorityClass(
ctx, client, job.Spec.Template.Spec.PriorityClassName)
if err != nil {
return nil, err
}
w.Spec.Priority = &p
w.Spec.PriorityClassName = priorityClassName

if err := ctrl.SetControllerReference(job, w, scheme); err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -140,6 +141,12 @@ func (cq *ClusterQueue) Pop() *workload.Info {
}

func creationFIFO(a, b workload.Info) bool {
p1 := utilpriority.Priority(a.Obj)
p2 := utilpriority.Priority(b.Obj)

if p1 != p2 {
return p1 > p2
}
return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp)
}

Expand Down
102 changes: 102 additions & 0 deletions pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
)

const (
lowPriority = 0
highPriority = 1000
)

func TestFIFOClusterQueue(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
Expand Down Expand Up @@ -83,3 +90,98 @@ func TestFIFOClusterQueue(t *testing.T) {
t.Errorf("Queue is not empty, poped workload %q", got.Obj.Name)
}
}

func TestStrictFIFO(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a follow up, we should find a way to merge the 2 tests #72

t1 := time.Now()
t2 := t1.Add(time.Second)
for _, tt := range []struct {
name string
w1 *kueue.QueuedWorkload
w2 *kueue.QueuedWorkload
expected string
}{
{
name: "w1.priority is higher than w2.priority",
w1: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "highPriority",
Priority: pointer.Int32(highPriority),
},
},
w2: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "lowPriority",
Priority: pointer.Int32(lowPriority),
},
},
expected: "w1",
},
{
name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time",
w1: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
},
w2: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
},
expected: "w1",
},
{
name: "p1.priority is lower than p2.priority and w1.create time is earlier than w2.create time",
w1: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "lowPriority",
Priority: pointer.Int32(lowPriority),
},
},
w2: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "highPriority",
Priority: pointer.Int32(highPriority),
},
},
expected: "w2",
},
} {
t.Run(tt.name, func(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
QueueingStrategy: kueue.StrictFIFO,
},
})

q.PushOrUpdate(tt.w1)
q.PushOrUpdate(tt.w2)

got := q.Pop()
if got == nil {
t.Fatal("Queue is empty")
}
if got.Obj.Name != tt.expected {
t.Errorf("Poped workload %q want %q", got.Obj.Name, tt.expected)
}
})
}
}
87 changes: 87 additions & 0 deletions pkg/util/priority/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package priority

import (
"context"

schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
)

// Priority returns priority of the given workload.
func Priority(w *kueue.QueuedWorkload) int32 {
if w.Spec.Priority != nil {
return *w.Spec.Priority
}
// When priority of a running workload is nil, it means it was created at a time
// that there was no global default priority class and the priority class
// name of the pod was empty. So, we resolve to the static default priority.
return constants.DefaultPriority
}

// GetPriorityFromPriorityClass returns the priority populated from
// priority class. If not specified, priority will be default or
// zero if there is no default.
func GetPriorityFromPriorityClass(ctx context.Context, client client.Client,
priorityClass string) (string, int32, error) {
if len(priorityClass) == 0 {
return getDefaultPriority(ctx, client)
}

pc := &schedulingv1.PriorityClass{}
if err := client.Get(ctx, types.NamespacedName{Name: priorityClass}, pc); err != nil {
return "", 0, err
}

return pc.Name, pc.Value, nil
}

func getDefaultPriority(ctx context.Context, client client.Client) (string, int32, error) {
dpc, err := getDefaultPriorityClass(ctx, client)
if err != nil {
return "", 0, err
}
if dpc != nil {
return dpc.Name, dpc.Value, nil
}
return "", int32(constants.DefaultPriority), nil
}

func getDefaultPriorityClass(ctx context.Context, client client.Client) (*schedulingv1.PriorityClass, error) {
pcs := schedulingv1.PriorityClassList{}
err := client.List(ctx, &pcs)
if err != nil {
return nil, err
}

// In case more than one global default priority class is added as a result of a race condition,
// we pick the one with the lowest priority value.
var defaultPC *schedulingv1.PriorityClass
for _, pci := range pcs.Items {
if pci.GlobalDefault {
if defaultPC == nil || defaultPC.Value > pci.Value {
defaultPC = &pci
}
}
}
return defaultPC, nil
}
16 changes: 16 additions & 0 deletions pkg/util/routine/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I wonder if we can make a verify step for this.

Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package routine

// Wrapper is used to wrap a function that will run in a goroutine.
Expand Down
Loading