diff --git a/.golangci.yaml b/.golangci.yaml index 3d67c8eafb..0c22247a72 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -57,6 +57,9 @@ issues: - staticcheck # TODO(#768): Drop when incrementing the API version. text: "SA1019: constants.QueueAnnotation is deprecated" + - linters: + - unused + path: "^pkg/hierarchy.cohort.go" # Show all issues from a linter max-issues-per-linter: 0 # Show all issues with the same text diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index fd6bc28882..f431e80704 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -32,6 +32,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/hierarchy" @@ -397,6 +398,27 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) { metrics.ClearCacheMetrics(cq.Name) } +func (c *Cache) AddCohort(apiCohort *kueuealpha.Cohort) { + c.Lock() + defer c.Unlock() + cohort := newCohort(apiCohort.Name) + c.hm.AddCohort(cohort) + cohort.updateCohort(apiCohort) +} + +func (c *Cache) DeleteCohort(apiCohort *kueuealpha.Cohort) { + c.Lock() + defer c.Unlock() + c.hm.DeleteCohort(apiCohort.Name) + + // If the cohort still exists after deletion, it means + // that it has one or more children referencing it. + // We need to run update algorithm. + if cohort, ok := c.hm.Cohorts[apiCohort.Name]; ok { + updateCohortResourceNode(cohort) + } +} + func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error { c.Lock() defer c.Unlock() diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 6f86796abc..c8f205b7a1 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1051,6 +1051,53 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, }, }, + { + name: "create cohort", + operation: func(cache *Cache) error { + cohort := utiltesting.MakeCohort("cohort").Obj() + cache.AddCohort(cohort) + return nil + }, + wantCohorts: map[string]sets.Set[string]{ + "cohort": nil, + }, + }, + { + name: "create and delete cohort", + operation: func(cache *Cache) error { + cohort := utiltesting.MakeCohort("cohort").Obj() + cache.AddCohort(cohort) + cache.DeleteCohort(cohort) + return nil + }, + wantCohorts: nil, + }, + { + name: "cohort remains after deletion when child exists", + operation: func(cache *Cache) error { + cohort := utiltesting.MakeCohort("cohort").Obj() + cache.AddCohort(cohort) + + _ = cache.AddClusterQueue(context.Background(), + utiltesting.MakeClusterQueue("cq").Cohort("cohort").Obj()) + cache.DeleteCohort(cohort) + return nil + }, + wantClusterQueues: map[string]*clusterQueue{ + "cq": { + Name: "cq", + NamespaceSelector: labels.Everything(), + Status: active, + Preemption: defaultPreemption, + AllocatableResourceGeneration: 1, + FlavorFungibility: defaultFlavorFungibility, + FairWeight: oneQuantity, + }, + }, + wantCohorts: map[string]sets.Set[string]{ + "cohort": sets.New("cq"), + }, + }, } for _, tc := range cases { diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index f2720ca1d8..61fa63b436 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -134,13 +134,17 @@ var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borr func (c *clusterQueue) updateClusterQueue(in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error { if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() { - updateClusterQueueResourceNode(c) c.AllocatableResourceGeneration += 1 if oldParent != nil && oldParent != c.Parent() { updateCohortResourceNode(oldParent) } if c.HasParent() { + // clusterQueue will be updated as part of tree update. updateCohortResourceNode(c.Parent()) + } else { + // since ClusterQueue has no parent, it won't be updated + // as part of tree update. + updateClusterQueueResourceNode(c) } } diff --git a/pkg/cache/cohort.go b/pkg/cache/cohort.go index 049ca5f971..d88929dae2 100644 --- a/pkg/cache/cohort.go +++ b/pkg/cache/cohort.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" "sigs.k8s.io/kueue/pkg/hierarchy" ) @@ -36,6 +37,11 @@ func newCohort(name string) *cohort { } } +func (c *cohort) updateCohort(apiCohort *kueuealpha.Cohort) { + c.resourceNode.Quotas = createResourceQuotas(apiCohort.Spec.ResourceGroups) + updateCohortResourceNode(c) +} + func (c *cohort) GetName() string { return c.Name } diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go index 91887608f6..552bf6bdf5 100644 --- a/pkg/cache/resource.go +++ b/pkg/cache/resource.go @@ -20,6 +20,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/ptr" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/resources" diff --git a/pkg/cache/resource_node.go b/pkg/cache/resource_node.go index 665196bb30..a9f893b19f 100644 --- a/pkg/cache/resource_node.go +++ b/pkg/cache/resource_node.go @@ -163,7 +163,12 @@ func updateClusterQueueResourceNode(cq *clusterQueue) { func updateCohortResourceNode(cohort *cohort) { cohort.resourceNode.SubtreeQuota = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.SubtreeQuota)) cohort.resourceNode.Usage = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.Usage)) + + for fr, quota := range cohort.resourceNode.Quotas { + cohort.resourceNode.SubtreeQuota[fr] = quota.Nominal + } for _, child := range cohort.ChildCQs() { + updateClusterQueueResourceNode(child) for fr, childQuota := range child.resourceNode.SubtreeQuota { cohort.resourceNode.SubtreeQuota[fr] += childQuota - child.resourceNode.guaranteedQuota(fr) } diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index 4e5b53ce72..6ff00dae52 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/ptr" + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/resources" @@ -45,6 +46,7 @@ var snapCmpOpts = []cmp.Option{ func TestSnapshot(t *testing.T) { testCases := map[string]struct { cqs []*kueue.ClusterQueue + cohorts []*kueuealpha.Cohort rfs []*kueue.ResourceFlavor wls []*kueue.Workload wantSnapshot Snapshot @@ -699,6 +701,82 @@ func TestSnapshot(t *testing.T) { } }(), }, + "cohort provides resources": { + rfs: []*kueue.ResourceFlavor{ + utiltesting.MakeResourceFlavor("arm").Obj(), + utiltesting.MakeResourceFlavor("x86").Obj(), + utiltesting.MakeResourceFlavor("mips").Obj(), + }, + cqs: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue("cq"). + Cohort("cohort"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("arm").Resource(corev1.ResourceCPU, "7", "", "3").Obj(), + *utiltesting.MakeFlavorQuotas("x86").Resource(corev1.ResourceCPU, "5").Obj(), + ). + Obj(), + }, + cohorts: []*kueuealpha.Cohort{ + utiltesting.MakeCohort("cohort"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("arm").Resource(corev1.ResourceCPU, "10").Obj(), + *utiltesting.MakeFlavorQuotas("x86").Resource(corev1.ResourceCPU, "20").Obj(), + ). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("mips").Resource(corev1.ResourceCPU, "42").Obj(), + ).Obj(), + }, + wantSnapshot: Snapshot{ + ClusterQueues: map[string]*ClusterQueueSnapshot{ + "cq": { + Name: "cq", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{ + { + CoveredResources: sets.New(corev1.ResourceCPU), + Flavors: []kueue.ResourceFlavorReference{"arm", "x86"}, + }, + }, + ResourceNode: ResourceNode{ + Quotas: map[resources.FlavorResource]ResourceQuota{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: {Nominal: 7_000, BorrowingLimit: nil, LendingLimit: ptr.To[int64](3_000)}, + {Flavor: "x86", Resource: corev1.ResourceCPU}: {Nominal: 5_000, BorrowingLimit: nil, LendingLimit: nil}, + }, + SubtreeQuota: resources.FlavorResourceQuantities{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: 7_000, + {Flavor: "x86", Resource: corev1.ResourceCPU}: 5_000, + }, + }, + FlavorFungibility: defaultFlavorFungibility, + FairWeight: oneQuantity, + Preemption: defaultPreemption, + NamespaceSelector: labels.Everything(), + Status: active, + Cohort: &CohortSnapshot{ + Name: "cohort", + AllocatableResourceGeneration: 1, + ResourceNode: ResourceNode{ + Quotas: map[resources.FlavorResource]ResourceQuota{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: {Nominal: 10_000, BorrowingLimit: nil, LendingLimit: nil}, + {Flavor: "x86", Resource: corev1.ResourceCPU}: {Nominal: 20_000, BorrowingLimit: nil, LendingLimit: nil}, + {Flavor: "mips", Resource: corev1.ResourceCPU}: {Nominal: 42_000, BorrowingLimit: nil, LendingLimit: nil}, + }, + SubtreeQuota: resources.FlavorResourceQuantities{ + {Flavor: "arm", Resource: corev1.ResourceCPU}: 13_000, + {Flavor: "x86", Resource: corev1.ResourceCPU}: 25_000, + {Flavor: "mips", Resource: corev1.ResourceCPU}: 42_000, + }, + }, + }, + }, + }, + ResourceFlavors: map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor{ + "arm": utiltesting.MakeResourceFlavor("arm").Obj(), + "x86": utiltesting.MakeResourceFlavor("x86").Obj(), + "mips": utiltesting.MakeResourceFlavor("mips").Obj(), + }, + }, + }, } for name, tc := range testCases { @@ -714,6 +792,9 @@ func TestSnapshot(t *testing.T) { t.Fatalf("Failed adding ClusterQueue: %v", err) } } + for _, cohort := range tc.cohorts { + cache.AddCohort(cohort) + } for _, rf := range tc.rfs { cache.AddOrUpdateResourceFlavor(rf) } diff --git a/pkg/hierarchy/cohort.go b/pkg/hierarchy/cohort.go index 43893dda2d..15d0ce91b2 100644 --- a/pkg/hierarchy/cohort.go +++ b/pkg/hierarchy/cohort.go @@ -20,6 +20,9 @@ import "k8s.io/apimachinery/pkg/util/sets" type Cohort[CQ, C nodeBase] struct { childCqs sets.Set[CQ] + // Indicates whether this Cohort is backed + // by an API object. + explicit bool } func (c *Cohort[CQ, C]) Parent() C { @@ -52,3 +55,11 @@ func (c *Cohort[CQ, C]) deleteClusterQueue(cq CQ) { func (c *Cohort[CQ, C]) childCount() int { return c.childCqs.Len() } + +func (c *Cohort[CQ, C]) isExplicit() bool { + return c.explicit +} + +func (c *Cohort[CQ, C]) markExplicit() { + c.explicit = true +} diff --git a/pkg/hierarchy/manager.go b/pkg/hierarchy/manager.go index d2d6e448f5..b7b4f29406 100644 --- a/pkg/hierarchy/manager.go +++ b/pkg/hierarchy/manager.go @@ -53,8 +53,35 @@ func (c *Manager[CQ, C]) DeleteClusterQueue(name string) { if cq, ok := c.ClusterQueues[name]; ok { c.unwireClusterQueue(cq) delete(c.ClusterQueues, name) + } +} + +func (c *Manager[CQ, C]) AddCohort(cohort C) { + cohort.markExplicit() + if oldCohort, ok := c.Cohorts[cohort.GetName()]; ok { + c.rewireChildren(oldCohort, cohort) + } + c.Cohorts[cohort.GetName()] = cohort +} + +func (c *Manager[CQ, C]) DeleteCohort(name string) { + cohort, ok := c.Cohorts[name] + delete(c.Cohorts, name) + if !ok || cohort.childCount() == 0 { return } + implicitCohort := c.cohortFactory(name) + c.Cohorts[implicitCohort.GetName()] = implicitCohort + c.rewireChildren(cohort, implicitCohort) +} + +// rewireChildren is used when we are changing a Cohort +// from an implicit to an explicit Cohort, or vice-versa. +func (c *Manager[CQ, C]) rewireChildren(old, new C) { + for _, cq := range old.ChildCQs() { + cq.setParent(new) + new.insertClusterQueue(cq) + } } func (c *Manager[CQ, C]) unwireClusterQueue(cq CQ) { @@ -75,7 +102,7 @@ func (c *Manager[CQ, C]) getOrCreateCohort(cohortName string) C { } func (c *Manager[CQ, C]) cleanupCohort(cohort C) { - if cohort.childCount() == 0 { + if !cohort.isExplicit() && cohort.childCount() == 0 { delete(c.Cohorts, cohort.GetName()) } } @@ -96,5 +123,8 @@ type cohortNode[CQ nodeBase] interface { insertClusterQueue(CQ) deleteClusterQueue(CQ) childCount() int + ChildCQs() []CQ + isExplicit() bool + markExplicit() nodeBase } diff --git a/pkg/hierarchy/manager_test.go b/pkg/hierarchy/manager_test.go index 782d494ece..3c902fcac0 100644 --- a/pkg/hierarchy/manager_test.go +++ b/pkg/hierarchy/manager_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/util/sets" ) @@ -29,6 +30,13 @@ func TestManager(t *testing.T) { cq string cohort string } + opts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b cqEdge) bool { + return a.cq < b.cq + }), + cmp.AllowUnexported(cqEdge{}), + } tests := map[string]struct { operations func(M) wantCqs sets.Set[string] @@ -158,11 +166,65 @@ func TestManager(t *testing.T) { wantCohorts: sets.New("cohort"), wantCqEdge: []cqEdge{{"queue1", "cohort"}}, }, + "explicit cohort": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + }, + wantCohorts: sets.New("cohort"), + }, + "delete explicit cohort": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + m.DeleteCohort("cohort") + }, + wantCohorts: sets.New[string](), + }, + "delete explicit cohort idempotent": { + operations: func(m M) { + m.DeleteCohort("cohort") + m.AddCohort(newCohort("cohort")) + m.DeleteCohort("cohort") + m.DeleteCohort("cohort") + }, + wantCohorts: sets.New[string](), + }, + "explicit cohort persists after child deleted": { + operations: func(m M) { + m.AddClusterQueue(newCq("queue")) + m.UpdateClusterQueueEdge("queue", "cohort") + m.AddCohort(newCohort("cohort")) + m.DeleteClusterQueue("queue") + }, + wantCohorts: sets.New("cohort"), + }, + "explicit cohort downgraded to implicit cohort": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + m.AddClusterQueue(newCq("queue")) + m.UpdateClusterQueueEdge("queue", "cohort") + m.DeleteCohort("cohort") + }, + wantCohorts: sets.New("cohort"), + wantCqs: sets.New("queue"), + wantCqEdge: []cqEdge{{"queue", "cohort"}}, + }, + "cohort upgraded to explicit then downgraded to implicit then deleted": { + operations: func(m M) { + m.AddCohort(newCohort("cohort")) + m.AddClusterQueue(newCq("queue")) + m.UpdateClusterQueueEdge("queue", "cohort") + m.DeleteCohort("cohort") + m.DeleteClusterQueue("queue") + }, + wantCohorts: sets.New[string](), + wantCqs: sets.New[string](), + wantCqEdge: []cqEdge{}, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - mgr := NewManager[*testClusterQueue, *testCohort](cohortFactory) + mgr := NewManager(newCohort) tc.operations(mgr) t.Run("verify clusterqueues", func(t *testing.T) { gotCqs := sets.New[string]() @@ -176,7 +238,7 @@ func TestManager(t *testing.T) { if diff := cmp.Diff(tc.wantCqs, gotCqs); diff != "" { t.Fatalf("Unexpected cqs -want +got %s", diff) } - if diff := cmp.Diff(sets.New(tc.wantCqEdge...), sets.New(gotEdges...)); diff != "" { + if diff := cmp.Diff(tc.wantCqEdge, gotEdges, opts...); diff != "" { t.Fatalf("Unexpected CQ->Cohort edges -want +got %s", diff) } }) @@ -192,7 +254,7 @@ func TestManager(t *testing.T) { if diff := cmp.Diff(tc.wantCohorts, gotCohorts); diff != "" { t.Fatalf("Unexpected cohorts -want +got %s", diff) } - if diff := cmp.Diff(sets.New(tc.wantCqEdge...), sets.New(gotEdges...)); diff != "" { + if diff := cmp.Diff(tc.wantCqEdge, gotEdges, opts...); diff != "" { t.Fatalf("Unexpected Cohort->CQ edges -want +got %s", diff) } }) @@ -205,7 +267,7 @@ type testCohort struct { Cohort[*testClusterQueue, *testCohort] } -func cohortFactory(name string) *testCohort { +func newCohort(name string) *testCohort { return &testCohort{ name: name, Cohort: NewCohort[*testClusterQueue, *testCohort](), diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index fd26e8315a..74c7eb0e67 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -584,6 +584,28 @@ func (q *LocalQueueWrapper) Generation(num int64) *LocalQueueWrapper { return q } +type CohortWrapper struct { + kueuealpha.Cohort +} + +func MakeCohort(name string) *CohortWrapper { + return &CohortWrapper{kueuealpha.Cohort{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }} +} + +func (c *CohortWrapper) Obj() *kueuealpha.Cohort { + return &c.Cohort +} + +// ResourceGroup adds a ResourceGroup with flavors. +func (c *CohortWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *CohortWrapper { + c.Spec.ResourceGroups = append(c.Spec.ResourceGroups, createResourceGroup(flavors...)) + return c +} + // ClusterQueueWrapper wraps a ClusterQueue. type ClusterQueueWrapper struct{ kueue.ClusterQueue } @@ -624,8 +646,7 @@ func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheck return c } -// ResourceGroup adds a ResourceGroup with flavors. -func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper { +func createResourceGroup(flavors ...kueue.FlavorQuotas) kueue.ResourceGroup { rg := kueue.ResourceGroup{ Flavors: flavors, } @@ -646,7 +667,12 @@ func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *Clus } rg.CoveredResources = resources } - c.Spec.ResourceGroups = append(c.Spec.ResourceGroups, rg) + return rg +} + +// ResourceGroup adds a ResourceGroup with flavors. +func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper { + c.Spec.ResourceGroups = append(c.Spec.ResourceGroups, createResourceGroup(flavors...)) return c }