Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preemption admission check controller #1113

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions apis/kueue/v1beta1/admissioncheck_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ const (
// CheckStateReady means that the check has passed.
// A workload having all its checks ready, and quota reserved can begin execution.
CheckStateReady CheckState = "Ready"

// CheckStatePreemptionRequired means that the check might pass if there were fewer workloads.
// Proceed with any pending workload preemption. Should be set when condition status
// is still "Unknown"
CheckStatePreemptionRequired CheckState = "PreemptionRequired"
)

// AdmissionCheckSpec defines the desired state of AdmissionCheck
Expand All @@ -65,8 +70,28 @@ type AdmissionCheckSpec struct {
// Parameters identifies the resource providing additional check parameters.
// +optional
Parameters *AdmissionCheckParametersReference `json:"parameters,omitempty"`

// preemptionPolicy determines when to issue preemptions for the Workload,
// if necessary, in relationship to the status of the admission check.
// The possible values are:
// - `Anytime`: No need to wait for this check to pass before issuing preemptions.
// Preemptions might be blocked on the preemptionPolicy of other AdmissionChecks.
// - `AfterCheckPassedOrOnDemand`: Wait for this check to pass before issuing preemptions,
// unless this or another check requests preemption through the Workload's admissionChecks.
// Defaults to `Anytime`.
// +optional
// +kubebuilder:default=Anytime
// +kubebuilder:validation:Enum=Anytime;AfterCheckPassedOrOnDemand
PreemptionPolicy *AdmissionCheckPreemptionPolicy `json:"preemptionPolicy,omitempty"`
}

// AdmissionCheckPreemptionPolicy describes when preemptions may be issued for
// a Workload in relationship to the status of an admission check.
type AdmissionCheckPreemptionPolicy string

const (
	// Anytime means there is no need to wait for the check to pass before
	// issuing preemptions.
	Anytime AdmissionCheckPreemptionPolicy = "Anytime"
	// AfterCheckPassedOrOnDemand means preemptions wait for the check to pass,
	// unless preemption is requested through the Workload's admissionChecks.
	AfterCheckPassedOrOnDemand AdmissionCheckPreemptionPolicy = "AfterCheckPassedOrOnDemand"
)

type AdmissionCheckParametersReference struct {
// ApiGroup is the group for the resource being referenced.
APIGroup string `json:"apiGroup"`
Expand Down
2 changes: 1 addition & 1 deletion apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ type AdmissionCheckState struct {
// state of the admissionCheck, one of Pending, Ready, Retry, Rejected, PreemptionRequired.
// +required
// +kubebuilder:validation:Required
// +kubebuilder:validation:Enum=Pending;Ready;Retry;Rejected
// +kubebuilder:validation:Enum=Pending;Ready;Retry;Rejected;PreemptionRequired
State CheckState `json:"state"`
// lastTransitionTime is the last time the condition transitioned from one status to another.
// This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
Expand Down
5 changes: 5 additions & 0 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_admissionchecks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ spec:
- kind
- name
type: object
preemptionPolicy:
default: Anytime
description: 'preemptionPolicy determines when to issue preemptions
for the Workload, if necessary, in relationship to the status of
the admission check. The possible values are: - `Anytime`: No need
to wait for this check to pass before issuing preemptions. Preemptions
might be blocked on the preemptionPolicy of other AdmissionChecks.
- `AfterCheckPassedOrOnDemand`: Wait for this check to pass before
issuing preemptions, unless this or other checks requests preemptions
through the Workload''s admissionChecks. Defaults to `Anytime`.'
enum:
- Anytime
- AfterCheckPassedOrOnDemand
type: string
retryDelayMinutes:
default: 15
description: RetryDelayMinutes specifies how long to keep the workload
Expand Down
1 change: 1 addition & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8404,6 +8404,7 @@ spec:
- Ready
- Retry
- Rejected
- PreemptionRequired
type: string
required:
- lastTransitionTime
Expand Down
13 changes: 13 additions & 0 deletions client-go/applyconfiguration/kueue/v1beta1/admissioncheckspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_admissionchecks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ spec:
- kind
- name
type: object
preemptionPolicy:
default: Anytime
description: 'preemptionPolicy determines when to issue preemptions
for the Workload, if necessary, in relationship to the status of
the admission check. The possible values are: - `Anytime`: No need
to wait for this check to pass before issuing preemptions. Preemptions
might be blocked on the preemptionPolicy of other AdmissionChecks.
- `AfterCheckPassedOrOnDemand`: Wait for this check to pass before
issuing preemptions, unless this or other checks requests preemptions
through the Workload''s admissionChecks. Defaults to `Anytime`.'
enum:
- Anytime
- AfterCheckPassedOrOnDemand
type: string
retryDelayMinutes:
default: 15
description: RetryDelayMinutes specifies how long to keep the workload
Expand Down
1 change: 1 addition & 0 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8391,6 +8391,7 @@ spec:
- Ready
- Retry
- Rejected
- PreemptionRequired
type: string
required:
- lastTransitionTime
Expand Down
7 changes: 6 additions & 1 deletion pkg/cache/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ limitations under the License.

package cache

import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

type AdmissionCheck struct {
Active bool
Active bool
PreemptionPolicy kueue.AdmissionCheckPreemptionPolicy
}
18 changes: 17 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -222,7 +223,8 @@ func (c *Cache) AddOrUpdateAdmissionCheck(ac *kueue.AdmissionCheck) sets.Set[str
c.Lock()
defer c.Unlock()
c.admissionChecks[ac.Name] = AdmissionCheck{
Active: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionCheckActive),
Active: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionCheckActive),
PreemptionPolicy: ptr.Deref(ac.Spec.PreemptionPolicy, kueue.Anytime),
trasc marked this conversation as resolved.
Show resolved Hide resolved
}

return c.updateClusterQueues()
Expand Down Expand Up @@ -725,6 +727,20 @@ func (c *Cache) MatchingClusterQueues(nsLabels map[string]string) sets.Set[strin
return cqs
}

// HasWorkloadsPreemptingNow returns true if it has workloads that need preemption now.
func (c *Cache) HasWorkloadsPreemptingNow() bool {
c.RLock()
defer c.RUnlock()
// Depending on the overall state this may end up iterating over all the workloads present
// in the cache (workloads having QuotaReservation).
for _, cq := range c.clusterQueues {
trasc marked this conversation as resolved.
Show resolved Hide resolved
if now, _ := cq.PreemptingWorkloads(c.admissionChecks); len(now) > 0 {
return true
}
}
return false
}

// Key is the key used to index the queue.
func queueKey(q *kueue.LocalQueue) string {
return fmt.Sprintf("%s/%s", q.Namespace, q.Name)
Expand Down
43 changes: 43 additions & 0 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -433,3 +434,45 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool {
func reportAdmittedActiveWorkloads(cqName string, val int) {
metrics.AdmittedActiveWorkloads.WithLabelValues(cqName).Set(float64(val))
}

// PreemptingWorkloads splits the ClusterQueue's workloads that are waiting on
// the preemption admission check into two lists.
//
// Only workloads whose preemption admission check is in the Pending state are
// considered. The first returned list holds the workloads whose preemption
// should be evaluated now; the second holds the ones that should be evaluated
// later. acMap maps admission check names to their cached description.
func (cq *ClusterQueue) PreemptingWorkloads(acMap map[string]AdmissionCheck) ([]*workload.Info, []*workload.Info) {
	var now, later []*workload.Info

	for _, info := range cq.Workloads {
		checks := info.Obj.Status.AdmissionChecks

		// Only workloads with a pending preemption check are of interest.
		preemption := workload.FindAdmissionCheck(checks, constants.PreemptionAdmissionCheckName)
		if preemption == nil || preemption.State != kueue.CheckStatePending {
			continue
		}

		// The scan stops at the first check that decides the outcome, so at
		// most one of the two flags below ends up set.
		unknownCheck, mustWait := false, false
		for i := range checks {
			check := &checks[i]
			if check.Name == constants.PreemptionAdmissionCheckName {
				continue
			}

			cached, ok := acMap[check.Name]
			if !ok {
				// The check is not in acMap: leave the workload out of both lists.
				unknownCheck = true
				break
			}

			// Checks with the Anytime policy never hold preemption back.
			if cached.PreemptionPolicy == kueue.Anytime {
				continue
			}
			// Any other policy lets preemption proceed only once the check
			// is Ready or has explicitly requested preemption.
			if check.State == kueue.CheckStatePreemptionRequired || check.State == kueue.CheckStateReady {
				continue
			}
			mustWait = true
			break
		}

		switch {
		case unknownCheck:
			// Not reported in either list.
		case mustWait:
			later = append(later, info)
		default:
			now = append(now, info)
		}
	}
	return now, later
}
Loading