Skip to content

Commit

Permalink
[admissionChecks] Add preemption controller
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Sep 14, 2023
1 parent fcf0186 commit c2d3d12
Show file tree
Hide file tree
Showing 4 changed files with 657 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ const (
// that do not specify any priority class and there is no priority class
// marked as default.
DefaultPriority = 0

PreemptionAdmissionCheckName = "kueue-preemption"
)
263 changes: 263 additions & 0 deletions pkg/controller/admissionchecks/preemption/preemption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package preemption

import (
"context"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/workload"
)

const (
controllerName = "PreemptionController"
throttleTimeout = 500 * time.Millisecond
)

type updateStatusFnc func(context.Context, client.Client, *kueue.Workload, bool) error

type Controller struct {
log logr.Logger
cache *cache.Cache
client client.Client
recorder record.EventRecorder
preemptor *preemption.Preemptor
ctx context.Context
triggerChan chan struct{}

// stub, only for test
updateFnc updateStatusFnc
}

func NewController(cache *cache.Cache) *Controller {
return &Controller{
log: ctrl.Log.WithName(controllerName),
cache: cache,
triggerChan: make(chan struct{}),
updateFnc: updateCheckStatus,
}
}

func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
c.client = mgr.GetClient()
c.recorder = mgr.GetEventRecorderFor(controllerName)
c.preemptor = preemption.New(c.client, c.recorder)
if err := mgr.Add(c); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
//TODO: filter the events
For(&kueue.Workload{}).
Complete(c)
}

var _ manager.Runnable = (*Controller)(nil)
var _ reconcile.Reconciler = (*Controller)(nil)

func (c *Controller) Start(ctx context.Context) error {
c.log.V(5).Info("Staring main loop")
c.ctx = ctx
ticker := time.NewTicker(throttleTimeout)
trigger := false
timeout := false
for {
select {
case <-c.triggerChan:
trigger = true
case <-ticker.C:
timeout = true
case <-ctx.Done():
c.log.V(5).Info("End main loop")
return nil
}

if trigger && timeout {
c.run()
trigger = false
timeout = false
}
}
}

func (c *Controller) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) {
select {
case c.triggerChan <- struct{}{}:
c.log.V(6).Info("Triggered")
default:
}
return ctrl.Result{}, nil
}

func (c *Controller) run() {
snapshot := c.cache.Snapshot()
workloads := filterWorkloads(&snapshot)
for _, wl := range workloads {
//1. remove it from Snapshot
snapshot.RemoveWorkload(wl)
//2. check if preemption is needed
usage := totalRequestsForWorkload(wl)
needPreemption := resourcesNeedingPreemption(wl, usage, &snapshot)
log := c.log.WithValues("workload", klog.KObj(wl.Obj))

if len(needPreemption) == 0 {
// 2.1 - set the check to true
// the preemption is done , flip the condition
if err := c.updateFnc(c.ctx, c.client, wl.Obj, true); err != nil {
log.V(2).Error(err, "Unable to update the check state to True")
} else {
log.V(2).Info("Preemption ended")
}
} else {
// 2.2 - issue eviction
targets := c.preemptor.GetTargetsForResources(wl, needPreemption, usage, &snapshot)
if len(targets) == 0 {
//2.2.1 - preemption is no longer an option, flip the condition to false
if err := c.updateFnc(c.ctx, c.client, wl.Obj, false); err != nil {
log.V(2).Error(err, "Unable to update the check state to False")
} else {
log.V(2).Info("Preemption is no longer possible")
}
} else {
count, err := c.preemptor.IssuePreemptions(c.ctx, targets, snapshot.ClusterQueues[wl.ClusterQueue])
if err != nil {
log.V(5).Error(err, "Unable to issue preemption")
} else {
log.V(5).Info("Preemption triggered", "count", count)
}
}
}
// 3 add it back to the Snapshot
snapshot.AddWorkload(wl)
}
}

func filterWorkloads(snapsot *cache.Snapshot) []*workload.Info {
ret := []*workload.Info{}
for _, cq := range snapsot.ClusterQueues {
for _, wl := range cq.Workloads {
// if the workload has the preemption check set to unknown
if apimeta.IsStatusConditionPresentAndEqual(wl.Obj.Status.AdmissionChecks, constants.PreemptionAdmissionCheckName, metav1.ConditionUnknown) {
checkNow := true
for i := range wl.Obj.Status.AdmissionChecks {
c := &wl.Obj.Status.AdmissionChecks[i]
if c.Type == constants.PreemptionAdmissionCheckName {
continue
}
if snapsot.AdmissionChecks[c.Type] != kueue.AfterCheckPassedOrOnDemand {
continue
}
if c.Reason != kueue.CheckStateReady && c.Reason != kueue.CheckStatePreemptionRequired {
checkNow = false
break
}
}
if checkNow {
ret = append(ret, wl)
}
}
}
}
return ret
}

func totalRequestsForWorkload(wl *workload.Info) cache.FlavorResourceQuantities {
usage := make(cache.FlavorResourceQuantities)
for _, ps := range wl.TotalRequests {
for res, q := range ps.Requests {
flv := ps.Flavors[res]
resUsage := usage[flv]
if resUsage == nil {
resUsage = make(map[corev1.ResourceName]int64)
usage[flv] = resUsage
}
resUsage[res] += q
}
}
return usage
}

func resourcesNeedingPreemption(wl *workload.Info, usage cache.FlavorResourceQuantities, snap *cache.Snapshot) preemption.ResourcesPerFlavor {
ret := make(preemption.ResourcesPerFlavor)

cq := snap.ClusterQueues[wl.ClusterQueue]
for _, rg := range cq.ResourceGroups {
for _, flvQuotas := range rg.Flavors {
flvReq, found := usage[flvQuotas.Name]
if !found {
// Workload doesn't request this flavor.
continue
}
cqResUsage := cq.Usage[flvQuotas.Name]
var cohortResUsage, cohortResRequestable map[corev1.ResourceName]int64
if cq.Cohort != nil {
cohortResUsage = cq.Cohort.Usage[flvQuotas.Name]
cohortResRequestable = cq.Cohort.RequestableResources[flvQuotas.Name]
}
for rName, rReq := range flvReq {
limit := flvQuotas.Resources[rName].Nominal
if flvQuotas.Resources[rName].BorrowingLimit != nil {
limit += *flvQuotas.Resources[rName].BorrowingLimit
}
exceedsNominal := cqResUsage[rName]+rReq > limit
exceedsBorrowing := cq.Cohort != nil && cohortResUsage[rName]+rReq > cohortResRequestable[rName]
if exceedsNominal || exceedsBorrowing {
if _, found := ret[flvQuotas.Name]; !found {
ret[flvQuotas.Name] = sets.New(rName)
} else {
ret[flvQuotas.Name].Insert(rName)
}
}
}
}
}
return ret
}

func updateCheckStatus(ctx context.Context, c client.Client, wl *kueue.Workload, successful bool) error {
cond := metav1.Condition{
Type: constants.PreemptionAdmissionCheckName,
Status: metav1.ConditionTrue,
Reason: kueue.CheckStateReady, // the reason is not relevant, add this to keep it aligned wit the other checks
Message: "Preemption done",
}

if !successful {
cond.Status = metav1.ConditionFalse
cond.Reason = kueue.CheckStateRetry
cond.Message = "Preemption is not possible"
}

patch := workload.BaseSSAWorkload(wl)
apimeta.SetStatusCondition(&patch.Status.AdmissionChecks, cond)
return c.Status().Patch(ctx, patch, client.Apply, client.FieldOwner(controllerName), client.ForceOwnership)
}
Loading

0 comments on commit c2d3d12

Please sign in to comment.