Skip to content

Commit

Permalink
Review Remarks.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Oct 4, 2023
1 parent 68c14f4 commit ce7e628
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 22 deletions.
6 changes: 3 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,14 +725,14 @@ func (c *Cache) MatchingClusterQueues(nsLabels map[string]string) sets.Set[strin
return cqs
}

// ShouldCheckWorkloadsPreemption returns true if it has workloads pending preemption now.
func (c *Cache) ShouldCheckWorkloadsPreemption() bool {
// HasWorkloadsPreemptingNow returns true if it has workloads that need preemption now.
func (c *Cache) HasWorkloadsPreemptingNow() bool {
c.RLock()
defer c.RUnlock()
// Depending on the overall state this may end up iterating over all the workloads present
// in the cache (workloads having QuotaReservation).
for _, cq := range c.clusterQueues {
if now, _ := cq.GetPreemptingWorkloads(c.admissionChecks); len(now) > 0 {
if now, _ := cq.PreemptingWorkloads(c.admissionChecks); len(now) > 0 {
return true
}
}
Expand Down
26 changes: 18 additions & 8 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,32 +410,42 @@ func reportAdmittedActiveWorkloads(cqName string, val int) {
metrics.AdmittedActiveWorkloads.WithLabelValues(cqName).Set(float64(val))
}

// GetPreemptingWorkloads - returns two list of Workloads
// PreemptingWorkloads - returns two list of Workloads
// the first one contains the workloads that are pending preemption and should be evaluated now
// the second one contains the workloads that are pending preemption and should be evaluated later
func (cq *ClusterQueue) GetPreemptingWorkloads(acMap map[string]AdmissionCheck) ([]*workload.Info, []*workload.Info) {
preemptNow := []*workload.Info{}
preemptLater := []*workload.Info{}
func (cq *ClusterQueue) PreemptingWorkloads(acMap map[string]AdmissionCheck) ([]*workload.Info, []*workload.Info) {
var preemptNow, preemptLater []*workload.Info
for _, wl := range cq.Workloads {
// if the workload has the preemption check set to unknown
if state := workload.FindAdmissionCheck(wl.Obj.Status.AdmissionChecks, constants.PreemptionAdmissionCheckName); state != nil && state.State == kueue.CheckStatePending {
checkNow := true
skip := false
for i := range wl.Obj.Status.AdmissionChecks {
c := &wl.Obj.Status.AdmissionChecks[i]
if c.Name == constants.PreemptionAdmissionCheckName {
continue
}
if acMap[c.Name].PreemptionPolicy == kueue.Anytime {

ac, found := acMap[c.Name]
if !found {
skip = true
break
}

if ac.PreemptionPolicy == kueue.Anytime {
continue
}
if c.State != kueue.CheckStatePreemptionRequired && c.State != kueue.CheckStateReady {
checkNow = false
preemptLater = append(preemptLater, wl)
break
}
}
if checkNow {
preemptNow = append(preemptNow, wl)
if !skip {
if checkNow {
preemptNow = append(preemptNow, wl)
} else {
preemptLater = append(preemptLater, wl)
}
}
}
}
Expand Down
13 changes: 4 additions & 9 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/util/slices"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -400,20 +401,14 @@ func TestGetPreemptingWorklods(t *testing.T) {
Workloads: wlMap,
}

gotPrermptNow, gotPreemptLater := cq.GetPreemptingWorkloads(tc.checks)
gotPrermptNow, gotPreemptLater := cq.PreemptingWorkloads(tc.checks)

wantPreemptNowWorkloads := make([]*workload.Info, len(tc.wantPreemptNow))
for i, wlKey := range tc.wantPreemptNow {
wantPreemptNowWorkloads[i] = wlMap[wlKey]
}
wantPreemptNowWorkloads := slices.Map(tc.wantPreemptNow, func(s *string) *workload.Info { return wlMap[*s] })
if diff := cmp.Diff(wantPreemptNowWorkloads, gotPrermptNow, sortOpt); diff != "" {
t.Errorf("Unexpected preempt now (-want/+got):\n%s", diff)
}

wantPreemptLaterWorkloads := make([]*workload.Info, len(tc.wantPreemptLater))
for i, wlKey := range tc.wantPreemptLater {
wantPreemptLaterWorkloads[i] = wlMap[wlKey]
}
wantPreemptLaterWorkloads := slices.Map(tc.wantPreemptLater, func(s *string) *workload.Info { return wlMap[*s] })
if diff := cmp.Diff(wantPreemptLaterWorkloads, gotPreemptLater, sortOpt); diff != "" {
t.Errorf("Unexpected preempt later (-want/+got):\n%s", diff)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/admissionchecks/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *Controller) run(ctx context.Context) {
c.log.V(2).Info("Start run")

// If there is nothing to do at this point.
if !c.cache.ShouldCheckWorkloadsPreemption() {
if !c.cache.HasWorkloadsPreemptingNow() {
// skip the snapshot creation and exit
return
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func filterWorkloads(snapshot *cache.Snapshot) []*workload.Info {
preemptNow := []*workload.Info{}
preemptLater := []*workload.Info{}
for _, cq := range snapshot.ClusterQueues {
now, later := cq.GetPreemptingWorkloads(snapshot.AdmissionChecks)
now, later := cq.PreemptingWorkloads(snapshot.AdmissionChecks)
preemptNow = append(preemptNow, now...)
preemptLater = append(preemptLater, later...)
}
Expand Down

0 comments on commit ce7e628

Please sign in to comment.