Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cohorts with Resources in cache/hierarchy packages #2939

Merged
merged 2 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ issues:
- staticcheck
# TODO(#768): Drop when incrementing the API version.
text: "SA1019: constants.QueueAnnotation is deprecated"
- linters:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracking here: #2965

- unused
path: "^pkg/hierarchy.cohort.go"
# Show all issues from a linter
max-issues-per-linter: 0
# Show all issues with the same text
Expand Down
22 changes: 22 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/hierarchy"
Expand Down Expand Up @@ -397,6 +398,27 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
metrics.ClearCacheMetrics(cq.Name)
}

// AddCohort stores the given API Cohort in the cache.
//
// It builds a cache cohort named after apiCohort, registers it with the
// hierarchy manager and then applies the quotas found in apiCohort's spec.
// The cache lock is held for the whole call.
func (c *Cache) AddCohort(apiCohort *kueuealpha.Cohort) {
	c.Lock()
	defer c.Unlock()

	node := newCohort(apiCohort.Name)
	c.hm.AddCohort(node)
	// Registration comes first; updateCohort then loads the quotas and
	// recomputes the cohort's resource node.
	node.updateCohort(apiCohort)
}

// DeleteCohort asks the hierarchy manager to delete the cohort named after
// apiCohort. The cache lock is held for the whole call.
//
// If the manager still tracks a cohort under that name afterwards, the
// cohort's resource node is recomputed.
func (c *Cache) DeleteCohort(apiCohort *kueuealpha.Cohort) {
	c.Lock()
	defer c.Unlock()

	name := apiCohort.Name
	c.hm.DeleteCohort(name)

	// A cohort that survives the deletion is still referenced by one or
	// more children, so the update algorithm has to run for it.
	remaining, found := c.hm.Cohorts[name]
	if !found {
		return
	}
	updateCohortResourceNode(remaining)
}

func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error {
c.Lock()
defer c.Unlock()
Expand Down
47 changes: 47 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,53 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
},
},
{
name: "create cohort",
operation: func(cache *Cache) error {
cohort := utiltesting.MakeCohort("cohort").Obj()
cache.AddCohort(cohort)
return nil
},
wantCohorts: map[string]sets.Set[string]{
"cohort": nil,
},
},
{
name: "create and delete cohort",
operation: func(cache *Cache) error {
cohort := utiltesting.MakeCohort("cohort").Obj()
cache.AddCohort(cohort)
cache.DeleteCohort(cohort)
return nil
},
wantCohorts: nil,
},
{
name: "cohort remains after deletion when child exists",
operation: func(cache *Cache) error {
cohort := utiltesting.MakeCohort("cohort").Obj()
cache.AddCohort(cohort)

_ = cache.AddClusterQueue(context.Background(),
utiltesting.MakeClusterQueue("cq").Cohort("cohort").Obj())
cache.DeleteCohort(cohort)
return nil
},
wantClusterQueues: map[string]*clusterQueue{
"cq": {
Name: "cq",
NamespaceSelector: labels.Everything(),
Status: active,
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
"cohort": sets.New("cq"),
},
},
}

for _, tc := range cases {
Expand Down
102 changes: 21 additions & 81 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down Expand Up @@ -100,51 +99,6 @@ func (c *clusterQueue) parentHRN() hierarchicalResourceNode {
return c.Parent()
}

// cohort is a set of ClusterQueues that can borrow resources from each other.
//
// It is the cache-side node for a cohort: a name, the embedded hierarchy
// node and the resource accounting attached to it.
type cohort struct {
// Name identifies the cohort; it is the value returned by GetName.
Name string
// Embedded hierarchy node, instantiated with this package's clusterQueue
// and cohort types.
hierarchy.Cohort[*clusterQueue, *cohort]

// resourceNode is the resource accounting of this cohort; it is what
// getResourceNode returns.
resourceNode ResourceNode
}

// GetName returns the cohort's Name field.
func (c *cohort) GetName() string {
return c.Name
}

// implements hierarchicalResourceNode interface.

// getResourceNode returns the cohort's resourceNode. The ResourceNode is
// returned by value, not as a pointer.
func (c *cohort) getResourceNode() ResourceNode {
return c.resourceNode
}

// parentHRN returns the result of c.Parent() as a hierarchicalResourceNode.
//
// NOTE(review): if Parent() yields a nil pointer for a root cohort, the
// returned interface value would be non-nil while wrapping a nil pointer;
// callers presumably check for a parent by other means - confirm.
func (c *cohort) parentHRN() hierarchicalResourceNode {
return c.Parent()
}

// ResourceGroup is the cache representation of a group of resources together
// with the flavors that provide them.
type ResourceGroup struct {
// CoveredResources is the set of resource names covered by this group.
CoveredResources sets.Set[corev1.ResourceName]
// Flavors holds the references of the flavors belonging to this group.
Flavors []kueue.ResourceFlavorReference
// The set of key labels from all flavors.
// Those keys define the affinity terms of a workload
// that can be matched against the flavors.
LabelKeys sets.Set[string]
}

// Clone returns a copy of the resource group.
//
// CoveredResources and LabelKeys are cloned sets. The Flavors slice is not
// copied: the clone refers to the same slice as the receiver.
func (rg *ResourceGroup) Clone() ResourceGroup {
	cp := ResourceGroup{Flavors: rg.Flavors}
	cp.CoveredResources = rg.CoveredResources.Clone()
	cp.LabelKeys = rg.LabelKeys.Clone()
	return cp
}

// ResourceQuota is the quota of a single flavor/resource pair, with all
// quantities stored as int64 values.
type ResourceQuota struct {
// Nominal is the nominal quota.
Nominal int64
// BorrowingLimit is nil when no borrowing limit is set.
BorrowingLimit *int64
// LendingLimit is nil when no lending limit is set.
LendingLimit *int64
}

type queue struct {
key string
reservingWorkloads int
Expand All @@ -154,14 +108,6 @@ type queue struct {
admittedUsage resources.FlavorResourceQuantities
}

// newCohort returns a cohort with the given name, a fresh hierarchy node and
// a fresh ResourceNode.
func newCohort(name string) *cohort {
	return &cohort{
		Name:         name,
		Cohort:       hierarchy.NewCohort[*clusterQueue, *cohort](),
		resourceNode: NewResourceNode(),
	}
}

// FitInCohort supports the legacy
// features.MultiplePreemptions=false path. It doesn't take into
// account BorrowingLimits. To be cleaned up in v0.10, when we delete
Expand All @@ -188,13 +134,17 @@ var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borr

func (c *clusterQueue) updateClusterQueue(in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error {
if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() {
updateClusterQueueResourceNode(c)
c.AllocatableResourceGeneration += 1
if oldParent != nil && oldParent != c.Parent() {
updateCohortResourceNode(oldParent)
}
if c.HasParent() {
// clusterQueue will be updated as part of tree update.
updateCohortResourceNode(c.Parent())
} else {
// since ClusterQueue has no parent, it won't be updated
// as part of tree update.
updateClusterQueueResourceNode(c)
}
}

Expand Down Expand Up @@ -237,38 +187,28 @@ func (c *clusterQueue) updateClusterQueue(in *kueue.ClusterQueue, resourceFlavor
return nil
}

// createdResourceGroups converts API resource groups into their cache
// representation.
//
// Each resulting group carries the set of covered resources and the names
// of the group's flavors, in input order. LabelKeys is left unset.
func createdResourceGroups(kueueRgs []kueue.ResourceGroup) []ResourceGroup {
	out := make([]ResourceGroup, 0, len(kueueRgs))
	for i := range kueueRgs {
		src := &kueueRgs[i]
		names := make([]kueue.ResourceFlavorReference, len(src.Flavors))
		for j := range src.Flavors {
			names[j] = src.Flavors[j].Name
		}
		out = append(out, ResourceGroup{
			CoveredResources: sets.New(src.CoveredResources...),
			Flavors:          names,
		})
	}
	return out
}

// updateQuotasAndResourceGroups updates Quotas and ResourceGroups.
// It returns true if any changes were made.
func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) bool {
oldRG := c.ResourceGroups
oldQuotas := c.resourceNode.Quotas
c.ResourceGroups = createdResourceGroups(in)
c.resourceNode.Quotas = createResourceQuotas(in)

c.ResourceGroups = make([]ResourceGroup, len(in))
c.resourceNode.Quotas = make(map[resources.FlavorResource]ResourceQuota, 0)
for i, rgIn := range in {
rg := &c.ResourceGroups[i]
*rg = ResourceGroup{
CoveredResources: sets.New(rgIn.CoveredResources...),
Flavors: make([]kueue.ResourceFlavorReference, 0, len(rgIn.Flavors)),
}
for i := range rgIn.Flavors {
fIn := &rgIn.Flavors[i]
for _, rIn := range fIn.Resources {
nominal := resources.ResourceValue(rIn.Name, rIn.NominalQuota)
rQuota := ResourceQuota{
Nominal: nominal,
}
if rIn.BorrowingLimit != nil {
rQuota.BorrowingLimit = ptr.To(resources.ResourceValue(rIn.Name, *rIn.BorrowingLimit))
}
if features.Enabled(features.LendingLimit) && rIn.LendingLimit != nil {
rQuota.LendingLimit = ptr.To(resources.ResourceValue(rIn.Name, *rIn.LendingLimit))
}
c.resourceNode.Quotas[resources.FlavorResource{Flavor: fIn.Name, Resource: rIn.Name}] = rQuota
}
rg.Flavors = append(rg.Flavors, fIn.Name)
}
}
// Start at 1, for backwards compatibility.
return c.AllocatableResourceGeneration == 0 ||
!equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) ||
Expand Down
57 changes: 57 additions & 0 deletions pkg/cache/cohort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/hierarchy"
)

// cohort is a set of ClusterQueues that can borrow resources from each other.
//
// It is the cache-side node for a cohort: a name, the embedded hierarchy
// node and the resource accounting attached to it.
type cohort struct {
// Name identifies the cohort; it is the value returned by GetName.
Name string
// Embedded hierarchy node, instantiated with this package's clusterQueue
// and cohort types.
hierarchy.Cohort[*clusterQueue, *cohort]

// resourceNode is the resource accounting of this cohort; it is what
// getResourceNode returns.
resourceNode ResourceNode
}

// newCohort returns a cohort with the given name, a fresh hierarchy node and
// a fresh ResourceNode.
func newCohort(name string) *cohort {
	return &cohort{
		Name:         name,
		Cohort:       hierarchy.NewCohort[*clusterQueue, *cohort](),
		resourceNode: NewResourceNode(),
	}
}

// updateCohort replaces the cohort's quotas with the ones derived from the
// resource groups in apiCohort's spec, then recomputes the cohort's resource
// node through updateCohortResourceNode.
func (c *cohort) updateCohort(apiCohort *kueuealpha.Cohort) {
	groups := apiCohort.Spec.ResourceGroups
	c.resourceNode.Quotas = createResourceQuotas(groups)
	updateCohortResourceNode(c)
}

// GetName returns the cohort's Name field.
func (c *cohort) GetName() string {
return c.Name
}

// implements hierarchicalResourceNode interface.

// getResourceNode returns the cohort's resourceNode. The ResourceNode is
// returned by value, not as a pointer.
func (c *cohort) getResourceNode() ResourceNode {
return c.resourceNode
}

// parentHRN returns the result of c.Parent() as a hierarchicalResourceNode.
//
// NOTE(review): if Parent() yields a nil pointer for a root cohort, the
// returned interface value would be non-nil while wrapping a nil pointer;
// callers presumably check for a parent by other means - confirm.
func (c *cohort) parentHRN() hierarchicalResourceNode {
return c.Parent()
}
54 changes: 54 additions & 0 deletions pkg/cache/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,63 @@ limitations under the License.
package cache

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/resources"
)

// ResourceGroup is the cache representation of a group of resources together
// with the flavors that provide them.
type ResourceGroup struct {
// CoveredResources is the set of resource names covered by this group.
CoveredResources sets.Set[corev1.ResourceName]
// Flavors holds the references of the flavors belonging to this group.
Flavors []kueue.ResourceFlavorReference
// The set of key labels from all flavors.
// Those keys define the affinity terms of a workload
// that can be matched against the flavors.
LabelKeys sets.Set[string]
}

// Clone returns a copy of the resource group.
//
// CoveredResources and LabelKeys are cloned sets. The Flavors slice is not
// copied: the clone refers to the same slice as the receiver.
func (rg *ResourceGroup) Clone() ResourceGroup {
	cp := ResourceGroup{Flavors: rg.Flavors}
	cp.CoveredResources = rg.CoveredResources.Clone()
	cp.LabelKeys = rg.LabelKeys.Clone()
	return cp
}

// ResourceQuota is the quota of a single flavor/resource pair, with all
// quantities stored as int64 values.
type ResourceQuota struct {
// Nominal is the nominal quota.
Nominal int64
// BorrowingLimit is nil when the source quota has no borrowing limit.
BorrowingLimit *int64
// LendingLimit is nil when the source quota has no lending limit or the
// LendingLimit feature is disabled.
LendingLimit *int64
}

// createResourceQuotas flattens the quotas declared in the given API resource
// groups into a map keyed by flavor/resource pair.
//
// For every resource of every flavor the nominal quota is always recorded.
// The borrowing limit is recorded when present. The lending limit is
// recorded only when it is present and the LendingLimit feature is enabled.
func createResourceQuotas(kueueRgs []kueue.ResourceGroup) map[resources.FlavorResource]ResourceQuota {
	// Size hint: flavors times covered resources, summed over all groups.
	sizeHint := 0
	for i := range kueueRgs {
		sizeHint += len(kueueRgs[i].Flavors) * len(kueueRgs[i].CoveredResources)
	}

	result := make(map[resources.FlavorResource]ResourceQuota, sizeHint)
	for i := range kueueRgs {
		flavors := kueueRgs[i].Flavors
		for j := range flavors {
			flavor := &flavors[j]
			for k := range flavor.Resources {
				in := &flavor.Resources[k]

				out := ResourceQuota{
					Nominal: resources.ResourceValue(in.Name, in.NominalQuota),
				}
				if limit := in.BorrowingLimit; limit != nil {
					out.BorrowingLimit = ptr.To(resources.ResourceValue(in.Name, *limit))
				}
				// The feature gate is consulted before the nil check.
				if limit := in.LendingLimit; features.Enabled(features.LendingLimit) && limit != nil {
					out.LendingLimit = ptr.To(resources.ResourceValue(in.Name, *limit))
				}

				key := resources.FlavorResource{Flavor: flavor.Name, Resource: in.Name}
				result[key] = out
			}
		}
	}
	return result
}

// resourceGroupNode is implemented by nodes that can report their
// ResourceGroups.
type resourceGroupNode interface {
// resourceGroups returns the ResourceGroups of the node.
resourceGroups() []ResourceGroup
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cache/resource_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ func updateClusterQueueResourceNode(cq *clusterQueue) {
func updateCohortResourceNode(cohort *cohort) {
cohort.resourceNode.SubtreeQuota = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.SubtreeQuota))
cohort.resourceNode.Usage = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.Usage))

for fr, quota := range cohort.resourceNode.Quotas {
cohort.resourceNode.SubtreeQuota[fr] = quota.Nominal
}
for _, child := range cohort.ChildCQs() {
updateClusterQueueResourceNode(child)
for fr, childQuota := range child.resourceNode.SubtreeQuota {
cohort.resourceNode.SubtreeQuota[fr] += childQuota - child.resourceNode.guaranteedQuota(fr)
}
Expand Down
Loading