Skip to content

Commit

Permalink
Update Snapshotting logic to use hierarchy.Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gabesaba committed Sep 5, 2024
1 parent 4c50cbc commit 42dab87
Show file tree
Hide file tree
Showing 9 changed files with 777 additions and 726 deletions.
15 changes: 6 additions & 9 deletions pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/workload"
)

type ClusterQueueSnapshot struct {
Name string
Cohort *CohortSnapshot
ResourceGroups []ResourceGroup
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
Expand All @@ -48,6 +48,7 @@ type ClusterQueueSnapshot struct {
AllocatableResourceGeneration int64

ResourceNode ResourceNode
hierarchy.ClusterQueue[*CohortSnapshot]
}

// RGByResource returns the ResourceGroup which contains capacity
Expand Down Expand Up @@ -108,17 +109,9 @@ func (c *ClusterQueueSnapshot) PotentialAvailable(fr resources.FlavorResource) i
return potentialAvailable(c, fr)
}

func (c *ClusterQueueSnapshot) Parent() *CohortSnapshot {
return c.Cohort
}

// The methods below implement several interfaces. See
// dominantResourceShareNode, resourceGroupNode, and netQuotaNode.

func (c *ClusterQueueSnapshot) HasParent() bool {
return c.Cohort != nil
}

func (c *ClusterQueueSnapshot) fairWeight() *resource.Quantity {
return &c.FairWeight
}
Expand All @@ -135,6 +128,10 @@ func (c *ClusterQueueSnapshot) parentResources() ResourceNode {
return c.Parent().ResourceNode
}

func (c *ClusterQueueSnapshot) GetName() string {
return c.Name
}

// The methods below implement hierarchicalResourceNode interface.

func (c *ClusterQueueSnapshot) getResourceNode() ResourceNode {
Expand Down
18 changes: 8 additions & 10 deletions pkg/cache/cohort_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,25 @@ limitations under the License.

package cache

import (
"k8s.io/apimachinery/pkg/util/sets"
)
import "sigs.k8s.io/kueue/pkg/hierarchy"

type CohortSnapshot struct {
Name string
Members sets.Set[*ClusterQueueSnapshot]
Name string

ResourceNode ResourceNode
hierarchy.Cohort[*ClusterQueueSnapshot, *CohortSnapshot]
}

// The methods below implement hierarchicalResourceNode interface.

func (c *CohortSnapshot) HasParent() bool {
return false
func (c *CohortSnapshot) GetName() string {
return c.Name
}

// The methods below implement hierarchicalResourceNode interface.

func (c *CohortSnapshot) getResourceNode() ResourceNode {
return c.ResourceNode
}

func (c *CohortSnapshot) parentHRN() hierarchicalResourceNode {
return nil
return c.Parent()
}
52 changes: 26 additions & 26 deletions pkg/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (
"k8s.io/klog/v2"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/hierarchy"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
)

type Snapshot struct {
ClusterQueues map[string]*ClusterQueueSnapshot
hierarchy.Manager[*ClusterQueueSnapshot, *CohortSnapshot]
ResourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor
InactiveClusterQueueSets sets.Set[string]
}
Expand All @@ -51,12 +52,10 @@ func (s *Snapshot) AddWorkload(wl *workload.Info) {
}

func (s *Snapshot) Log(log logr.Logger) {
cohorts := make(map[string]*CohortSnapshot)
for name, cq := range s.ClusterQueues {
cohortName := "<none>"
if cq.Cohort != nil {
cohortName = cq.Cohort.Name
cohorts[cohortName] = cq.Cohort
if cq.HasParent() {
cohortName = cq.Parent().Name
}

log.Info("Found ClusterQueue",
Expand All @@ -67,7 +66,7 @@ func (s *Snapshot) Log(log logr.Logger) {
"workloads", utilmaps.Keys(cq.Workloads),
)
}
for name, cohort := range cohorts {
for name, cohort := range s.Cohorts {
log.Info("Found cohort",
"cohort", name,
"resources", cohort.ResourceNode.SubtreeQuota,
Expand All @@ -81,30 +80,33 @@ func (c *Cache) Snapshot() Snapshot {
defer c.RUnlock()

snap := Snapshot{
ClusterQueues: make(map[string]*ClusterQueueSnapshot, len(c.hm.ClusterQueues)),
Manager: hierarchy.NewManager(newCohortSnapshot),
ResourceFlavors: make(map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, len(c.resourceFlavors)),
InactiveClusterQueueSets: sets.New[string](),
}
for _, cohort := range c.hm.Cohorts {
snap.AddCohort(snapshotCohort(cohort))
}
for _, cq := range c.hm.ClusterQueues {
if !cq.Active() {
snap.InactiveClusterQueueSets.Insert(cq.Name)
continue
}
snap.ClusterQueues[cq.Name] = cq.snapshot()
snap.AddClusterQueue(snapshotClusterQueue(cq))
if cq.HasParent() {
snap.UpdateClusterQueueEdge(cq.Name, cq.Parent().Name)
}
}
for name, rf := range c.resourceFlavors {
// Shallow copy is enough
snap.ResourceFlavors[name] = rf
}
for _, cohort := range c.hm.Cohorts {
cohort.snapshotInto(snap.ClusterQueues)
}
return snap
}

// snapshot creates a copy of ClusterQueue that includes references to immutable
// objects and deep copies of changing ones. A reference to the cohort is not included.
func (c *clusterQueue) snapshot() *ClusterQueueSnapshot {
// snapshotClusterQueue creates a copy of ClusterQueue that includes
// references to immutable objects and deep copies of changing ones.
func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot {
cc := &ClusterQueueSnapshot{
Name: c.Name,
ResourceGroups: make([]ResourceGroup, len(c.ResourceGroups)),
Expand All @@ -124,17 +126,15 @@ func (c *clusterQueue) snapshot() *ClusterQueueSnapshot {
return cc
}

func (c *cohort) snapshotInto(cqs map[string]*ClusterQueueSnapshot) {
cohortSnap := &CohortSnapshot{
Name: c.Name,
Members: make(sets.Set[*ClusterQueueSnapshot], len(c.ChildCQs())),
ResourceNode: c.resourceNode.Clone(),
}
for _, cq := range c.ChildCQs() {
if cq.Active() {
cqSnap := cqs[cq.Name]
cqSnap.Cohort = cohortSnap
cohortSnap.Members.Insert(cqSnap)
}
func snapshotCohort(c *cohort) *CohortSnapshot {
snapshot := newCohortSnapshot(c.Name)
snapshot.ResourceNode = c.resourceNode.Clone()
return snapshot
}

func newCohortSnapshot(name string) *CohortSnapshot {
return &CohortSnapshot{
Name: name,
Cohort: hierarchy.NewCohort[*ClusterQueueSnapshot, *CohortSnapshot](),
}
}
Loading

0 comments on commit 42dab87

Please sign in to comment.