Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Sep 12, 2023
1 parent d8d8ec0 commit 1465d40
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ const (
// that do not specify any priority class and there is no priority class
// marked as default.
DefaultPriority = 0

PreemptionAdmissionCheckName = "kueue-preemption"
)
123 changes: 123 additions & 0 deletions pkg/controller/admissionchecks/preemption/preemption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package preemption

import (
"context"
"time"

"github.com/go-logr/logr"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/workload"
)

const (
controllerName = "PreemptionController"
throttleTimeout = 500 * time.Millisecond
)

type Controller struct {
log logr.Logger
cache *cache.Cache
client client.Client
recorder record.EventRecorder
// preemptor

triggerChan chan struct{}
}

func NewContriller(cache *cache.Cache) *Controller {
return &Controller{
log: ctrl.Log.WithName(controllerName),
cache: cache,
triggerChan: make(chan struct{}),
}
}

func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
c.client = mgr.GetClient()
c.recorder = mgr.GetEventRecorderFor(controllerName)
if err := mgr.Add(c); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
//TODO: filter the events
For(&kueue.Workload{}).
Complete(c)
}

var _ manager.Runnable = (*Controller)(nil)
var _ reconcile.Reconciler = (*Controller)(nil)

func (c *Controller) Start(ctx context.Context) error {
c.log.V(5).Info("Staring main loop")
ticker := time.NewTicker(throttleTimeout)
trigger := false
timeout := false
for {
select {
case <-c.triggerChan:
trigger = true
case <-ticker.C:
timeout = true
case <-ctx.Done():
c.log.V(5).Info("End main loop")
return nil
}

if trigger && timeout {
c.run()
trigger = false
timeout = false
}
}
}

func (c *Controller) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) {
select {
case c.triggerChan <- struct{}{}:
c.log.V(6).Info("Triggered")
default:
}
return ctrl.Result{}, nil
}

func (c *Controller) run() {
snapsot := c.cache.Snapshot()
workloads := filterWorkloads(&snapsot)
for _, wl := range workloads {
//1. remove it from Snapshot
snapsot.RemoveWorkload(wl)
//2. check if preemption is needed

// 2.1 - issue eviction

// 2.2 - set the check to true

// 3 add it back to the Snapshot
snapsot.AddWorkload(wl)
}
}

func filterWorkloads(snapsot *cache.Snapshot) []*workload.Info {
ret := []*workload.Info{}

for _, cq := range snapsot.ClusterQueues {
for _, wl := range cq.Workloads {
// if the workload has the preemption check set to unknown
if apimeta.IsStatusConditionPresentAndEqual(wl.Obj.Status.AdmissionChecks, constants.PreemptionAdmissionCheckName, metav1.ConditionUnknown) {
ret = append(ret, wl)
//TODO: check if it should be taken into account based on admission check policy
}
}
}
return ret
}

0 comments on commit 1465d40

Please sign in to comment.