Skip to content

Commit

Permalink
Use slice instead of set for resources in flavor.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed Oct 18, 2024
1 parent 045b4f1 commit 06e74f0
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 51 deletions.
16 changes: 6 additions & 10 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func getUsage(frq resources.FlavorResourceQuantities, cq *clusterQueue) []kueue.
Name: fName,
Resources: make([]kueue.ResourceUsage, 0, len(rg.CoveredResources)),
}
for rName := range rg.CoveredResources {
for _, rName := range rg.CoveredResources {
fr := resources.FlavorResource{Flavor: fName, Resource: rName}
rQuota := cq.QuotaFor(fr)
used := frq[fr]
Expand Down Expand Up @@ -700,21 +700,21 @@ func (c *Cache) LocalQueueUsage(qObj *kueue.LocalQueue) (*LocalQueueUsageStats,
if features.Enabled(features.ExposeFlavorsInLocalQueue) {
flavors := make(map[kueue.ResourceFlavorReference]kueue.LocalQueueFlavorStatus)

resourcesInFlavor := make(map[kueue.ResourceFlavorReference]sets.Set[corev1.ResourceName])
resourcesInFlavor := make(map[kueue.ResourceFlavorReference][]corev1.ResourceName)
for _, rg := range cqImpl.ResourceGroups {
for _, rgFlavor := range rg.Flavors {
if _, ok := resourcesInFlavor[rgFlavor]; !ok {
resourcesInFlavor[rgFlavor] = sets.New[corev1.ResourceName]()
resourcesInFlavor[rgFlavor] = make([]corev1.ResourceName, 0, len(rg.CoveredResources))
}
resourcesInFlavor[rgFlavor].Insert(rg.CoveredResources.UnsortedList()...)
resourcesInFlavor[rgFlavor] = append(resourcesInFlavor[rgFlavor], rg.CoveredResources...)
}
}

for _, rg := range cqImpl.ResourceGroups {
for _, rgFlavor := range rg.Flavors {
flavor := kueue.LocalQueueFlavorStatus{Name: rgFlavor}
if rif, ok := resourcesInFlavor[rgFlavor]; ok {
flavor.Resources = sets.List(rif)
flavor.Resources = rif
}
if rf, ok := c.resourceFlavors[rgFlavor]; ok {
flavor.NodeLabels = rf.Spec.NodeLabels
Expand Down Expand Up @@ -747,17 +747,13 @@ func filterLocalQueueUsage(orig resources.FlavorResourceQuantities, resourceGrou
Name: fName,
Resources: make([]kueue.LocalQueueResourceUsage, 0, len(rg.CoveredResources)),
}
for rName := range rg.CoveredResources {
for _, rName := range rg.CoveredResources {
fr := resources.FlavorResource{Flavor: fName, Resource: rName}
outFlvUsage.Resources = append(outFlvUsage.Resources, kueue.LocalQueueResourceUsage{
Name: rName,
Total: resources.ResourceQuantity(rName, orig[fr]),
})
}
// The resourceUsages should be in a stable order to avoid endless creation of update events.
sort.Slice(outFlvUsage.Resources, func(i, j int) bool {
return outFlvUsage.Resources[i].Name < outFlvUsage.Resources[j].Name
})
qFlvUsages = append(qFlvUsages, outFlvUsage)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func createdResourceGroups(kueueRgs []kueue.ResourceGroup) []ResourceGroup {
rgs := make([]ResourceGroup, len(kueueRgs))
for i, kueueRg := range kueueRgs {
rgs[i] = ResourceGroup{
CoveredResources: sets.New(kueueRg.CoveredResources...),
CoveredResources: append(make([]corev1.ResourceName, 0, len(kueueRg.CoveredResources)), kueueRg.CoveredResources...),
Flavors: make([]kueue.ResourceFlavorReference, 0, len(kueueRg.Flavors)),
}
for _, fIn := range kueueRg.Flavors {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package cache

import (
"slices"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -55,7 +57,7 @@ type ClusterQueueSnapshot struct {
// for the resource, or nil if the CQ doesn't provide this resource.
func (c *ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *ResourceGroup {
for i := range c.ResourceGroups {
if c.ResourceGroups[i].CoveredResources.Has(resource) {
if slices.Contains(c.ResourceGroups[i].CoveredResources, resource) {
return &c.ResourceGroups[i]
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type ResourceGroup struct {
CoveredResources sets.Set[corev1.ResourceName]
CoveredResources []corev1.ResourceName
Flavors []kueue.ResourceFlavorReference
// The set of key labels from all flavors.
// Those keys define the affinity terms of a workload
Expand All @@ -37,7 +37,7 @@ type ResourceGroup struct {

func (rg *ResourceGroup) Clone() ResourceGroup {
return ResourceGroup{
CoveredResources: rg.CoveredResources.Clone(),
CoveredResources: append(make([]corev1.ResourceName, 0, len(rg.CoveredResources)), rg.CoveredResources...),
Flavors: rg.Flavors,
LabelKeys: rg.LabelKeys.Clone(),
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func flavorResources(r resourceGroupNode) []resources.FlavorResource {
frs := make([]resources.FlavorResource, 0, flavorResourceCount(r.resourceGroups()))
for _, rg := range r.resourceGroups() {
for _, f := range rg.Flavors {
for r := range rg.CoveredResources {
for _, r := range rg.CoveredResources {
frs = append(frs, resources.FlavorResource{Flavor: f, Resource: r})
}
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/cache/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 2,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"demand", "spot"},
LabelKeys: sets.New("instance"),
},
Expand Down Expand Up @@ -275,12 +275,12 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"spot"},
LabelKeys: sets.New("instance"),
},
{
CoveredResources: sets.New[corev1.ResourceName]("example.com/gpu"),
CoveredResources: []corev1.ResourceName{"example.com/gpu"},
Flavors: []kueue.ResourceFlavorReference{"default"},
},
},
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"default"},
},
},
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 2,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"arm", "x86"},
LabelKeys: sets.New("arch"),
},
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"arm", "x86"},
LabelKeys: sets.New("arch"),
},
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 2,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"arm", "x86"},
LabelKeys: sets.New("arch"),
},
Expand Down Expand Up @@ -689,7 +689,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"arm", "x86"},
LabelKeys: sets.New("arch"),
},
Expand Down Expand Up @@ -752,7 +752,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 2,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"arm", "x86"},
},
},
Expand Down Expand Up @@ -838,7 +838,7 @@ func TestSnapshot(t *testing.T) {
AllocatableResourceGeneration: 2,
ResourceGroups: []ResourceGroup{
{
CoveredResources: sets.New(corev1.ResourceCPU),
CoveredResources: []corev1.ResourceName{corev1.ResourceCPU},
Flavors: []kueue.ResourceFlavorReference{"arm"},
},
},
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestSnapshot(t *testing.T) {
for _, cq := range snapshot.ClusterQueues {
for i := range cq.ResourceGroups {
rg := &cq.ResourceGroups[i]
for rName := range rg.CoveredResources {
for _, rName := range rg.CoveredResources {
if cq.RGByResource(rName) != rg {
t.Errorf("RGByResource[%s] does return its resource group", rName)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package flavorassigner
import (
"errors"
"fmt"
"slices"
"sort"
"strings"

Expand Down Expand Up @@ -636,10 +637,10 @@ func (a *FlavorAssigner) canPreemptWhileBorrowing() bool {
(a.enableFairSharing && a.cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyNever)
}

func filterRequestedResources(req resources.Requests, allowList sets.Set[corev1.ResourceName]) resources.Requests {
func filterRequestedResources(req resources.Requests, allowList []corev1.ResourceName) resources.Requests {
filtered := make(resources.Requests)
for n, v := range req {
if allowList.Has(n) {
if slices.Contains(allowList, n) {
filtered[n] = v
}
}
Expand Down
Loading

0 comments on commit 06e74f0

Please sign in to comment.