Skip to content

Commit

Permalink
Avoid queueing workloads that don't match CQ namespaceSelector
Browse files Browse the repository at this point in the history
  • Loading branch information
ahg-g committed Aug 10, 2022
1 parent 5402db6 commit 687e6d7
Show file tree
Hide file tree
Showing 17 changed files with 597 additions and 352 deletions.
4 changes: 2 additions & 2 deletions apis/config/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

// Package v1alpha1 contains API Schema definitions for the config v1alpha1 API group
//+kubebuilder:object:generate=true
//+groupName=config.x-k8s.io
// +kubebuilder:object:generate=true
// +groupName=config.x-k8s.io
package v1alpha1

import (
Expand Down
4 changes: 2 additions & 2 deletions apis/kueue/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

// Package v1alpha1 contains API Schema definitions for the kueue v1alpha1 API group
//+kubebuilder:object:generate=true
//+groupName=kueue.x-k8s.io
// +kubebuilder:object:generate=true
// +groupName=kueue.x-k8s.io
package v1alpha1

import (
Expand Down
127 changes: 127 additions & 0 deletions apis/kueue/v1alpha1/workload_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
apivalidation "k8s.io/apimachinery/pkg/api/validation"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

// workloadlog is the logger used by the Workload webhook in this package.
var workloadlog = ctrl.Log.WithName("workload-webhook")

// SetupWebhookWithManager builds a webhook for the Workload type on top of
// mgr and returns the error, if any, reported when completing the builder.
func (r *Workload) SetupWebhookWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewWebhookManagedBy(mgr).For(r)
	return builder.Complete()
}

// +kubebuilder:webhook:path=/mutate-kueue-x-k8s-io-v1alpha1-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads,verbs=create;update,versions=v1alpha1,name=mworkload.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &Workload{}

// Default implements webhook.Defaulter so a webhook will be registered for the type.
// It logs the call and then gives every podSet in the spec that has an empty
// name the name DefaultPodSetName. Named podSets are left untouched.
func (r *Workload) Default() {
	workloadlog.V(5).Info("defaulter", "workload", klog.KObj(r))

	// podSets shares its backing array with r.Spec.PodSets, so writes through
	// it update the workload in place.
	podSets := r.Spec.PodSets
	for idx := range podSets {
		if podSets[idx].Name == "" {
			podSets[idx].Name = DefaultPodSetName
		}
	}
}

// +kubebuilder:webhook:path=/validate-kueue-x-k8s-io-v1alpha1-workload,mutating=false,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads,verbs=create;update,versions=v1alpha1,name=vworkload.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &Workload{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type.
// It runs ValidateWorkload on the receiver and returns the aggregated result.
func (r *Workload) ValidateCreate() error {
	errs := ValidateWorkload(r)
	return errs.ToAggregate()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type.
// It runs ValidateWorkloadUpdate with the receiver as the new object and old
// as the previous one, and returns the aggregated result.
func (r *Workload) ValidateUpdate(old runtime.Object) error {
	// Unchecked assertion: this panics if old is not a *Workload.
	oldWorkload := old.(*Workload)
	errs := ValidateWorkloadUpdate(r, oldWorkload)
	return errs.ToAggregate()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type.
// No checks are performed on deletion; it always returns nil.
func (r *Workload) ValidateDelete() error {
	return nil
}

// ValidateWorkload checks the spec of obj and returns every violation found:
//   - spec.podSets must contain at least one entry;
//   - each podSet's count must be greater than 0;
//   - spec.priorityClassName, when set, must pass
//     validation.IsDNS1123Subdomain (one error per reported message).
//
// The returned list is nil when no violation is found.
func ValidateWorkload(obj *Workload) field.ErrorList {
	var allErrs field.ErrorList
	specPath := field.NewPath("spec")
	podSetsPath := specPath.Child("podSets")

	podSets := obj.Spec.PodSets
	if len(podSets) == 0 {
		allErrs = append(allErrs, field.Required(podSetsPath, "at least one podSet is required"))
	}

	// Index into the slice so only the Count field is read per element.
	for i := range podSets {
		count := podSets[i].Count
		if count > 0 {
			continue
		}
		countPath := podSetsPath.Index(i).Child("count")
		allErrs = append(allErrs, field.Invalid(countPath, count, "count must be greater than 0"))
	}

	className := obj.Spec.PriorityClassName
	if len(className) == 0 {
		// An empty priority class name is accepted without further checks.
		return allErrs
	}
	classNamePath := specPath.Child("priorityClassName")
	for _, msg := range validation.IsDNS1123Subdomain(className) {
		allErrs = append(allErrs, field.Invalid(classNamePath, className, msg))
	}
	return allErrs
}

// ValidateWorkloadUpdate validates an update from oldObj to newObj. It
// combines, in order, the errors from:
//   - ValidateWorkload on newObj;
//   - the immutability check on spec.podSets;
//   - validateQueueNameUpdate on spec.queueName;
//   - validateAdmissionUpdate on spec.admission.
//
// The returned list is empty, but not nil, when every check passes.
func ValidateWorkloadUpdate(newObj, oldObj *Workload) field.ErrorList {
	podSetsPath := field.NewPath("spec", "podSets")
	results := []field.ErrorList{
		ValidateWorkload(newObj),
		apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, podSetsPath),
		validateQueueNameUpdate(newObj.Spec.QueueName, oldObj.Spec.QueueName),
		validateAdmissionUpdate(newObj.Spec.Admission, oldObj.Spec.Admission),
	}

	allErrs := field.ErrorList{}
	for _, errs := range results {
		allErrs = append(allErrs, errs...)
	}
	return allErrs
}

// validateQueueNameUpdate validates that queueName is set once: while the
// old value is empty any new value is accepted, and once it is non-empty the
// new value is checked with ValidateImmutableField against spec.queueName.
func validateQueueNameUpdate(new, old string) field.ErrorList {
	if old == "" {
		// The name was never set, so this update may set it freely.
		return field.ErrorList{}
	}
	queueNamePath := field.NewPath("spec", "queueName")
	return apivalidation.ValidateImmutableField(new, old, queueNamePath)
}

// validateAdmissionUpdate validates that admission is set once: while the old
// value is nil any new value is accepted, and once it is non-nil the new
// value is checked with ValidateImmutableField against spec.admission.
func validateAdmissionUpdate(new, old *Admission) field.ErrorList {
	if old == nil {
		// No admission was recorded before, so this update may set it freely.
		return field.ErrorList{}
	}
	admissionPath := field.NewPath("spec", "admission")
	return apivalidation.ValidateImmutableField(new, old, admissionPath)
}
2 changes: 1 addition & 1 deletion apis/kueue/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apis/kueue/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
return allErrs
}

/// validateQueueNameUpdate validates that queueName is set once
// validateQueueNameUpdate validates that queueName is set once
func validateQueueNameUpdate(new, old string, path *field.Path) field.ErrorList {
if len(old) == 0 {
return nil
Expand Down
40 changes: 40 additions & 0 deletions config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,26 @@ metadata:
creationTimestamp: null
name: mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-kueue-x-k8s-io-v1alpha1-workload
failurePolicy: Fail
name: mworkload.kb.io
rules:
- apiGroups:
- kueue.x-k8s.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- workloads
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down Expand Up @@ -70,6 +90,26 @@ metadata:
creationTimestamp: null
name: validating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-kueue-x-k8s-io-v1alpha1-workload
failurePolicy: Fail
name: vworkload.kb.io
rules:
- apiGroups:
- kueue.x-k8s.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- workloads
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
Expand Down
92 changes: 5 additions & 87 deletions pkg/queue/cluster_queue_best_effort_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package queue

import (
"k8s.io/apimachinery/pkg/api/equality"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand All @@ -27,9 +25,6 @@ import (
// BestEffortFIFO.
type ClusterQueueBestEffortFIFO struct {
*ClusterQueueImpl

// inadmissibleWorkloads are workloads that have been tried at least once and couldn't be admitted.
inadmissibleWorkloads map[string]*workload.Info
}

var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}
Expand All @@ -39,90 +34,13 @@ const BestEffortFIFO = kueue.BestEffortFIFO
func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqBE := &ClusterQueueBestEffortFIFO{
ClusterQueueImpl: cqImpl,
inadmissibleWorkloads: make(map[string]*workload.Info),
}

cqBE.Update(cq)
return cqBE, nil
}

// PushOrUpdate records wInfo in the cluster queue.
//
// If the workload is currently held in inadmissibleWorkloads and its spec is
// semantically equal to the stored one, only the stored info is replaced and
// the workload stays there. If the spec differs, the workload is removed from
// inadmissibleWorkloads and passed to ClusterQueueImpl.PushOrUpdate, as is
// any workload that was not held there.
func (cq *ClusterQueueBestEffortFIFO) PushOrUpdate(wInfo *workload.Info) {
	key := workload.Key(wInfo.Obj)
	parked := cq.inadmissibleWorkloads[key]

	switch {
	case parked == nil:
		// Not tracked as inadmissible; fall through to the regular push.
	case equality.Semantic.DeepEqual(parked.Obj.Spec, wInfo.Obj.Spec):
		// The spec did not change, so the workload could not have become
		// admissible: refresh the stored info in place.
		cq.inadmissibleWorkloads[key] = wInfo
		return
	default:
		// The spec changed: move or update the workload in the queue instead.
		delete(cq.inadmissibleWorkloads, key)
	}

	cq.ClusterQueueImpl.PushOrUpdate(wInfo)
}

// Delete removes w from inadmissibleWorkloads and then delegates to
// ClusterQueueImpl.Delete.
func (cq *ClusterQueueBestEffortFIFO) Delete(w *kueue.Workload) {
	key := workload.Key(w)
	delete(cq.inadmissibleWorkloads, key)
	cq.ClusterQueueImpl.Delete(w)
}

// DeleteFromQueue removes every workload listed in q.items from
// inadmissibleWorkloads and then delegates to
// ClusterQueueImpl.DeleteFromQueue.
func (cq *ClusterQueueBestEffortFIFO) DeleteFromQueue(q *Queue) {
	for _, w := range q.items {
		// delete is a no-op for keys that are not in the map, so no lookup
		// is needed before calling it.
		delete(cq.inadmissibleWorkloads, workload.Key(w.Obj))
	}
	cq.ClusterQueueImpl.DeleteFromQueue(q)
}

// RequeueIfNotPresent inserts a workload that cannot be admitted into
// ClusterQueue, unless it is already in the queue.
//
// When immediate is true the workload is pushed back to the heap through
// pushIfNotPresent; if a copy was held in inadmissibleWorkloads, that copy is
// the one pushed and it is removed from the map. When immediate is false the
// workload is stored in inadmissibleWorkloads, unless it is already there or
// already in the heap, in which case nothing changes and false is returned.
func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, immediate bool) bool {
	key := workload.Key(wInfo.Obj)
	parked := cq.inadmissibleWorkloads[key]

	if !immediate {
		// Already tracked, either as inadmissible or in the heap.
		if parked != nil || cq.heap.GetByKey(key) != nil {
			return false
		}
		cq.inadmissibleWorkloads[key] = wInfo
		return true
	}

	// If the workload was inadmissible, move it back into the queue.
	if parked != nil {
		wInfo = parked
		delete(cq.inadmissibleWorkloads, key)
	}
	return cq.ClusterQueueImpl.pushIfNotPresent(wInfo)
}

// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
// If at least one workload is moved, returns true. Otherwise returns false.
func (cq *ClusterQueueBestEffortFIFO) QueueInadmissibleWorkloads() bool {
if len(cq.inadmissibleWorkloads) == 0 {
return false
}

for _, wInfo := range cq.inadmissibleWorkloads {
cq.ClusterQueueImpl.pushIfNotPresent(wInfo)
ClusterQueueImpl: cqImpl,
}

cq.inadmissibleWorkloads = make(map[string]*workload.Info)
return true
err := cqBE.Update(cq)
return cqBE, err
}

func (cq *ClusterQueueBestEffortFIFO) Pending() int32 {
return cq.ClusterQueueImpl.Pending() + int32(len(cq.inadmissibleWorkloads))
func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
return cq.ClusterQueueImpl.RequeueIfNotPresent(wInfo, reason == RequeueReasonFailedAfterNomination)
}
Loading

0 comments on commit 687e6d7

Please sign in to comment.