Skip to content

Commit

Permalink
Support Cohorts with Resources in cache/hierarchy packages
Browse files Browse the repository at this point in the history
  • Loading branch information
gabesaba committed Aug 30, 2024
1 parent be4a666 commit e4dcb28
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 9 deletions.
22 changes: 22 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/hierarchy"
Expand Down Expand Up @@ -397,6 +398,27 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
metrics.ClearCacheMetrics(cq.Name)
}

func (c *Cache) AddCohort(apiCohort *kueuealpha.Cohort) {
c.Lock()
defer c.Unlock()
cohort := newCohort(apiCohort.Name)
c.hm.AddCohort(cohort)
cohort.updateCohort(apiCohort)
}

func (c *Cache) DeleteCohort(apiCohort *kueuealpha.Cohort) {
c.Lock()
defer c.Unlock()
c.hm.DeleteCohort(apiCohort.Name)

// if the cohort still exists after deletion, it means
// that it has one or more children referencing it.
// We need to run update algorithm.
if cohort, ok := c.hm.Cohorts[apiCohort.Name]; ok {
updateCohortResourceNode(cohort)
}
}

func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error {
c.Lock()
defer c.Unlock()
Expand Down
47 changes: 47 additions & 0 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,53 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
},
},
{
name: "create cohort",
operation: func(cache *Cache) error {
cohort := utiltesting.MakeCohort("cohort").Obj()
cache.AddCohort(cohort)
return nil
},
wantCohorts: map[string]sets.Set[string]{
"cohort": nil,
},
},
{
name: "create and delete cohort",
operation: func(cache *Cache) error {
cohort := utiltesting.MakeCohort("cohort").Obj()
cache.AddCohort(cohort)
cache.DeleteCohort(cohort)
return nil
},
wantCohorts: nil,
},
{
name: "cohort remains after deletion when child exists",
operation: func(cache *Cache) error {
cohort := utiltesting.MakeCohort("cohort").Obj()
cache.AddCohort(cohort)

_ = cache.AddClusterQueue(context.Background(),
utiltesting.MakeClusterQueue("cq").Cohort("cohort").Obj())
cache.DeleteCohort(cohort)
return nil
},
wantClusterQueues: map[string]*clusterQueue{
"cq": {
Name: "cq",
NamespaceSelector: labels.Everything(),
Status: active,
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
"cohort": sets.New("cq"),
},
},
}

for _, tc := range cases {
Expand Down
6 changes: 5 additions & 1 deletion pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,17 @@ var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borr

func (c *clusterQueue) updateClusterQueue(in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks map[string]AdmissionCheck, oldParent *cohort) error {
if c.updateQuotasAndResourceGroups(in.Spec.ResourceGroups) || oldParent != c.Parent() {
updateClusterQueueResourceNode(c)
c.AllocatableResourceGeneration += 1
if oldParent != nil && oldParent != c.Parent() {
updateCohortResourceNode(oldParent)
}
if c.HasParent() {
// clusterQueue will be updated as part of tree update.
updateCohortResourceNode(c.Parent())
} else {
// since ClusterQueue has no parent, it won't be updated
// as part of tree update.
updateClusterQueueResourceNode(c)
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/cache/cohort.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/hierarchy"
)

Expand All @@ -36,6 +37,11 @@ func newCohort(name string) *cohort {
}
}

func (c *cohort) updateCohort(apiCohort *kueuealpha.Cohort) {
c.resourceNode.Quotas = createResourceQuotas(apiCohort.Spec.ResourceGroups)
updateCohortResourceNode(c)
}

func (c *cohort) GetName() string {
return c.Name
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down
5 changes: 5 additions & 0 deletions pkg/cache/resource_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ func updateClusterQueueResourceNode(cq *clusterQueue) {
func updateCohortResourceNode(cohort *cohort) {
cohort.resourceNode.SubtreeQuota = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.SubtreeQuota))
cohort.resourceNode.Usage = make(resources.FlavorResourceQuantities, len(cohort.resourceNode.Usage))

for fr, quota := range cohort.resourceNode.Quotas {
cohort.resourceNode.SubtreeQuota[fr] = quota.Nominal
}
for _, child := range cohort.ChildCQs() {
updateClusterQueueResourceNode(child)
for fr, childQuota := range child.resourceNode.SubtreeQuota {
cohort.resourceNode.SubtreeQuota[fr] += childQuota - child.resourceNode.guaranteedQuota(fr)
}
Expand Down
81 changes: 81 additions & 0 deletions pkg/cache/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/resources"
Expand All @@ -45,6 +46,7 @@ var snapCmpOpts = []cmp.Option{
func TestSnapshot(t *testing.T) {
testCases := map[string]struct {
cqs []*kueue.ClusterQueue
cohorts []*kueuealpha.Cohort
rfs []*kueue.ResourceFlavor
wls []*kueue.Workload
wantSnapshot Snapshot
Expand Down Expand Up @@ -699,6 +701,82 @@ func TestSnapshot(t *testing.T) {
}
}(),
},
"cohort provides resources": {
rfs: []*kueue.ResourceFlavor{
utiltesting.MakeResourceFlavor("arm").Obj(),
utiltesting.MakeResourceFlavor("x86").Obj(),
utiltesting.MakeResourceFlavor("mips").Obj(),
},
cqs: []*kueue.ClusterQueue{
utiltesting.MakeClusterQueue("cq").
Cohort("cohort").
ResourceGroup(
*utiltesting.MakeFlavorQuotas("arm").Resource(corev1.ResourceCPU, "7", "", "3").Obj(),
*utiltesting.MakeFlavorQuotas("x86").Resource(corev1.ResourceCPU, "5").Obj(),
).
Obj(),
},
cohorts: []*kueuealpha.Cohort{
utiltesting.MakeCohort("cohort").
ResourceGroup(
*utiltesting.MakeFlavorQuotas("arm").Resource(corev1.ResourceCPU, "10").Obj(),
*utiltesting.MakeFlavorQuotas("x86").Resource(corev1.ResourceCPU, "20").Obj(),
).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("mips").Resource(corev1.ResourceCPU, "42").Obj(),
).Obj(),
},
wantSnapshot: Snapshot{
ClusterQueues: map[string]*ClusterQueueSnapshot{
"cq": {
Name: "cq",
AllocatableResourceGeneration: 1,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
Flavors: []kueue.ResourceFlavorReference{"arm", "x86"},
},
},
ResourceNode: ResourceNode{
Quotas: map[resources.FlavorResource]ResourceQuota{
{Flavor: "arm", Resource: corev1.ResourceCPU}: {Nominal: 7_000, BorrowingLimit: nil, LendingLimit: ptr.To[int64](3_000)},
{Flavor: "x86", Resource: corev1.ResourceCPU}: {Nominal: 5_000, BorrowingLimit: nil, LendingLimit: nil},
},
SubtreeQuota: resources.FlavorResourceQuantities{
{Flavor: "arm", Resource: corev1.ResourceCPU}: 7_000,
{Flavor: "x86", Resource: corev1.ResourceCPU}: 5_000,
},
},
FlavorFungibility: defaultFlavorFungibility,
FairWeight: oneQuantity,
Preemption: defaultPreemption,
NamespaceSelector: labels.Everything(),
Status: active,
Cohort: &CohortSnapshot{
Name: "cohort",
AllocatableResourceGeneration: 1,
ResourceNode: ResourceNode{
Quotas: map[resources.FlavorResource]ResourceQuota{
{Flavor: "arm", Resource: corev1.ResourceCPU}: {Nominal: 10_000, BorrowingLimit: nil, LendingLimit: nil},
{Flavor: "x86", Resource: corev1.ResourceCPU}: {Nominal: 20_000, BorrowingLimit: nil, LendingLimit: nil},
{Flavor: "mips", Resource: corev1.ResourceCPU}: {Nominal: 42_000, BorrowingLimit: nil, LendingLimit: nil},
},
SubtreeQuota: resources.FlavorResourceQuantities{
{Flavor: "arm", Resource: corev1.ResourceCPU}: 13_000,
{Flavor: "x86", Resource: corev1.ResourceCPU}: 25_000,
{Flavor: "mips", Resource: corev1.ResourceCPU}: 42_000,
},
},
},
},
},
ResourceFlavors: map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor{
"arm": utiltesting.MakeResourceFlavor("arm").Obj(),
"x86": utiltesting.MakeResourceFlavor("x86").Obj(),
"mips": utiltesting.MakeResourceFlavor("mips").Obj(),
},
},
},
}

for name, tc := range testCases {
Expand All @@ -714,6 +792,9 @@ func TestSnapshot(t *testing.T) {
t.Fatalf("Failed adding ClusterQueue: %v", err)
}
}
for _, cohort := range tc.cohorts {
cache.AddCohort(cohort)
}
for _, rf := range tc.rfs {
cache.AddOrUpdateResourceFlavor(rf)
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/hierarchy/cohort.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import "k8s.io/apimachinery/pkg/util/sets"

type Cohort[CQ, C nodeBase] struct {
childCqs sets.Set[CQ]
// Indicates whether this Cohort is backed
// by an API object.
explicit bool
}

func (c *Cohort[CQ, C]) Parent() C {
Expand Down Expand Up @@ -52,3 +55,11 @@ func (c *Cohort[CQ, C]) deleteClusterQueue(cq CQ) {
func (c *Cohort[CQ, C]) childCount() int {
return c.childCqs.Len()
}

func (c *Cohort[CQ, C]) isExplicit() bool {
return c.explicit
}

func (c *Cohort[CQ, C]) markExplicit() {
c.explicit = true
}
32 changes: 31 additions & 1 deletion pkg/hierarchy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,35 @@ func (c *Manager[CQ, C]) DeleteClusterQueue(name string) {
if cq, ok := c.ClusterQueues[name]; ok {
c.unwireClusterQueue(cq)
delete(c.ClusterQueues, name)
}
}

func (c *Manager[CQ, C]) AddCohort(cohort C) {
cohort.markExplicit()
if oldCohort, ok := c.Cohorts[cohort.GetName()]; ok {
c.rewireChildren(oldCohort, cohort)
}
c.Cohorts[cohort.GetName()] = cohort
}

func (c *Manager[CQ, C]) DeleteCohort(name string) {
cohort, ok := c.Cohorts[name]
delete(c.Cohorts, name)
if !ok || cohort.childCount() == 0 {
return
}
implicitCohort := c.cohortFactory(name)
c.Cohorts[implicitCohort.GetName()] = implicitCohort
c.rewireChildren(cohort, implicitCohort)
}

// rewireChildren is used when we are changing a Cohort
// from an implicit to an explcit Cohort, or vice-versa.
func (c *Manager[CQ, C]) rewireChildren(old, new C) {
for _, cq := range old.ChildCQs() {
cq.setParent(new)
new.insertClusterQueue(cq)
}
}

func (c *Manager[CQ, C]) unwireClusterQueue(cq CQ) {
Expand All @@ -75,7 +102,7 @@ func (c *Manager[CQ, C]) getOrCreateCohort(cohortName string) C {
}

func (c *Manager[CQ, C]) cleanupCohort(cohort C) {
if cohort.childCount() == 0 {
if !cohort.isExplicit() && cohort.childCount() == 0 {
delete(c.Cohorts, cohort.GetName())
}
}
Expand All @@ -96,5 +123,8 @@ type cohortNode[CQ nodeBase] interface {
insertClusterQueue(CQ)
deleteClusterQueue(CQ)
childCount() int
ChildCQs() []CQ
isExplicit() bool
markExplicit()
nodeBase
}
Loading

0 comments on commit e4dcb28

Please sign in to comment.