From be4a66680e1977a7f01701d69f2b69894b1e4d73 Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Fri, 30 Aug 2024 10:15:54 +0000 Subject: [PATCH 1/2] Move ResourceGroup/ResourceQuota/Cohort; Refactor Parsing --- pkg/cache/clusterqueue.go | 96 +++++++-------------------------------- pkg/cache/cohort.go | 51 +++++++++++++++++++++ pkg/cache/resource.go | 53 +++++++++++++++++++++ 3 files changed, 120 insertions(+), 80 deletions(-) create mode 100644 pkg/cache/cohort.go diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 0c82b1263b..f2720ca1d8 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -31,7 +31,6 @@ import ( "k8s.io/utils/ptr" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" - "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/hierarchy" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/resources" @@ -100,51 +99,6 @@ func (c *clusterQueue) parentHRN() hierarchicalResourceNode { return c.Parent() } -// cohort is a set of ClusterQueues that can borrow resources from each other. -type cohort struct { - Name string - hierarchy.Cohort[*clusterQueue, *cohort] - - resourceNode ResourceNode -} - -func (c *cohort) GetName() string { - return c.Name -} - -// implements hierarchicalResourceNode interface. - -func (c *cohort) getResourceNode() ResourceNode { - return c.resourceNode -} - -func (c *cohort) parentHRN() hierarchicalResourceNode { - return c.Parent() -} - -type ResourceGroup struct { - CoveredResources sets.Set[corev1.ResourceName] - Flavors []kueue.ResourceFlavorReference - // The set of key labels from all flavors. - // Those keys define the affinity terms of a workload - // that can be matched against the flavors. - LabelKeys sets.Set[string] -} - -func (rg *ResourceGroup) Clone() ResourceGroup { - return ResourceGroup{ - CoveredResources: rg.CoveredResources.Clone(), - Flavors: rg.Flavors, - LabelKeys: rg.LabelKeys.Clone(), - } -} - -type ResourceQuota struct { - Nominal int64 - BorrowingLimit *int64 - LendingLimit *int64 -} - type queue struct { key string reservingWorkloads int @@ -154,14 +108,6 @@ type queue struct { admittedUsage resources.FlavorResourceQuantities } -func newCohort(name string) *cohort { - return &cohort{ - name, - hierarchy.NewCohort[*clusterQueue, *cohort](), - NewResourceNode(), - } -} - // FitInCohort supports the legacy // features.MultiplePreemptions=false path. It doesn't take into // account BorrowingLimits. To be cleaned up in v0.10, when we delete @@ -237,38 +183,28 @@ func (c *clusterQueue) updateClusterQueue(in *kueue.ClusterQueue, resourceFlavor return nil } +func createdResourceGroups(kueueRgs []kueue.ResourceGroup) []ResourceGroup { + rgs := make([]ResourceGroup, len(kueueRgs)) + for i, kueueRg := range kueueRgs { + rgs[i] = ResourceGroup{ + CoveredResources: sets.New(kueueRg.CoveredResources...), + Flavors: make([]kueue.ResourceFlavorReference, 0, len(kueueRg.Flavors)), + } + for _, fIn := range kueueRg.Flavors { + rgs[i].Flavors = append(rgs[i].Flavors, fIn.Name) + } + } + return rgs +} + // updateQuotasAndResourceGroups updates Quotas and ResourceGroups. // It returns true if any changes were made. func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) bool { oldRG := c.ResourceGroups oldQuotas := c.resourceNode.Quotas + c.ResourceGroups = createdResourceGroups(in) + c.resourceNode.Quotas = createResourceQuotas(in) - c.ResourceGroups = make([]ResourceGroup, len(in)) - c.resourceNode.Quotas = make(map[resources.FlavorResource]ResourceQuota, 0) - for i, rgIn := range in { - rg := &c.ResourceGroups[i] - *rg = ResourceGroup{ - CoveredResources: sets.New(rgIn.CoveredResources...), - Flavors: make([]kueue.ResourceFlavorReference, 0, len(rgIn.Flavors)), - } - for i := range rgIn.Flavors { - fIn := &rgIn.Flavors[i] - for _, rIn := range fIn.Resources { - nominal := resources.ResourceValue(rIn.Name, rIn.NominalQuota) - rQuota := ResourceQuota{ - Nominal: nominal, - } - if rIn.BorrowingLimit != nil { - rQuota.BorrowingLimit = ptr.To(resources.ResourceValue(rIn.Name, *rIn.BorrowingLimit)) - } - if features.Enabled(features.LendingLimit) && rIn.LendingLimit != nil { - rQuota.LendingLimit = ptr.To(resources.ResourceValue(rIn.Name, *rIn.LendingLimit)) - } - c.resourceNode.Quotas[resources.FlavorResource{Flavor: fIn.Name, Resource: rIn.Name}] = rQuota - } - rg.Flavors = append(rg.Flavors, fIn.Name) - } - } // Start at 1, for backwards compatibility. return c.AllocatableResourceGeneration == 0 || !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) || diff --git a/pkg/cache/cohort.go b/pkg/cache/cohort.go new file mode 100644 index 0000000000..049ca5f971 --- /dev/null +++ b/pkg/cache/cohort.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "sigs.k8s.io/kueue/pkg/hierarchy" +) + +// cohort is a set of ClusterQueues that can borrow resources from each other. +type cohort struct { + Name string + hierarchy.Cohort[*clusterQueue, *cohort] + + resourceNode ResourceNode +} + +func newCohort(name string) *cohort { + return &cohort{ + name, + hierarchy.NewCohort[*clusterQueue, *cohort](), + NewResourceNode(), + } +} + +func (c *cohort) GetName() string { + return c.Name +} + +// implements hierarchicalResourceNode interface. + +func (c *cohort) getResourceNode() ResourceNode { + return c.resourceNode +} + +func (c *cohort) parentHRN() hierarchicalResourceNode { + return c.Parent() +} diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 68e5cc9a52..91887608f6 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -17,9 +17,62 @@ limitations under the License. package cache import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/ptr" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/resources" ) +type ResourceGroup struct { + CoveredResources sets.Set[corev1.ResourceName] + Flavors []kueue.ResourceFlavorReference + // The set of key labels from all flavors. + // Those keys define the affinity terms of a workload + // that can be matched against the flavors. + LabelKeys sets.Set[string] +} + +func (rg *ResourceGroup) Clone() ResourceGroup { + return ResourceGroup{ + CoveredResources: rg.CoveredResources.Clone(), + Flavors: rg.Flavors, + LabelKeys: rg.LabelKeys.Clone(), + } +} + +type ResourceQuota struct { + Nominal int64 + BorrowingLimit *int64 + LendingLimit *int64 +} + +func createResourceQuotas(kueueRgs []kueue.ResourceGroup) map[resources.FlavorResource]ResourceQuota { + frCount := 0 + for _, rg := range kueueRgs { + frCount += len(rg.Flavors) * len(rg.CoveredResources) + } + quotas := make(map[resources.FlavorResource]ResourceQuota, frCount) + for _, kueueRg := range kueueRgs { + for _, kueueFlavor := range kueueRg.Flavors { + for _, kueueQuota := range kueueFlavor.Resources { + quota := ResourceQuota{ + Nominal: resources.ResourceValue(kueueQuota.Name, kueueQuota.NominalQuota), + } + if kueueQuota.BorrowingLimit != nil { + quota.BorrowingLimit = ptr.To(resources.ResourceValue(kueueQuota.Name, *kueueQuota.BorrowingLimit)) + } + if features.Enabled(features.LendingLimit) && kueueQuota.LendingLimit != nil { + quota.LendingLimit = ptr.To(resources.ResourceValue(kueueQuota.Name, *kueueQuota.LendingLimit)) + } + quotas[resources.FlavorResource{Flavor: kueueFlavor.Name, Resource: kueueQuota.Name}] = quota + } + } + } + return quotas +} + type resourceGroupNode interface { resourceGroups() []ResourceGroup } From bcdebab02af2db0c6436a104194975307d8d2432 Mon Sep 17 00:00:00 2001 From: Gabe <15304068+gabesaba@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:08:53 +0000 Subject: [PATCH 2/2] Support Cohorts with Resources in cache/hierarchy packages --- .golangci.yaml | 3 ++ pkg/cache/cache.go | 22 ++++++++++ pkg/cache/cache_test.go | 47 ++++++++++++++++++++ pkg/cache/clusterqueue.go | 6 ++- pkg/cache/cohort.go | 6 +++ pkg/cache/resource.go | 1 + pkg/cache/resource_node.go | 5 +++ pkg/cache/snapshot_test.go | 81 +++++++++++++++++++++++++++++++++++ pkg/hierarchy/cohort.go | 11 +++++ pkg/hierarchy/manager.go | 32 +++++++++++++- pkg/hierarchy/manager_test.go | 70 ++++++++++++++++++++++++++++-- pkg/util/testing/wrappers.go | 32 ++++++++++++-- 12 files changed, 307 insertions(+), 9 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index 3d67c8eafb..0c22247a72 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -57,6 +57,9 @@ issues: - staticcheck # TODO(#768): Drop when incrementing the API version. text: "SA1019: constants.QueueAnnotation is deprecated" + - linters: + - unused + path: "^pkg/hierarchy.cohort.go" # Show all issues from a linter max-issues-per-linter: 0 # Show all issues with the same text diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index fd6bc28882..f431e80704 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -32,6 +32,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/hierarchy" @@ -397,6 +398,27 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) { metrics.ClearCacheMetrics(cq.Name) } +func (c *Cache) AddCohort(apiCohort *kueuealpha.Cohort) { + c.Lock() + defer c.Unlock() + cohort := newCohort(apiCohort.Name) + c.hm.AddCohort(cohort) + cohort.updateCohort(apiCohort) +} + +func (c *Cache) DeleteCohort(apiCohort *kueuealpha.Cohort) { + c.Lock() + defer c.Unlock() + c.hm.DeleteCohort(apiCohort.Name) + + // If the cohort still exists after deletion, it means + // that it has one or more children referencing it. + // We need to run update algorithm. + if cohort, ok := c.hm.Cohorts[apiCohort.Name]; ok { + updateCohortResourceNode(cohort) + } +} + func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error { c.Lock() defer c.Unlock() diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 6f86796abc..c8f205b7a1 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1051,6 +1051,53 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, }, }, + { + name: "create cohort", + operation: func(cache *Cache) error { + cohort := utiltesting.MakeCohort("cohort").Obj() + cache.AddCohort(cohort) + return nil + }, + wantCohorts: map[string]sets.Set[string]{ + "cohort": nil, + }, + }, + { + name: "create and delete cohort", + operation: func(cache *Cache) error { + cohort := utiltesting.MakeCohort("cohort").Obj() + cache.AddCohort(cohort) + cache.DeleteCohort(cohort) + return nil + }, + wantCohorts: nil, + }, + { + name: "cohort remains after deletion when child exists", + operation: func(cache *Cache) error { + cohort := utiltesting.MakeCohort("cohort").Obj() + cache.AddCohort(cohort) + + _ = cache.AddClusterQueue(context.Background(), + utiltesting.MakeClusterQueue("cq").Cohort("cohort").Obj()) + cache.DeleteCohort(cohort) + return nil + }, + wantClusterQueues: map[string]*clusterQueue{ + "cq": { + Name: "cq", + NamespaceSelector: labels.Everything(), + Status: active, + Preemption: defaultPreemption, + AllocatableResourceGeneration: 1, + FlavorFungibility: defaultFlavorFungibility, + FairWeight: oneQuantity, + }, + }, + wantCohorts: map[string]sets.Set[string]{ + "cohort": sets.New("cq"), + }, + }, } for _, tc := range cases { diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index f2720ca1d8..61fa63b436 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -134,13 +134,17 @@ var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borr func (c *clusterQueue) updateClusterQueue(in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error { if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() { - updateClusterQueueResourceNode(c) c.AllocatableResourceGeneration += 1 if oldParent != nil && oldParent != c.Parent() { updateCohortResourceNode(oldParent) } if c.HasParent() { + // clusterQueue will be updated as part of tree update. updateCohortResourceNode(c.Parent()) + } else { + // since ClusterQueue has no parent, it won't be updated + // as part of tree update. + updateClusterQueueResourceNode(c) } } diff --git a/pkg/cache/cohort.go b/pkg/cache/cohort.go index 049ca5f971..d88929dae2 100644 --- a/pkg/cache/cohort.go +++ b/pkg/cache/cohort.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/hierarchy" ) @@ -36,6 +37,11 @@ func newCohort(name string) *cohort { } } +func (c *cohort) updateCohort(apiCohort *kueuealpha.Cohort) { + c.resourceNode.Quotas = createResourceQuotas(apiCohort.Spec.ResourceGroups) + updateCohortResourceNode(c) +} + func (c *cohort) GetName() string { return c.Name } diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 91887608f6..552bf6bdf5 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -20,6 +20,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/ptr" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/resources" diff --git a/pkg/cache/resource_node.go b/pkg/cache/resource_node.go index 665196bb30..a9f893b19f 100644 --- a/pkg/cache/resource_node.go +++ b/pkg/cache/resource_node.go @@ -163,7 +163,12 @@ func updateClusterQueueResourceNode(cq *clusterQueue) { func updateCohortResourceNode(cohort *cohort) { cohort.resourceNode.SubtreeQuota = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.SubtreeQuota)) cohort.resourceNode.Usage = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.Usage)) + + for fr, quota := range cohort.resourceNode.Quotas { + cohort.resourceNode.SubtreeQuota[fr] = quota.Nominal + } for _, child := range cohort.ChildCQs() { + updateClusterQueueResourceNode(child) for fr, childQuota := range child.resourceNode.SubtreeQuota { cohort.resourceNode.SubtreeQuota[fr] += childQuota - child.resourceNode.guaranteedQuota(fr) } diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index 4e5b53ce72..6ff00dae52 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/ptr" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/resources" @@ -45,6 +46,7 @@ var snapCmpOpts = []cmp.Option{ func TestSnapshot(t *testing.T) { testCases := map[string]struct { cqs []*kueue.ClusterQueue + cohorts []*kueuealpha.Cohort rfs []*kueue.ResourceFlavor wls []*kueue.Workload wantSnapshot Snapshot @@ -699,6 +701,82 @@ func TestSnapshot(t *testing.T) { } }(), }, + "cohort provides resources": { + rfs: []*kueue.ResourceFlavor{ + utiltesting.MakeResourceFlavor("arm").Obj(), + utiltesting.MakeResourceFlavor("x86").Obj(), + utiltesting.MakeResourceFlavor("mips").Obj(), + }, + cqs: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue("cq"). + Cohort("cohort"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("arm").Resource(corev1.ResourceCPU, "7", "", "3").Obj(), + *utiltesting.MakeFlavorQuotas("x86").Resource(corev1.ResourceCPU, "5").Obj(), + ). + Obj(), + }, + cohorts: []*kueuealpha.Cohort{ + utiltesting.MakeCohort("cohort"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("arm").Resource(corev1.ResourceCPU, "10").Obj(), + *utiltesting.MakeFlavorQuotas("x86").Resource(corev1.ResourceCPU, "20").Obj(), + ). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("mips").Resource(corev1.ResourceCPU, "42").Obj(), + ).Obj(), + }, + wantSnapshot: Snapshot{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ + "cq": { + Name: "cq", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{ + { + CoveredResources: sets.New(corev1.ResourceCPU), + Flavors: []kueue.ResourceFlavorReference{"arm", "x86"}, + }, + }, + ResourceNode: ResourceNode{ + Quotas: map[resources.FlavorResource]ResourceQuota{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: {Nominal: 7_000, BorrowingLimit: nil, LendingLimit: ptr.To[int64](3_000)}, + {Flavor: "x86", Resource: corev1.ResourceCPU}: {Nominal: 5_000, BorrowingLimit: nil, LendingLimit: nil}, + }, + SubtreeQuota: resources.FlavorResourceQuantities{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: 7_000, + {Flavor: "x86", Resource: corev1.ResourceCPU}: 5_000, + }, + }, + FlavorFungibility: defaultFlavorFungibility, + FairWeight: oneQuantity, + Preemption: defaultPreemption, + NamespaceSelector: labels.Everything(), + Status: active, + Cohort: &CohortSnapshot{ + Name: "cohort", + AllocatableResourceGeneration: 1, + ResourceNode: ResourceNode{ + Quotas: map[resources.FlavorResource]ResourceQuota{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: {Nominal: 10_000, BorrowingLimit: nil, LendingLimit: nil}, + {Flavor: "x86", Resource: corev1.ResourceCPU}: {Nominal: 20_000, BorrowingLimit: nil, LendingLimit: nil}, + {Flavor: "mips", Resource: corev1.ResourceCPU}: {Nominal: 42_000, BorrowingLimit: nil, LendingLimit: nil}, + }, + SubtreeQuota: resources.FlavorResourceQuantities{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: 13_000, + {Flavor: "x86", Resource: corev1.ResourceCPU}: 25_000, + {Flavor: "mips", Resource: corev1.ResourceCPU}: 42_000, + }, + }, + }, + }, + }, + ResourceFlavors: map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor{ + "arm": utiltesting.MakeResourceFlavor("arm").Obj(), + "x86": utiltesting.MakeResourceFlavor("x86").Obj(), + "mips": utiltesting.MakeResourceFlavor("mips").Obj(), + }, + }, + }, } for name, tc := range testCases { @@ -714,6 +792,9 @@ func TestSnapshot(t *testing.T) { t.Fatalf("Failed adding ClusterQueue: %v", err) } } + for _, cohort := range tc.cohorts { + cache.AddCohort(cohort) + } for _, rf := range tc.rfs { cache.AddOrUpdateResourceFlavor(rf) } diff --git a/pkg/hierarchy/cohort.go b/pkg/hierarchy/cohort.go index 43893dda2d..15d0ce91b2 100644 --- a/pkg/hierarchy/cohort.go +++ b/pkg/hierarchy/cohort.go @@ -20,6 +20,9 @@ import "k8s.io/apimachinery/pkg/util/sets" type Cohort[CQ, C nodeBase] struct { childCqs sets.Set[CQ] + // Indicates whether this Cohort is backed + // by an API object. + explicit bool } func (c *Cohort[CQ, C]) Parent() C { @@ -52,3 +55,11 @@ func (c *Cohort[CQ, C]) deleteClusterQueue(cq CQ) { func (c *Cohort[CQ, C]) childCount() int { return c.childCqs.Len() } + +func (c *Cohort[CQ, C]) isExplicit() bool { + return c.explicit +} + +func (c *Cohort[CQ, C]) markExplicit() { + c.explicit = true +} diff --git a/pkg/hierarchy/manager.go b/pkg/hierarchy/manager.go index d2d6e448f5..b7b4f29406 100644 --- a/pkg/hierarchy/manager.go +++ b/pkg/hierarchy/manager.go @@ -53,8 +53,35 @@ func (c *Manager[CQ, C]) DeleteClusterQueue(name string) { if cq, ok := c.ClusterQueues[name]; ok { c.unwireClusterQueue(cq) delete(c.ClusterQueues, name) + } +} + +func (c *Manager[CQ, C]) AddCohort(cohort C) { + cohort.markExplicit() + if oldCohort, ok := c.Cohorts[cohort.GetName()]; ok { + c.rewireChildren(oldCohort, cohort) + } + c.Cohorts[cohort.GetName()] = cohort +} + +func (c *Manager[CQ, C]) DeleteCohort(name string) { + cohort, ok := c.Cohorts[name] + delete(c.Cohorts, name) + if !ok || cohort.childCount() == 0 { return } + implicitCohort := c.cohortFactory(name) + c.Cohorts[implicitCohort.GetName()] = implicitCohort + c.rewireChildren(cohort, implicitCohort) +} + +// rewireChildren is used when we are changing a Cohort +// from an implicit to an explicit Cohort, or vice-versa. +func (c *Manager[CQ, C]) rewireChildren(old, new C) { + for _, cq := range old.ChildCQs() { + cq.setParent(new) + new.insertClusterQueue(cq) + } } func (c *Manager[CQ, C]) unwireClusterQueue(cq CQ) { @@ -75,7 +102,7 @@ func (c *Manager[CQ, C]) getOrCreateCohort(cohortName string) C { } func (c *Manager[CQ, C]) cleanupCohort(cohort C) { - if cohort.childCount() == 0 { + if !cohort.isExplicit() && cohort.childCount() == 0 { delete(c.Cohorts, cohort.GetName()) } } @@ -96,5 +123,8 @@ type cohortNode[CQ nodeBase] interface { insertClusterQueue(CQ) deleteClusterQueue(CQ) childCount() int + ChildCQs() []CQ + isExplicit() bool + markExplicit() nodeBase } diff --git a/pkg/hierarchy/manager_test.go b/pkg/hierarchy/manager_test.go index 782d494ece..3c902fcac0 100644 --- a/pkg/hierarchy/manager_test.go +++ b/pkg/hierarchy/manager_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/util/sets" ) @@ -29,6 +30,13 @@ func TestManager(t *testing.T) { cq string cohort string } + opts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b cqEdge) bool { + return a.cq < b.cq + }), + cmp.AllowUnexported(cqEdge{}), + } tests := map[string]struct { operations func(M) wantCqs sets.Set[string] @@ -158,11 +166,65 @@ func TestManager(t *testing.T) { wantCohorts: sets.New("cohort"), wantCqEdge: []cqEdge{{"queue1", "cohort"}}, }, + "explicit cohort": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + }, + wantCohorts: sets.New("cohort"), + }, + "delete explicit cohort": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + m.DeleteCohort("cohort") + }, + wantCohorts: sets.New[string](), + }, + "delete explicit cohort idempotent": { + operations: func(m M) { + m.DeleteCohort("cohort") + m.AddCohort(newCohort("cohort")) + m.DeleteCohort("cohort") + m.DeleteCohort("cohort") + }, + wantCohorts: sets.New[string](), + }, + "explicit cohort persists after child deleted": { + operations: func(m M) { + m.AddClusterQueue(newCq("queue")) + m.UpdateClusterQueueEdge("queue", "cohort") + m.AddCohort(newCohort("cohort")) + m.DeleteClusterQueue("queue") + }, + wantCohorts: sets.New("cohort"), + }, + "explicit cohort downgraded to implicit cohort": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + m.AddClusterQueue(newCq("queue")) + m.UpdateClusterQueueEdge("queue", "cohort") + m.DeleteCohort("cohort") + }, + wantCohorts: sets.New("cohort"), + wantCqs: sets.New("queue"), + wantCqEdge: []cqEdge{{"queue", "cohort"}}, + }, + "cohort upgraded to explicit then downgraded to implicit then deleted": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + m.AddClusterQueue(newCq("queue")) + m.UpdateClusterQueueEdge("queue", "cohort") + m.DeleteCohort("cohort") + m.DeleteClusterQueue("queue") + }, + wantCohorts: sets.New[string](), + wantCqs: sets.New[string](), + wantCqEdge: []cqEdge{}, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - mgr := NewManager[*testClusterQueue, *testCohort](cohortFactory) + mgr := NewManager(newCohort) tc.operations(mgr) t.Run("verify clusterqueues", func(t *testing.T) { gotCqs := sets.New[string]() @@ -176,7 +238,7 @@ func TestManager(t *testing.T) { if diff := cmp.Diff(tc.wantCqs, gotCqs); diff != "" { t.Fatalf("Unexpected cqs -want +got %s", diff) } - if diff := cmp.Diff(sets.New(tc.wantCqEdge...), sets.New(gotEdges...)); diff != "" { + if diff := cmp.Diff(tc.wantCqEdge, gotEdges, opts...); diff != "" { t.Fatalf("Unexpected CQ->Cohort edges -want +got %s", diff) } }) @@ -192,7 +254,7 @@ func TestManager(t *testing.T) { if diff := cmp.Diff(tc.wantCohorts, gotCohorts); diff != "" { t.Fatalf("Unexpected cohorts -want +got %s", diff) } - if diff := cmp.Diff(sets.New(tc.wantCqEdge...), sets.New(gotEdges...)); diff != "" { + if diff := cmp.Diff(tc.wantCqEdge, gotEdges, opts...); diff != "" { t.Fatalf("Unexpected Cohort->CQ edges -want +got %s", diff) } }) @@ -205,7 +267,7 @@ type testCohort struct { Cohort[*testClusterQueue, *testCohort] } -func cohortFactory(name string) *testCohort { +func newCohort(name string) *testCohort { return &testCohort{ name: name, Cohort: NewCohort[*testClusterQueue, *testCohort](), diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index fd26e8315a..74c7eb0e67 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -584,6 +584,28 @@ func (q *LocalQueueWrapper) Generation(num int64) *LocalQueueWrapper { return q } +type CohortWrapper struct { + kueuealpha.Cohort +} + +func MakeCohort(name string) *CohortWrapper { + return &CohortWrapper{kueuealpha.Cohort{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }} +} + +func (c *CohortWrapper) Obj() *kueuealpha.Cohort { + return &c.Cohort +} + +// ResourceGroup adds a ResourceGroup with flavors. +func (c *CohortWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *CohortWrapper { + c.Spec.ResourceGroups = append(c.Spec.ResourceGroups, createResourceGroup(flavors...)) + return c +} + // ClusterQueueWrapper wraps a ClusterQueue. type ClusterQueueWrapper struct{ kueue.ClusterQueue } @@ -624,8 +646,7 @@ func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheck return c } -// ResourceGroup adds a ResourceGroup with flavors. -func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper { +func createResourceGroup(flavors ...kueue.FlavorQuotas) kueue.ResourceGroup { rg := kueue.ResourceGroup{ Flavors: flavors, } @@ -646,7 +667,12 @@ func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *Clus } rg.CoveredResources = resources } - c.Spec.ResourceGroups = append(c.Spec.ResourceGroups, rg) + return rg +} + +// ResourceGroup adds a ResourceGroup with flavors. +func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper { + c.Spec.ResourceGroups = append(c.Spec.ResourceGroups, createResourceGroup(flavors...)) return c }