From 58990de4d1c4ecae5f3851d1d112d56b74bb404e Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Fri, 12 Jul 2024 15:20:31 +0000 Subject: [PATCH 1/2] Fix preemption test --- pkg/scheduler/preemption/preemption_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index 187e929112..a74dd51e9e 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -636,7 +636,7 @@ func TestPreemption(t *testing.T) { Mode: flavorassigner.Preempt, }, corev1.ResourceMemory: &flavorassigner.FlavorAssignment{ - Name: "alpha", + Name: "default", Mode: flavorassigner.Preempt, }, }), From 3541f1481114068e765c757695d8caabf3deab12 Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Wed, 17 Jul 2024 12:09:02 +0000 Subject: [PATCH 2/2] Refactor preemption.go --- pkg/cache/clusterqueue_snapshot.go | 36 ++++++++++++ pkg/scheduler/preemption/preemption.go | 79 ++++++-------------------- 2 files changed, 52 insertions(+), 63 deletions(-) diff --git a/pkg/cache/clusterqueue_snapshot.go b/pkg/cache/clusterqueue_snapshot.go index fe91cf9622..a813e1f73e 100644 --- a/pkg/cache/clusterqueue_snapshot.go +++ b/pkg/cache/clusterqueue_snapshot.go @@ -52,6 +52,42 @@ func (c *ClusterQueueSnapshot) QuotaFor(fr resources.FlavorResource) *ResourceQu return c.Quotas[fr] } +func (c *ClusterQueueSnapshot) Borrowing(fr resources.FlavorResource) bool { + return c.BorrowingWith(fr, 0) +} + +func (c *ClusterQueueSnapshot) BorrowingWith(fr resources.FlavorResource, val int64) bool { + return c.usageFor(fr)+val > c.nominal(fr) +} + +func (c *ClusterQueueSnapshot) Available(fr resources.FlavorResource) int64 { + if c.Cohort == nil { + return max(0, c.nominal(fr)-c.usageFor(fr)) + } + capacityAvailable := c.RequestableCohortQuota(fr.Flavor, fr.Resource) - c.UsedCohortQuota(fr.Flavor, fr.Resource) + + // if the borrowing limit exists, we cap our available capacity by the borrowing limit. + if borrowingLimit := c.borrowingLimit(fr); borrowingLimit != nil { + withBorrowingRemaining := c.nominal(fr) + *borrowingLimit - c.usageFor(fr) + capacityAvailable = min(capacityAvailable, withBorrowingRemaining) + } + return max(0, capacityAvailable) +} + +func (c *ClusterQueueSnapshot) nominal(fr resources.FlavorResource) int64 { + if quota := c.QuotaFor(fr); quota != nil { + return quota.Nominal + } + return 0 +} + +func (c *ClusterQueueSnapshot) borrowingLimit(fr resources.FlavorResource) *int64 { + if quota := c.QuotaFor(fr); quota != nil { + return quota.BorrowingLimit + } + return nil +} + // The methods below implement several interfaces. See // dominantResourceShareNode, resourceGroupNode, and netQuotaNode. diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 3becc59a96..6ede97c7d3 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -467,21 +467,17 @@ func cqHeapFromCandidates(candidates []*workload.Info, firstOnly bool, snapshot return cqHeap } -type resourcesPerFlavor map[kueue.ResourceFlavorReference]sets.Set[corev1.ResourceName] +type resourcesPerFlavor = sets.Set[resources.FlavorResource] func resourcesRequiringPreemption(assignment flavorassigner.Assignment) resourcesPerFlavor { - resPerFlavor := make(resourcesPerFlavor) + resPerFlavor := sets.New[resources.FlavorResource]() for _, ps := range assignment.PodSets { for res, flvAssignment := range ps.Flavors { // assignments with NoFit mode wouldn't enter the preemption path. if flvAssignment.Mode != flavorassigner.Preempt { continue } - if resPerFlavor[flvAssignment.Name] == nil { - resPerFlavor[flvAssignment.Name] = sets.New(res) - } else { - resPerFlavor[flvAssignment.Name].Insert(res) - } + resPerFlavor.Insert(resources.FlavorResource{Flavor: flvAssignment.Name, Resource: res}) } } return resPerFlavor @@ -543,15 +539,9 @@ func cqIsBorrowing(cq *cache.ClusterQueueSnapshot, resPerFlv resourcesPerFlavor) if cq.Cohort == nil { return false } - for _, rg := range cq.ResourceGroups { - for _, fName := range rg.Flavors { - fUsage := cq.Usage[fName] - for rName := range resPerFlv[fName] { - quota := cq.QuotaFor(resources.FlavorResource{Flavor: fName, Resource: rName}) - if fUsage[rName] > quota.Nominal { - return true - } - } + for fr := range resPerFlv { + if cq.Borrowing(fr) { + return true } } return false @@ -560,7 +550,7 @@ func cqIsBorrowing(cq *cache.ClusterQueueSnapshot, resPerFlv resourcesPerFlavor) func workloadUsesResources(wl *workload.Info, resPerFlv resourcesPerFlavor) bool { for _, ps := range wl.TotalRequests { for res, flv := range ps.Flavors { - if resPerFlv[flv].Has(res) { + if resPerFlv.Has(resources.FlavorResource{Flavor: flv, Resource: res}) { return true } } @@ -572,58 +562,21 @@ func workloadUsesResources(wl *workload.Info, resPerFlv resourcesPerFlavor) bool // requestable resources and simulated usage of the ClusterQueue and its cohort, // if it belongs to one. func workloadFits(wlReq resources.FlavorResourceQuantitiesFlat, cq *cache.ClusterQueueSnapshot, allowBorrowing bool) bool { - for _, rg := range cq.ResourceGroups { - for _, fName := range rg.Flavors { - cqResUsage := cq.Usage[fName] - for rName := range rg.CoveredResources { - resource := cq.QuotaFor(resources.FlavorResource{Flavor: fName, Resource: rName}) - rReq, found := wlReq[resources.FlavorResource{Flavor: fName, Resource: rName}] - if !found { - // Workload doesn't request this FlavorResource. - continue - } - - if cq.Cohort == nil || !allowBorrowing { - if cqResUsage[rName]+rReq > resource.Nominal { - return false - } - } else { - // When resource.BorrowingLimit == nil there is no borrowing - // limit, so we can skip the check. - if resource.BorrowingLimit != nil { - if cqResUsage[rName]+rReq > resource.Nominal+*resource.BorrowingLimit { - return false - } - } - } - - if cq.Cohort != nil { - cohortResUsage := cq.UsedCohortQuota(fName, rName) - requestableQuota := cq.RequestableCohortQuota(fName, rName) - if cohortResUsage+rReq > requestableQuota { - return false - } - } - } + for fr, v := range wlReq { + if !allowBorrowing && cq.BorrowingWith(fr, v) { + return false + } + if v > cq.Available(fr) { + return false } } return true } func queueUnderNominalInResourcesNeedingPreemption(resPerFlv resourcesPerFlavor, cq *cache.ClusterQueueSnapshot) bool { - for _, rg := range cq.ResourceGroups { - for _, fName := range rg.Flavors { - flvReq, found := resPerFlv[fName] - if !found { - // Workload doesn't request this flavor. - continue - } - cqResUsage := cq.Usage[fName] - for rName := range flvReq { - if cqResUsage[rName] >= cq.QuotaFor(resources.FlavorResource{Flavor: fName, Resource: rName}).Nominal { - return false - } - } + for fr := range resPerFlv { + if cq.Usage.For(fr) >= cq.QuotaFor(fr).Nominal { + return false } } return true