Skip to content

Commit

Permalink
Merge pull request #104 from denkensk/add-workload-priority
Browse files Browse the repository at this point in the history
Add PriorityClass in Workload api
  • Loading branch information
k8s-ci-robot authored Mar 17, 2022
2 parents c50f61c + 39e9333 commit 32a2d30
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 11 deletions.
14 changes: 14 additions & 0 deletions api/v1alpha1/queuedworkload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ type QueuedWorkloadSpec struct {

// admission holds the parameters of the admission of the workload by a ClusterQueue.
Admission *Admission `json:"admission,omitempty"`

// If specified, indicates the queuedWorkload's priority.
// "system-node-critical" and "system-cluster-critical" are two special
// keywords which indicate the highest priorities with the former being
// the highest priority. Any other name must be defined by creating a
// PriorityClass object with that name. If not specified, the queuedWorkload
// priority will be default or zero if there is no default.
PriorityClassName string `json:"priorityClassName,omitempty"`

// Priority determines the order of access to the resources managed by the
// ClusterQueue where the workload is queued.
// The priority value is populated from PriorityClassName.
// The higher the value, the higher the priority.
Priority *int32 `json:"priority,omitempty"`
}

type Admission struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions config/crd/bases/kueue.x-k8s.io_queuedworkloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7300,6 +7300,21 @@ spec:
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
priority:
description: Priority determines the order of access to the resources
managed by the ClusterQueue where the workload is queued. The priority
value is populated from PriorityClassName. The higher the value,
the higher the priority.
format: int32
type: integer
priorityClassName:
description: If specified, indicates the queuedWorkload's priority.
"system-node-critical" and "system-cluster-critical" are two special
keywords which indicate the highest priorities with the former being
the highest priority. Any other name must be defined by creating
a PriorityClass object with that name. If not specified, the queuedWorkload
priority will be default or zero if there is no default.
type: string
queueName:
description: queueName is the name of the queue the QueuedWorkload
is associated with.
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ const (
// UpdatesBatchPeriod is the batch period to hold queuedworkload updates
// before syncing a Queue and CLusterQueue onbjects.
UpdatesBatchPeriod = 3 * time.Second

// DefaultPriority is used to set priority of workloads
// that do not specify any priority class and there is no priority class
// marked as default.
DefaultPriority = 0
)
16 changes: 14 additions & 2 deletions pkg/controller/workload/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -294,7 +295,7 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job *batchv
}

// Create the corresponding workload.
wl, err := ConstructWorkloadFor(job, r.scheme)
wl, err := ConstructWorkloadFor(ctx, r.client, job, r.scheme)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +371,8 @@ func (r *JobReconciler) ensureAtMostOneWorkload(ctx context.Context, job *batchv
return match, nil
}

func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) {
func ConstructWorkloadFor(ctx context.Context, client client.Client,
job *batchv1.Job, scheme *runtime.Scheme) (*kueue.QueuedWorkload, error) {
w := &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: job.Name,
Expand All @@ -386,6 +388,16 @@ func ConstructWorkloadFor(job *batchv1.Job, scheme *runtime.Scheme) (*kueue.Queu
QueueName: queueName(job),
},
}

// Populate priority from priority class.
priorityClassName, p, err := utilpriority.GetPriorityFromPriorityClass(
ctx, client, job.Spec.Template.Spec.PriorityClassName)
if err != nil {
return nil, err
}
w.Spec.Priority = &p
w.Spec.PriorityClassName = priorityClassName

if err := ctrl.SetControllerReference(job, w, scheme); err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -140,6 +141,12 @@ func (cq *ClusterQueue) Pop() *workload.Info {
}

func creationFIFO(a, b workload.Info) bool {
p1 := utilpriority.Priority(a.Obj)
p2 := utilpriority.Priority(b.Obj)

if p1 != p2 {
return p1 > p2
}
return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp)
}

Expand Down
102 changes: 102 additions & 0 deletions pkg/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
)

const (
lowPriority = 0
highPriority = 1000
)

func TestFIFOClusterQueue(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
Expand Down Expand Up @@ -83,3 +90,98 @@ func TestFIFOClusterQueue(t *testing.T) {
t.Errorf("Queue is not empty, poped workload %q", got.Obj.Name)
}
}

func TestStrictFIFO(t *testing.T) {
t1 := time.Now()
t2 := t1.Add(time.Second)
for _, tt := range []struct {
name string
w1 *kueue.QueuedWorkload
w2 *kueue.QueuedWorkload
expected string
}{
{
name: "w1.priority is higher than w2.priority",
w1: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "highPriority",
Priority: pointer.Int32(highPriority),
},
},
w2: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "lowPriority",
Priority: pointer.Int32(lowPriority),
},
},
expected: "w1",
},
{
name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time",
w1: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
},
w2: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
},
expected: "w1",
},
{
name: "p1.priority is lower than p2.priority and w1.create time is earlier than w2.create time",
w1: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "lowPriority",
Priority: pointer.Int32(lowPriority),
},
},
w2: &kueue.QueuedWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
Spec: kueue.QueuedWorkloadSpec{
PriorityClassName: "highPriority",
Priority: pointer.Int32(highPriority),
},
},
expected: "w2",
},
} {
t.Run(tt.name, func(t *testing.T) {
q := newClusterQueue(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
QueueingStrategy: kueue.StrictFIFO,
},
})

q.PushOrUpdate(tt.w1)
q.PushOrUpdate(tt.w2)

got := q.Pop()
if got == nil {
t.Fatal("Queue is empty")
}
if got.Obj.Name != tt.expected {
t.Errorf("Poped workload %q want %q", got.Obj.Name, tt.expected)
}
})
}
}
87 changes: 87 additions & 0 deletions pkg/util/priority/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package priority

import (
"context"

schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/api/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
)

// Priority returns priority of the given workload.
func Priority(w *kueue.QueuedWorkload) int32 {
if w.Spec.Priority != nil {
return *w.Spec.Priority
}
// When priority of a running workload is nil, it means it was created at a time
// that there was no global default priority class and the priority class
// name of the pod was empty. So, we resolve to the static default priority.
return constants.DefaultPriority
}

// GetPriorityFromPriorityClass returns the priority populated from
// priority class. If not specified, priority will be default or
// zero if there is no default.
func GetPriorityFromPriorityClass(ctx context.Context, client client.Client,
priorityClass string) (string, int32, error) {
if len(priorityClass) == 0 {
return getDefaultPriority(ctx, client)
}

pc := &schedulingv1.PriorityClass{}
if err := client.Get(ctx, types.NamespacedName{Name: priorityClass}, pc); err != nil {
return "", 0, err
}

return pc.Name, pc.Value, nil
}

func getDefaultPriority(ctx context.Context, client client.Client) (string, int32, error) {
dpc, err := getDefaultPriorityClass(ctx, client)
if err != nil {
return "", 0, err
}
if dpc != nil {
return dpc.Name, dpc.Value, nil
}
return "", int32(constants.DefaultPriority), nil
}

func getDefaultPriorityClass(ctx context.Context, client client.Client) (*schedulingv1.PriorityClass, error) {
pcs := schedulingv1.PriorityClassList{}
err := client.List(ctx, &pcs)
if err != nil {
return nil, err
}

// In case more than one global default priority class is added as a result of a race condition,
// we pick the one with the lowest priority value.
var defaultPC *schedulingv1.PriorityClass
for _, pci := range pcs.Items {
if pci.GlobalDefault {
if defaultPC == nil || defaultPC.Value > pci.Value {
defaultPC = &pci
}
}
}
return defaultPC, nil
}
16 changes: 16 additions & 0 deletions pkg/util/routine/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package routine

// Wrapper is used to wrap a function that will run in a goroutine.
Expand Down
Loading

0 comments on commit 32a2d30

Please sign in to comment.