Skip to content

Commit

Permalink
[AC/preemption] Check the pending workloads in order of their priority
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Oct 12, 2023
1 parent 9a116d3 commit 371072a
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions pkg/controller/admissionchecks/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package preemption

import (
"context"
"sort"
"time"

"github.com/go-logr/logr"
Expand All @@ -34,6 +35,7 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -160,9 +162,32 @@ func (c *Controller) run(ctx context.Context) {

snapshot := c.cache.Snapshot()
workloads := filterWorkloads(&snapshot)

// sort the workloads base on their priority and queue order time
sort.Slice(workloads, func(i, j int) bool {
a := workloads[i]
b := workloads[j]

// 2. Higher priority first.
p1 := priority.Priority(a.Obj)
p2 := priority.Priority(b.Obj)
if p1 != p2 {
return p1 > p2
}

// 2. FIFO.
aComparisonTimestamp := workload.GetQueueOrderTimestamp(a.Obj)
bComparisonTimestamp := workload.GetQueueOrderTimestamp(b.Obj)
return aComparisonTimestamp.Before(bComparisonTimestamp)
})

// remove all of them from the snapshot, those that are still waiting evictions or are now fitting
// are added back in the priority order
for _, wl := range workloads {
// 1. remove the workload from the snapshot
snapshot.RemoveWorkload(wl)
}

for _, wl := range workloads {
usage := totalRequestsForWorkload(wl)
needPreemption := resourcesNeedingPreemption(wl, usage, &snapshot)
log := c.log.WithValues("workload", klog.KObj(wl.Obj))
Expand All @@ -176,6 +201,8 @@ func (c *Controller) run(ctx context.Context) {
} else {
log.V(2).Info("Preemption ended")
}
// add it back to the Snapshot
snapshot.AddWorkload(wl)
} else {
// Additional resources need to be freed.
targets := c.preemptor.GetTargetsForResources(wl, needPreemption, usage, &snapshot)
Expand All @@ -185,6 +212,8 @@ func (c *Controller) run(ctx context.Context) {
if err := c.updateCheckStatus(ctx, c.client, wl.Obj, false); err != nil {
log.V(2).Error(err, "Unable to update the check state to False")
c.trigger("retryUpdate", klog.KObj(wl.Obj))
// add it back to the Snapshot, since its transition is not final.
snapshot.AddWorkload(wl)
} else {
log.V(2).Info("Preemption is no longer possible")
}
Expand All @@ -197,10 +226,10 @@ func (c *Controller) run(ctx context.Context) {
} else {
log.V(2).Info("Preemption triggered", "count", count)
}
// add it back to the Snapshot
snapshot.AddWorkload(wl)
}
}
// 3. add it back to the Snapshot
snapshot.AddWorkload(wl)
}
}

Expand Down

0 comments on commit 371072a

Please sign in to comment.