diff --git a/PROJECT b/PROJECT index dab69c36a0..af89701cd7 100644 --- a/PROJECT +++ b/PROJECT @@ -15,7 +15,7 @@ resources: version: v1alpha1 - api: crdVersion: v1 - namespaced: true + namespaced: false controller: true domain: x-k8s.io group: kueue @@ -33,7 +33,7 @@ resources: version: v1alpha1 - api: crdVersion: v1 - namespaced: true + namespaced: false domain: x-k8s.io group: kueue kind: ResourceFlavor diff --git a/api/v1alpha1/clusterqueue_types.go b/api/v1alpha1/clusterqueue_types.go index 0bd05f5919..63941512ad 100644 --- a/api/v1alpha1/clusterqueue_types.go +++ b/api/v1alpha1/clusterqueue_types.go @@ -221,29 +221,22 @@ type Resource struct { // guaranteed: 100 // // +listType=map - // +listMapKey=name + // +listMapKey=resourceFlavor Flavors []Flavor `json:"flavors,omitempty"` } type Flavor struct { - // name is the type name, e.g., nvidia-tesla-k80. + // resourceFlavor is a reference to the resourceFlavor that defines this flavor. // +kubebuilder:default=default - Name string `json:"name"` + ResourceFlavor ResourceFlavorReference `json:"resourceFlavor"` // quota is the limit of resource usage at a point in time. Quota Quota `json:"quota"` - - // labels associated with this type. Those labels are matched against or - // converted to node affinity constraints on the workload’s pods. - // For example, cloud.provider.com/accelerator: nvidia-tesla-k80. - Labels map[string]string `json:"labels,omitempty"` - - // taints associated with this constraint that workloads must explicitly - // “tolerate” to be able to use this type. - // e.g., cloud.provider.com/preemptible="true":NoSchedule - Taints []corev1.Taint `json:"taints,omitempty"` } +// ResourceFlavorReference is the name of the ResourceFlavor. +type ResourceFlavorReference string + type Quota struct { // guaranteed amount of resource requests that are available to be used by // running workloads assigned to this quota. This value should not exceed diff --git a/api/v1alpha1/resourceflavor_types.go b/api/v1alpha1/resourceflavor_types.go index 50ae436d72..b43b090255 100644 --- a/api/v1alpha1/resourceflavor_types.go +++ b/api/v1alpha1/resourceflavor_types.go @@ -22,7 +22,7 @@ import ( ) //+kubebuilder:object:root=true -//+kubebuilder:subresource:status +//+kubebuilder:resource:scope=Cluster // ResourceFlavor is the Schema for the resourceflavors API type ResourceFlavor struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 666df1dae1..054cbe3a0e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -171,20 +171,6 @@ func (in *ClusterQueueStatus) DeepCopy() *ClusterQueueStatus { func (in *Flavor) DeepCopyInto(out *Flavor) { *out = *in in.Quota.DeepCopyInto(&out.Quota) - if in.Labels != nil { - in, out := &in.Labels, &out.Labels - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } - if in.Taints != nil { - in, out := &in.Taints, &out.Taints - *out = make([]corev1.Taint, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Flavor. 
diff --git a/config/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index 092bd1b0a4..fa628da8ed 100644 --- a/config/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -186,18 +186,6 @@ spec: name: on-demand quota: guaranteed: 100" items: properties: - labels: - additionalProperties: - type: string - description: 'labels associated with this type. Those - labels are matched against or converted to node affinity - constraints on the workload’s pods. For example, cloud.provider.com/accelerator: - nvidia-tesla-k80.' - type: object - name: - default: default - description: name is the type name, e.g., nvidia-tesla-k80. - type: string quota: description: quota is the limit of resource usage at a point in time. @@ -227,46 +215,18 @@ spec: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true type: object - taints: - description: taints associated with this constraint that - workloads must explicitly “tolerate” to be able to use - this type. e.g., cloud.provider.com/preemptible="true":NoSchedule - items: - description: The node this Taint is attached to has - the "effect" on any pod that does not tolerate the - Taint. - properties: - effect: - description: Required. The effect of the taint on - pods that do not tolerate the taint. Valid effects - are NoSchedule, PreferNoSchedule and NoExecute. - type: string - key: - description: Required. The taint key to be applied - to a node. - type: string - timeAdded: - description: TimeAdded represents the time at which - the taint was added. It is only written for NoExecute - taints. - format: date-time - type: string - value: - description: The taint value corresponding to the - taint key. - type: string - required: - - effect - - key - type: object - type: array + resourceFlavor: + default: default + description: resourceFlavor is a reference to the resourceFlavor + that defines this flavor. + type: string required: - - name - quota + - resourceFlavor type: object type: array x-kubernetes-list-map-keys: - - name + - resourceFlavor x-kubernetes-list-type: map name: description: name of the resource. 
For example, cpu, memory diff --git a/config/crd/bases/kueue.x-k8s.io_resourceflavors.yaml b/config/crd/bases/kueue.x-k8s.io_resourceflavors.yaml index 6cfbe30133..02d92b5476 100644 --- a/config/crd/bases/kueue.x-k8s.io_resourceflavors.yaml +++ b/config/crd/bases/kueue.x-k8s.io_resourceflavors.yaml @@ -13,7 +13,7 @@ spec: listKind: ResourceFlavorList plural: resourceflavors singular: resourceflavor - scope: Namespaced + scope: Cluster versions: - name: v1alpha1 schema: @@ -70,8 +70,6 @@ spec: type: object served: true storage: true - subresources: - status: {} status: acceptedNames: kind: "" diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index a50c5f02f7..7d719eb8e1 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -115,6 +115,14 @@ rules: - get - patch - update +- apiGroups: + - kueue.x-k8s.io + resources: + - resourceflavors + verbs: + - get + - list + - watch - apiGroups: - scheduling.k8s.io resources: diff --git a/config/samples/minimal.yaml b/config/samples/minimal.yaml index 95c4630287..dff028f0e2 100644 --- a/config/samples/minimal.yaml +++ b/config/samples/minimal.yaml @@ -1,3 +1,7 @@ +apiVersion: kueue.x-k8s.io/v1alpha1 +kind: ResourceFlavor +metadata: + name: default --- apiVersion: kueue.x-k8s.io/v1alpha1 kind: ClusterQueue @@ -8,13 +12,13 @@ spec: requestableResources: - name: "cpu" flavors: - - name: default + - resourceFlavor: default quota: guaranteed: 9 ceiling: 9 - name: "memory" flavors: - - name: default + - resourceFlavor: default quota: guaranteed: 36Gi ceiling: 36Gi diff --git a/main.go b/main.go index 01f20525e1..3bdb49f1f3 100644 --- a/main.go +++ b/main.go @@ -105,6 +105,10 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "QueuedWorkload") os.Exit(1) } + if err = core.NewResourceFlavorReconciler(cache).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "ResourceFlavor") + os.Exit(1) + } if err = job.NewReconciler(mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor(constants.JobControllerName)).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Job") diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index dbae0dea1a..d8bc033bc4 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,6 +46,7 @@ type Cache struct { clusterQueues map[string]*ClusterQueue cohorts map[string]*Cohort assumedWorkloads map[string]string + resourceFlavors map[string]*kueue.ResourceFlavor } func New(client client.Client) *Cache { @@ -54,6 +55,7 @@ func New(client client.Client) *Cache { clusterQueues: make(map[string]*ClusterQueue), cohorts: make(map[string]*Cohort), assumedWorkloads: make(map[string]string), + resourceFlavors: make(map[string]*kueue.ResourceFlavor), } } @@ -96,30 +98,28 @@ type FlavorLimits struct { Name string Guaranteed int64 Ceiling int64 - Taints []corev1.Taint - Labels map[string]string } -func NewClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) { - c := &ClusterQueue{ +func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*ClusterQueue, error) { + cqImpl := &ClusterQueue{ Name: cq.Name, Workloads: map[string]*workload.Info{}, } - if err := c.update(cq); err != nil { + if err := cqImpl.update(cq, c.resourceFlavors); err != nil { return nil, err } - return c, nil + return cqImpl, nil } -func (c *ClusterQueue) update(in *kueue.ClusterQueue) error { - c.RequestableResources = resourcesByName(in.Spec.RequestableResources) +func (c *ClusterQueue) update(in *kueue.ClusterQueue, 
resourceFlavors map[string]*kueue.ResourceFlavor) error { + c.RequestableResources = resourceLimitsByName(in.Spec.RequestableResources) nsSelector, err := metav1.LabelSelectorAsSelector(in.Spec.NamespaceSelector) if err != nil { return err } c.NamespaceSelector = nsSelector - labelKeys := map[corev1.ResourceName]sets.String{} + usedResources := make(Resources, len(in.Spec.RequestableResources)) for _, r := range in.Spec.RequestableResources { if len(r.Flavors) == 0 { @@ -128,25 +128,44 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue) error { existingUsedFlavors := c.UsedResources[r.Name] usedFlavors := make(map[string]int64, len(r.Flavors)) + for _, f := range r.Flavors { + usedFlavors[string(f.ResourceFlavor)] = existingUsedFlavors[string(f.ResourceFlavor)] + } + usedResources[r.Name] = usedFlavors + } + c.UsedResources = usedResources + c.UpdateLabelKeys(resourceFlavors) + return nil +} + +// UpdateLabelKeys updates a ClusterQueue's LabelKeys based on the passed ResourceFlavors set. +// Exported only for testing. +func (c *ClusterQueue) UpdateLabelKeys(flavors map[string]*kueue.ResourceFlavor) { + labelKeys := map[corev1.ResourceName]sets.String{} + for rName, flvLimits := range c.RequestableResources { + if len(flvLimits) == 0 { + continue + } resKeys := sets.NewString() - for _, t := range r.Flavors { - usedFlavors[t.Name] = existingUsedFlavors[t.Name] - for k := range t.Labels { - resKeys.Insert(k) + for _, l := range flvLimits { + if flv, exist := flavors[l.Name]; exist { + for k := range flv.Labels { + resKeys.Insert(k) + } } + // We don't error here on a missing flavor, since ResourceFlavor add events may + // come after ClusterQueue add/update. } - usedResources[r.Name] = usedFlavors if len(resKeys) != 0 { - labelKeys[r.Name] = resKeys + labelKeys[rName] = resKeys } } - c.UsedResources = usedResources + c.LabelKeys = nil if len(labelKeys) != 0 { c.LabelKeys = labelKeys } - return nil } func (c *ClusterQueue) addWorkload(w *kueue.QueuedWorkload) error { @@ -184,6 +203,24 @@ func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) { } } +func (c *Cache) AddOrUpdateResourceFlavor(rf *kueue.ResourceFlavor) { + c.Lock() + c.resourceFlavors[rf.Name] = rf + for _, cq := range c.clusterQueues { + // We call update on all ClusterQueues irrespective of whether they actually use this flavor + // because it is not expensive to do so, and it is not worth tracking which ClusterQueues use + // which flavors.
+ cq.UpdateLabelKeys(c.resourceFlavors) + } + c.Unlock() +} + +func (c *Cache) DeleteResourceFlavor(rf *kueue.ResourceFlavor) { + c.Lock() + delete(c.resourceFlavors, rf.Name) + c.Unlock() +} + func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error { c.Lock() defer c.Unlock() @@ -191,7 +228,7 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err if _, ok := c.clusterQueues[cq.Name]; ok { return fmt.Errorf("ClusterQueue already exists") } - cqImpl, err := NewClusterQueue(cq) + cqImpl, err := c.newClusterQueue(cq) if err != nil { return err } @@ -222,7 +259,7 @@ func (c *Cache) UpdateClusterQueue(cq *kueue.ClusterQueue) error { if !ok { return errCqNotFound } - if err := cqImpl.update(cq); err != nil { + if err := cqImpl.update(cq, c.resourceFlavors); err != nil { return err } if cqImpl.Cohort != nil { @@ -427,25 +464,18 @@ func (c *Cache) deleteClusterQueueFromCohort(cq *ClusterQueue) { cq.Cohort = nil } -func resourcesByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorLimits { +func resourceLimitsByName(in []kueue.Resource) map[corev1.ResourceName][]FlavorLimits { out := make(map[corev1.ResourceName][]FlavorLimits, len(in)) for _, r := range in { flavors := make([]FlavorLimits, len(r.Flavors)) for i := range flavors { f := &r.Flavors[i] - fInfo := FlavorLimits{ - Name: f.Name, + fLimits := FlavorLimits{ + Name: string(f.ResourceFlavor), Guaranteed: workload.ResourceValue(r.Name, f.Quota.Guaranteed), Ceiling: workload.ResourceValue(r.Name, f.Quota.Ceiling), - Taints: append([]corev1.Taint(nil), f.Taints...), - } - if len(f.Labels) != 0 { - fInfo.Labels = make(map[string]string, len(f.Labels)) - for k, v := range f.Labels { - fInfo.Labels[k] = v - } } - flavors[i] = fInfo + flavors[i] = fLimits } out[r.Name] = flavors diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 70a2f5e645..cdeb94e310 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -38,72 +38,126 @@ import ( ) func TestCacheClusterQueueOperations(t *testing.T) { + initialClusterQueues := []kueue.ClusterQueue{ + { + ObjectMeta: metav1.ObjectMeta{Name: "a"}, + Spec: kueue.ClusterQueueSpec{ + RequestableResources: []kueue.Resource{ + { + Name: corev1.ResourceCPU, + Flavors: []kueue.Flavor{ + { + ResourceFlavor: "default", + Quota: kueue.Quota{ + Guaranteed: resource.MustParse("10"), + Ceiling: resource.MustParse("20"), + }, + }, + }, + }}, + Cohort: "one", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "b"}, + Spec: kueue.ClusterQueueSpec{ + RequestableResources: []kueue.Resource{ + { + Name: corev1.ResourceCPU, + Flavors: []kueue.Flavor{{ResourceFlavor: "default"}}, + }}, + Cohort: "one", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "c"}, + Spec: kueue.ClusterQueueSpec{Cohort: "two"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "d"}, + }, + } + setup := func(cache *Cache) { + cache.AddOrUpdateResourceFlavor(&kueue.ResourceFlavor{ + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + Labels: map[string]string{"cpuType": "default"}, + }) + for _, c := range initialClusterQueues { + if err := cache.AddClusterQueue(context.Background(), &c); err != nil { + t.Fatalf("Failed adding ClusterQueue: %v", err) + } + } + } scheme := runtime.NewScheme() if err := kueue.AddToScheme(scheme); err != nil { t.Fatalf("Failed adding kueue scheme: %v", err) } - cache := New(fake.NewClientBuilder().WithScheme(scheme).Build()) - steps := []struct { + cases := []struct { name string - operation func() + operation func(*Cache) wantClusterQueues 
map[string]*ClusterQueue wantCohorts map[string]sets.String }{ { name: "add", - operation: func() { - clusterQueues := []kueue.ClusterQueue{ - { - ObjectMeta: metav1.ObjectMeta{Name: "a"}, - Spec: kueue.ClusterQueueSpec{ - RequestableResources: []kueue.Resource{ - { - Name: corev1.ResourceCPU, - Flavors: []kueue.Flavor{ - { - Name: "default", - Quota: kueue.Quota{ - Guaranteed: resource.MustParse("10"), - Ceiling: resource.MustParse("20"), - }, - Labels: map[string]string{"cpuType": "default"}, - }, - }, - }}, - Cohort: "one", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "b"}, - Spec: kueue.ClusterQueueSpec{ - RequestableResources: []kueue.Resource{ - { - Name: corev1.ResourceCPU, - Flavors: []kueue.Flavor{{Name: "default"}}, - }}, - Cohort: "one", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "c"}, - Spec: kueue.ClusterQueueSpec{Cohort: "two"}, + operation: func(cache *Cache) { + setup(cache) + }, + wantClusterQueues: map[string]*ClusterQueue{ + "a": { + Name: "a", + RequestableResources: map[corev1.ResourceName][]FlavorLimits{ + corev1.ResourceCPU: {{Name: "default", Guaranteed: 10000, Ceiling: 20000}}, }, - { - ObjectMeta: metav1.ObjectMeta{Name: "d"}, + NamespaceSelector: labels.Nothing(), + LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, + UsedResources: Resources{corev1.ResourceCPU: {"default": 0}}, + }, + "b": { + Name: "b", + RequestableResources: map[corev1.ResourceName][]FlavorLimits{ + corev1.ResourceCPU: {{Name: "default"}}, }, - } - for _, c := range clusterQueues { + NamespaceSelector: labels.Nothing(), + UsedResources: Resources{corev1.ResourceCPU: {"default": 0}}, + LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, + }, + "c": { + Name: "c", + RequestableResources: map[corev1.ResourceName][]FlavorLimits{}, + NamespaceSelector: labels.Nothing(), + UsedResources: Resources{}, + }, + "d": { + Name: "d", + RequestableResources: map[corev1.ResourceName][]FlavorLimits{}, + NamespaceSelector: labels.Nothing(), + UsedResources: Resources{}, + }, + }, + wantCohorts: map[string]sets.String{ + "one": sets.NewString("a", "b"), + "two": sets.NewString("c"), + }, + }, + { + name: "add flavors after queue capacities", + operation: func(cache *Cache) { + for _, c := range initialClusterQueues { if err := cache.AddClusterQueue(context.Background(), &c); err != nil { t.Fatalf("Failed adding ClusterQueue: %v", err) } } + cache.AddOrUpdateResourceFlavor(&kueue.ResourceFlavor{ + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + Labels: map[string]string{"cpuType": "default"}, + }) }, wantClusterQueues: map[string]*ClusterQueue{ "a": { Name: "a", RequestableResources: map[corev1.ResourceName][]FlavorLimits{ - corev1.ResourceCPU: {{Name: "default", Guaranteed: 10000, Ceiling: 20000, - Labels: map[string]string{"cpuType": "default"}}}, + corev1.ResourceCPU: {{Name: "default", Guaranteed: 10000, Ceiling: 20000}}, }, NamespaceSelector: labels.Nothing(), LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, @@ -116,6 +170,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, NamespaceSelector: labels.Nothing(), UsedResources: Resources{corev1.ResourceCPU: {"default": 0}}, + LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, }, "c": { Name: "c", @@ -137,7 +192,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, { name: "update", - operation: func() { + operation: func(cache *Cache) { + setup(cache)
clusterQueues := []kueue.ClusterQueue{ { ObjectMeta: metav1.ObjectMeta{Name: "a"}, @@ -147,15 +203,11 @@ func TestCacheClusterQueueOperations(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "default", + ResourceFlavor: "default", Quota: kueue.Quota{ Guaranteed: resource.MustParse("5"), Ceiling: resource.MustParse("10"), }, - Labels: map[string]string{ - "cpuType": "default", - "region": "central", - }, }, }, }}, @@ -176,14 +228,16 @@ func TestCacheClusterQueueOperations(t *testing.T) { t.Fatalf("Failed updating ClusterQueue: %v", err) } } + cache.AddOrUpdateResourceFlavor(&kueue.ResourceFlavor{ + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + Labels: map[string]string{"cpuType": "default", "region": "central"}, + }) }, wantClusterQueues: map[string]*ClusterQueue{ "a": { Name: "a", RequestableResources: map[corev1.ResourceName][]FlavorLimits{ - corev1.ResourceCPU: {{Name: "default", Guaranteed: 5000, Ceiling: 10000, - Labels: map[string]string{"cpuType": "default", "region": "central"}, - }}, + corev1.ResourceCPU: {{Name: "default", Guaranteed: 5000, Ceiling: 10000}}, }, NamespaceSelector: labels.Nothing(), LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType", "region")}, @@ -215,7 +269,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, { name: "delete", - operation: func() { + operation: func(cache *Cache) { + setup(cache) clusterQueues := []kueue.ClusterQueue{ {ObjectMeta: metav1.ObjectMeta{Name: "a"}}, {ObjectMeta: metav1.ObjectMeta{Name: "d"}}, @@ -226,10 +281,13 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "b": { - Name: "b", - RequestableResources: map[corev1.ResourceName][]FlavorLimits{}, - NamespaceSelector: labels.Everything(), - UsedResources: Resources{}, + Name: "b", + RequestableResources: map[corev1.ResourceName][]FlavorLimits{ + corev1.ResourceCPU: {{Name: "default"}}, + }, + NamespaceSelector: labels.Nothing(), + UsedResources: Resources{corev1.ResourceCPU: {"default": 0}}, + LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, }, "c": { Name: "c", @@ -244,10 +302,11 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, }, } - for _, step := range steps { - t.Run(step.name, func(t *testing.T) { - step.operation() - if diff := cmp.Diff(step.wantClusterQueues, cache.clusterQueues, + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + cache := New(fake.NewClientBuilder().WithScheme(scheme).Build()) + tc.operation(cache) + if diff := cmp.Diff(tc.wantClusterQueues, cache.clusterQueues, cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "Workloads")); diff != "" { t.Errorf("Unexpected clusterQueues (-want,+got):\n%s", diff) } @@ -260,7 +319,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { } gotCohorts[name] = gotCohort } - if diff := cmp.Diff(step.wantCohorts, gotCohorts); diff != "" { + if diff := cmp.Diff(tc.wantCohorts, gotCohorts); diff != "" { t.Errorf("Unexpected cohorts (-want,+got):\n%s", diff) } }) @@ -276,8 +335,8 @@ func TestCacheWorkloadOperations(t *testing.T) { { Name: "cpu", Flavors: []kueue.Flavor{ - {Name: "on-demand"}, - {Name: "spot"}, + {ResourceFlavor: "on-demand"}, + {ResourceFlavor: "spot"}, }, }, }, @@ -290,8 +349,8 @@ func TestCacheWorkloadOperations(t *testing.T) { { Name: "cpu", Flavors: []kueue.Flavor{ - {Name: "on-demand"}, - {Name: "spot"}, + {ResourceFlavor: "on-demand"}, + {ResourceFlavor: "spot"}, }, }, }, @@ -737,7 +796,7 @@ func 
TestClusterQueueUsage(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "default", + ResourceFlavor: "default", Quota: kueue.Quota{ Guaranteed: resource.MustParse("10"), Ceiling: resource.MustParse("20"), @@ -749,14 +808,14 @@ func TestClusterQueueUsage(t *testing.T) { Name: "example.com/gpu", Flavors: []kueue.Flavor{ { - Name: "model_a", + ResourceFlavor: "model_a", Quota: kueue.Quota{ Guaranteed: resource.MustParse("5"), Ceiling: resource.MustParse("10"), }, }, { - Name: "model_b", + ResourceFlavor: "model_b", Quota: kueue.Quota{ Guaranteed: resource.MustParse("5"), Ceiling: resource.MustParse("10"), diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go index b60d953e1e..1c3d6aa7b5 100644 --- a/pkg/cache/snapshot.go +++ b/pkg/cache/snapshot.go @@ -17,11 +17,13 @@ limitations under the License. package cache import ( + kueue "sigs.k8s.io/kueue/api/v1alpha1" "sigs.k8s.io/kueue/pkg/workload" ) type Snapshot struct { - ClusterQueues map[string]*ClusterQueue + ClusterQueues map[string]*ClusterQueue + ResourceFlavors map[string]*kueue.ResourceFlavor } func (c *Cache) Snapshot() Snapshot { @@ -29,11 +31,16 @@ func (c *Cache) Snapshot() Snapshot { defer c.RUnlock() snap := Snapshot{ - ClusterQueues: make(map[string]*ClusterQueue, len(c.clusterQueues)), + ClusterQueues: make(map[string]*ClusterQueue, len(c.clusterQueues)), + ResourceFlavors: make(map[string]*kueue.ResourceFlavor, len(c.resourceFlavors)), } for _, cq := range c.clusterQueues { snap.ClusterQueues[cq.Name] = cq.snapshot() } + for _, rf := range c.resourceFlavors { + // Shallow copy is enough + snap.ResourceFlavors[rf.Name] = rf + } for _, cohort := range c.cohorts { cohortCopy := newCohort(cohort.Name, len(cohort.members)) for cq := range cohort.members { diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index a2d5955c54..b5b6b862a4 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -53,18 +53,16 @@ func TestSnapshot(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "demand", + ResourceFlavor: "demand", Quota: kueue.Quota{ Guaranteed: resource.MustParse("100"), }, - Labels: map[string]string{"foo": "bar", "instance": "on-demand"}, }, { - Name: "spot", + ResourceFlavor: "spot", Quota: kueue.Quota{ Guaranteed: resource.MustParse("200"), }, - Labels: map[string]string{"baz": "bar", "instance": "spot"}, }, }, }, @@ -82,7 +80,7 @@ func TestSnapshot(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "spot", + ResourceFlavor: "spot", Quota: kueue.Quota{ Guaranteed: resource.MustParse("100"), }, @@ -93,7 +91,7 @@ func TestSnapshot(t *testing.T) { Name: "example.com/gpu", Flavors: []kueue.Flavor{ { - Name: "default", + ResourceFlavor: "default", Quota: kueue.Quota{ Guaranteed: resource.MustParse("50"), }, @@ -113,7 +111,7 @@ func TestSnapshot(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "default", + ResourceFlavor: "default", Quota: kueue.Quota{ Guaranteed: resource.MustParse("100"), }, @@ -125,12 +123,25 @@ func TestSnapshot(t *testing.T) { }, } for _, c := range clusterQueues { - // Purposely do not make a copy of clusterQueues. Clones of necessary fields are + // Purposely do not make a copy of clusterQueues. Clones of necessary fields are // done in AddClusterQueue. 
if err := cache.AddClusterQueue(context.Background(), &c); err != nil { t.Fatalf("Failed adding ClusterQueue: %v", err) } } + flavors := []kueue.ResourceFlavor{ + { + ObjectMeta: metav1.ObjectMeta{Name: "demand"}, + Labels: map[string]string{"foo": "bar", "instance": "demand"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "spot"}, + Labels: map[string]string{"baz": "bar", "instance": "spot"}, + }, + } + for i := range flavors { + cache.AddOrUpdateResourceFlavor(&flavors[i]) + } workloads := []kueue.QueuedWorkload{ { ObjectMeta: metav1.ObjectMeta{Name: "alpha"}, @@ -249,12 +260,10 @@ func TestSnapshot(t *testing.T) { { Name: "demand", Guaranteed: 100_000, - Labels: map[string]string{"foo": "bar", "instance": "on-demand"}, }, { Name: "spot", Guaranteed: 200_000, - Labels: map[string]string{"baz": "bar", "instance": "spot"}, }, }, }, @@ -300,6 +309,7 @@ func TestSnapshot(t *testing.T) { "/gamma": workload.NewInfo(&workloads[2]), }, NamespaceSelector: labels.Nothing(), + LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: {"baz": {}, "instance": {}}}, }, "bar": { Name: "bar", @@ -318,6 +328,16 @@ func TestSnapshot(t *testing.T) { NamespaceSelector: labels.Nothing(), }, }, + ResourceFlavors: map[string]*kueue.ResourceFlavor{ + "demand": { + ObjectMeta: metav1.ObjectMeta{Name: "demand"}, + Labels: map[string]string{"foo": "bar", "instance": "demand"}, + }, + "spot": { + ObjectMeta: metav1.ObjectMeta{Name: "spot"}, + Labels: map[string]string{"baz": "bar", "instance": "spot"}, + }, + }, } if diff := cmp.Diff(wantSnapshot, snapshot, cmpopts.IgnoreUnexported(Cohort{})); diff != "" { t.Errorf("Unexpected Snapshot (-want,+got):\n%s", diff) diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go new file mode 100644 index 0000000000..7fa8c0fafb --- /dev/null +++ b/pkg/controller/core/resourceflavor_controller.go @@ -0,0 +1,95 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + + "github.com/go-logr/logr" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" + + kueue "sigs.k8s.io/kueue/api/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" +) + +// ResourceFlavorReconciler reconciles a ResourceFlavor object +type ResourceFlavorReconciler struct { + log logr.Logger + cache *cache.Cache +} + +func NewResourceFlavorReconciler(cache *cache.Cache) *ResourceFlavorReconciler { + return &ResourceFlavorReconciler{ + log: ctrl.Log.WithName("resourceflavor-reconciler"), + cache: cache, + } +} + +//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch + +func (r *ResourceFlavorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // Nothing to do here. 
+ return ctrl.Result{}, nil +} + +func (r *ResourceFlavorReconciler) Create(e event.CreateEvent) bool { + flv, match := e.Object.(*kueue.ResourceFlavor) + if !match { + return false + } + log := r.log.WithValues("resourceFlavor", klog.KObj(flv)) + log.V(2).Info("ResourceFlavor create event") + r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()) + return false +} + +func (r *ResourceFlavorReconciler) Delete(e event.DeleteEvent) bool { + flv, match := e.Object.(*kueue.ResourceFlavor) + if !match { + return false + } + log := r.log.WithValues("resourceFlavor", klog.KObj(flv)) + log.V(2).Info("ResourceFlavor delete event") + r.cache.DeleteResourceFlavor(flv) + return false +} + +func (r *ResourceFlavorReconciler) Update(e event.UpdateEvent) bool { + flv, match := e.ObjectNew.(*kueue.ResourceFlavor) + if !match { + return false + } + log := r.log.WithValues("resourceFlavor", klog.KObj(flv)) + log.V(2).Info("ResourceFlavor update event") + r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()) + return false +} + +func (r *ResourceFlavorReconciler) Generic(e event.GenericEvent) bool { + r.log.V(3).Info("Ignore generic event", "obj", klog.KObj(e.Object), "kind", e.Object.GetObjectKind().GroupVersionKind()) + return false +} + +// SetupWithManager sets up the controller with the Manager. +func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&kueue.ResourceFlavor{}). + WithEventFilter(r). + Complete(r) +} diff --git a/pkg/controller/workload/job/job_controller.go b/pkg/controller/workload/job/job_controller.go index a199dc2aa7..76822b0d4f 100644 --- a/pkg/controller/workload/job/job_controller.go +++ b/pkg/controller/workload/job/job_controller.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -92,7 +93,7 @@ func (r *JobReconciler) SetupWithManager(mgr ctrl.Manager) error { //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=queuedworkloads,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=queuedworkloads/status,verbs=get;update;patch //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=queuedworkloads/finalizers,verbs=update -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch +//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var job batchv1.Job @@ -224,17 +225,13 @@ func (r *JobReconciler) stopJob(ctx context.Context, w *kueue.QueuedWorkload, func (r *JobReconciler) startJob(ctx context.Context, w *kueue.QueuedWorkload, job *batchv1.Job) error { log := ctrl.LoggerFrom(ctx) - // Lookup the clusterQueue to fetch the node affinity labels to apply on the job. 
- clusterQueue := kueue.ClusterQueue{} - if err := r.client.Get(ctx, types.NamespacedName{Name: string(w.Spec.Admission.ClusterQueue)}, &clusterQueue); err != nil { - log.Error(err, "fetching ClusterQueue") - return err - } - if len(w.Spec.PodSets) != 1 { return fmt.Errorf("one podset must exist, found %d", len(w.Spec.PodSets)) } - nodeSelector := getNodeSelectors(&clusterQueue, w) + nodeSelector, err := r.getNodeSelectors(ctx, w) + if err != nil { + return err + } if len(nodeSelector) != 0 { if job.Spec.Template.Spec.NodeSelector == nil { job.Spec.Template.Spec.NodeSelector = nodeSelector @@ -258,32 +255,28 @@ func (r *JobReconciler) startJob(ctx context.Context, w *kueue.QueuedWorkload, j return nil } -func getNodeSelectors(cq *kueue.ClusterQueue, w *kueue.QueuedWorkload) map[string]string { +func (r *JobReconciler) getNodeSelectors(ctx context.Context, w *kueue.QueuedWorkload) (map[string]string, error) { if len(w.Spec.Admission.PodSetFlavors[0].Flavors) == 0 { - return nil - } - - // Create a map of resources-to-flavors. - // May be cache this? - flavors := make(map[corev1.ResourceName]map[string]*kueue.Flavor, len(cq.Spec.RequestableResources)) - for _, r := range cq.Spec.RequestableResources { - flavors[r.Name] = make(map[string]*kueue.Flavor, len(r.Flavors)) - for j := range r.Flavors { - flavors[r.Name][r.Flavors[j].Name] = &r.Flavors[j] - } + return nil, nil } + processedFlvs := sets.NewString() nodeSelector := map[string]string{} - for res, flvr := range w.Spec.Admission.PodSetFlavors[0].Flavors { - if cqRes, existRes := flavors[res]; existRes { - if cqFlvr, existFlvr := cqRes[flvr]; existFlvr { - for k, v := range cqFlvr.Labels { - nodeSelector[k] = v - } - } + for _, flvName := range w.Spec.Admission.PodSetFlavors[0].Flavors { + if processedFlvs.Has(flvName) { + continue + } + // Lookup the ResourceFlavors to fetch the node affinity labels to apply on the job. + flv := kueue.ResourceFlavor{} + if err := r.client.Get(ctx, types.NamespacedName{Name: flvName}, &flv); err != nil { + return nil, err + } + for k, v := range flv.Labels { + nodeSelector[k] = v } + processedFlvs.Insert(flvName) } - return nodeSelector + return nodeSelector, nil } func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job *batchv1.Job) error { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 681207bd60..f1d0cf4a22 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -159,7 +159,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna continue } e := entry{Info: w} - if !e.assignFlavors(log, cq) { + if !e.assignFlavors(log, snap.ResourceFlavors, cq) { log.V(2).Info("Workload didn't fit in remaining clusterQueue even when borrowing") continue } @@ -173,14 +173,14 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna // borrow from the cohort. // It returns whether the entry would fit. If it doesn't fit, the object is // unmodified. 
-func (e *entry) assignFlavors(log logr.Logger, cq *cache.ClusterQueue) bool { +func (e *entry) assignFlavors(log logr.Logger, resourceFlavors map[string]*kueue.ResourceFlavor, cq *cache.ClusterQueue) bool { flavoredRequests := make([]workload.PodSetResources, 0, len(e.TotalRequests)) wUsed := make(cache.Resources) wBorrows := make(cache.Resources) for i, podSet := range e.TotalRequests { flavors := make(map[corev1.ResourceName]string, len(podSet.Requests)) for resName, reqVal := range podSet.Requests { - rFlavor, borrow := findFlavorForResource(log, resName, reqVal, wUsed[resName], cq, &e.Obj.Spec.PodSets[i].Spec) + rFlavor, borrow := findFlavorForResource(log, resName, reqVal, wUsed[resName], resourceFlavors, cq, &e.Obj.Spec.PodSets[i].Spec) if rFlavor == "" { return false } @@ -263,11 +263,17 @@ func findFlavorForResource( name corev1.ResourceName, val int64, wUsed map[string]int64, + resourceFlavors map[string]*kueue.ResourceFlavor, cq *cache.ClusterQueue, spec *corev1.PodSpec) (string, int64) { // We will only check against the flavors' labels for the resource. selector := flavorSelector(spec, cq.LabelKeys[name]) - for _, flavor := range cq.RequestableResources[name] { + for _, flvLimit := range cq.RequestableResources[name] { + flavor, exist := resourceFlavors[flvLimit.Name] + if !exist { + log.Error(nil, "Flavor not found", "resourceFlavor", flvLimit.Name) + continue + } _, untolerated := corev1helpers.FindMatchingUntoleratedTaint(flavor.Taints, spec.Tolerations, func(t *corev1.Taint) bool { return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute }) @@ -283,7 +289,7 @@ func findFlavorForResource( } // Check considering the flavor usage by previous pod sets. - ok, borrow := fitsFlavorLimits(name, val+wUsed[flavor.Name], cq, &flavor) + ok, borrow := fitsFlavorLimits(name, val+wUsed[flavor.Name], cq, &flvLimit) if ok { return flavor.Name, borrow } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index b21cf6476c..8295570928 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -49,6 +49,12 @@ const ( ) func TestSchedule(t *testing.T) { + resourceFlavors := []*kueue.ResourceFlavor{ + {ObjectMeta: metav1.ObjectMeta{Name: "default"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "on-demand"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "spot"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "model-a"}}, + } clusterQueues := []kueue.ClusterQueue{ { ObjectMeta: metav1.ObjectMeta{Name: "sales"}, @@ -68,7 +74,7 @@ func TestSchedule(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "default", + ResourceFlavor: "default", Quota: kueue.Quota{ Guaranteed: resource.MustParse("50"), Ceiling: resource.MustParse("50"), @@ -98,14 +104,14 @@ func TestSchedule(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "on-demand", + ResourceFlavor: "on-demand", Quota: kueue.Quota{ Guaranteed: resource.MustParse("50"), Ceiling: resource.MustParse("100"), }, }, { - Name: "spot", + ResourceFlavor: "spot", Quota: kueue.Quota{ Guaranteed: resource.MustParse("100"), Ceiling: resource.MustParse("100"), @@ -135,14 +141,14 @@ func TestSchedule(t *testing.T) { Name: corev1.ResourceCPU, Flavors: []kueue.Flavor{ { - Name: "on-demand", + ResourceFlavor: "on-demand", Quota: kueue.Quota{ Guaranteed: resource.MustParse("50"), Ceiling: resource.MustParse("60"), }, }, { - Name: "spot", + ResourceFlavor: "spot", Quota: kueue.Quota{ Guaranteed: resource.MustParse("0"), Ceiling: resource.MustParse("100"), @@ -154,7
+160,7 @@ func TestSchedule(t *testing.T) { Name: "example.com/gpu", Flavors: []kueue.Flavor{ { - Name: "model-a", + ResourceFlavor: "model-a", Quota: kueue.Quota{ Guaranteed: resource.MustParse("20"), Ceiling: resource.MustParse("20"), @@ -719,6 +725,9 @@ func TestSchedule(t *testing.T) { t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err) } } + for i := range resourceFlavors { + cqCache.AddOrUpdateResourceFlavor(resourceFlavors[i]) + } workloadWatch, err := cl.Watch(ctx, &kueue.QueuedWorkloadList{}) if err != nil { t.Fatalf("Failed setting up watch: %v", err) @@ -791,6 +800,28 @@ func TestSchedule(t *testing.T) { } func TestEntryAssignFlavors(t *testing.T) { + resourceFlavors := map[string]*kueue.ResourceFlavor{ + "default": { + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + }, + "one": { + ObjectMeta: metav1.ObjectMeta{Name: "one"}, + Labels: map[string]string{"type": "one"}, + }, + "two": { + ObjectMeta: metav1.ObjectMeta{Name: "two"}, + Labels: map[string]string{"type": "two"}, + }, + "tainted": { + ObjectMeta: metav1.ObjectMeta{Name: "tainted"}, + Taints: []corev1.Taint{{ + Key: "instance", + Value: "spot", + Effect: corev1.TaintEffectNoSchedule, + }}, + }, + } + cases := map[string]struct { wlPods []kueue.PodSet clusterQueue cache.ClusterQueue @@ -846,18 +877,14 @@ func TestEntryAssignFlavors(t *testing.T) { clusterQueue: cache.ClusterQueue{ RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ corev1.ResourceCPU: noBorrowing([]cache.FlavorLimits{ - {Name: "default", Guaranteed: 4000, Taints: []corev1.Taint{{ - Key: "instance", - Value: "spot", - Effect: corev1.TaintEffectNoSchedule, - }}}, + {Name: "tainted", Guaranteed: 4000}, }), }, }, wantFits: true, wantFlavors: map[string]map[corev1.ResourceName]string{ "main": { - corev1.ResourceCPU: "default", + corev1.ResourceCPU: "tainted", }, }, }, @@ -950,11 +977,32 @@ func TestEntryAssignFlavors(t *testing.T) { clusterQueue: cache.ClusterQueue{ RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ corev1.ResourceCPU: noBorrowing([]cache.FlavorLimits{ - {Name: "one", Guaranteed: 4000, Taints: []corev1.Taint{{ - Key: "instance", - Value: "spot", - Effect: corev1.TaintEffectNoSchedule, - }}}, + {Name: "tainted", Guaranteed: 4000}, + {Name: "two", Guaranteed: 4000}, + }), + }, + }, + wantFits: true, + wantFlavors: map[string]map[corev1.ResourceName]string{ + "main": { + corev1.ResourceCPU: "two", + }, + }, + }, + "multiple flavors, skips missing ResourceFlavor": { + wlPods: []kueue.PodSet{ + { + Count: 1, + Name: "main", + Spec: utiltesting.PodSpecForRequest(map[corev1.ResourceName]string{ + corev1.ResourceCPU: "3", + }), + }, + }, + clusterQueue: cache.ClusterQueue{ + RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ + corev1.ResourceCPU: noBorrowing([]cache.FlavorLimits{ + {Name: "non-existent", Guaranteed: 4000}, {Name: "two", Guaranteed: 4000}, }), }, }, wantFits: true, wantFlavors: map[string]map[corev1.ResourceName]string{ "main": { corev1.ResourceCPU: "two", }, }, }, @@ -980,7 +1028,7 @@ func TestEntryAssignFlavors(t *testing.T) { }, }, // ignored:foo should get ignored - NodeSelector: map[string]string{"cpuType": "two", "ignored1": "foo"}, + NodeSelector: map[string]string{"type": "two", "ignored1": "foo"}, Affinity: &corev1.Affinity{NodeAffinity: &corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ NodeSelectorTerms: []corev1.NodeSelectorTerm{ @@ -1003,8 +1051,8 @@ func TestEntryAssignFlavors(t *testing.T) { clusterQueue: cache.ClusterQueue{ RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ corev1.ResourceCPU:
noBorrowing([]cache.FlavorLimits{ - {Name: "one", Guaranteed: 4000, Labels: map[string]string{"cpuType": "one"}}, - {Name: "two", Guaranteed: 4000, Labels: map[string]string{"cpuType": "two"}}, + {Name: "one", Guaranteed: 4000}, + {Name: "two", Guaranteed: 4000}, }), }, LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, @@ -1039,12 +1087,7 @@ func TestEntryAssignFlavors(t *testing.T) { { MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "cpuType", - Operator: corev1.NodeSelectorOpIn, - Values: []string{"two"}, - }, - { - Key: "memType", + Key: "type", Operator: corev1.NodeSelectorOpIn, Values: []string{"two"}, }, @@ -1059,18 +1102,14 @@ func TestEntryAssignFlavors(t *testing.T) { clusterQueue: cache.ClusterQueue{ RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ corev1.ResourceCPU: noBorrowing([]cache.FlavorLimits{ - {Name: "one", Guaranteed: 4000, Labels: map[string]string{"cpuType": "one", "group": "group1"}}, - {Name: "two", Guaranteed: 4000, Labels: map[string]string{"cpuType": "two"}}, + {Name: "one", Guaranteed: 4000}, + {Name: "two", Guaranteed: 4000}, }), corev1.ResourceMemory: noBorrowing([]cache.FlavorLimits{ - {Name: "one", Guaranteed: utiltesting.Gi, Labels: map[string]string{"memType": "one"}}, - {Name: "two", Guaranteed: utiltesting.Gi, Labels: map[string]string{"memType": "two"}}, + {Name: "one", Guaranteed: utiltesting.Gi}, + {Name: "two", Guaranteed: utiltesting.Gi}, }), }, - LabelKeys: map[corev1.ResourceName]sets.String{ - corev1.ResourceCPU: sets.NewString("cpuType", "group"), - corev1.ResourceMemory: sets.NewString("memType"), - }, }, wantFits: true, wantFlavors: map[string]map[corev1.ResourceName]string{ @@ -1127,11 +1166,10 @@ func TestEntryAssignFlavors(t *testing.T) { clusterQueue: cache.ClusterQueue{ RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ corev1.ResourceCPU: noBorrowing([]cache.FlavorLimits{ - {Name: "one", Guaranteed: 4000, Labels: map[string]string{"cpuType": "one"}}, - {Name: "two", Guaranteed: 4000, Labels: map[string]string{"cpuType": "two"}}, + {Name: "one", Guaranteed: 4000}, + {Name: "two", Guaranteed: 4000}, }), }, - LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, }, wantFits: true, wantFlavors: map[string]map[corev1.ResourceName]string{ @@ -1159,7 +1197,7 @@ func TestEntryAssignFlavors(t *testing.T) { { MatchExpressions: []corev1.NodeSelectorRequirement{ { - Key: "cpuType", + Key: "type", Operator: corev1.NodeSelectorOpIn, Values: []string{"three"}, }, @@ -1174,8 +1212,8 @@ func TestEntryAssignFlavors(t *testing.T) { clusterQueue: cache.ClusterQueue{ RequestableResources: map[corev1.ResourceName][]cache.FlavorLimits{ corev1.ResourceCPU: noBorrowing([]cache.FlavorLimits{ - {Name: "one", Guaranteed: 4000, Labels: map[string]string{"cpuType": "one"}}, - {Name: "two", Guaranteed: 4000, Labels: map[string]string{"cpuType": "two"}}, + {Name: "one", Guaranteed: 4000}, + {Name: "two", Guaranteed: 4000}, }), }, LabelKeys: map[corev1.ResourceName]sets.String{corev1.ResourceCPU: sets.NewString("cpuType")}, @@ -1360,7 +1398,8 @@ func TestEntryAssignFlavors(t *testing.T) { }, }), } - fits := e.assignFlavors(log, &tc.clusterQueue) + tc.clusterQueue.UpdateLabelKeys(resourceFlavors) + fits := e.assignFlavors(log, resourceFlavors, &tc.clusterQueue) if fits != tc.wantFits { t.Errorf("e.assignFlavors(_)=%t, want %t", fits, tc.wantFits) } diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 
18050cb00d..2aa4eceddd 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -310,18 +310,17 @@ func (r *ResourceWrapper) Flavor(f *kueue.Flavor) *ResourceWrapper { type FlavorWrapper struct{ kueue.Flavor } // MakeFlavor creates a wrapper for a resource flavor. -func MakeFlavor(name, guaranteed string) *FlavorWrapper { +func MakeFlavor(rf, guaranteed string) *FlavorWrapper { return &FlavorWrapper{kueue.Flavor{ - Name: name, + ResourceFlavor: kueue.ResourceFlavorReference(rf), Quota: kueue.Quota{ Guaranteed: resource.MustParse(guaranteed), Ceiling: resource.MustParse(guaranteed), }, - Labels: map[string]string{}, }} } -// Obj returns the inner resource flavor. +// Obj returns the inner flavor. func (f *FlavorWrapper) Obj() *kueue.Flavor { return &f.Flavor } @@ -332,14 +331,32 @@ func (f *FlavorWrapper) Ceiling(c string) *FlavorWrapper { return f } -// Label adds a label to the flavor. -func (f *FlavorWrapper) Label(k, v string) *FlavorWrapper { - f.Labels[k] = v - return f +// ResourceFlavorWrapper wraps a ResourceFlavor. +type ResourceFlavorWrapper struct{ kueue.ResourceFlavor } + +// MakeResourceFlavor creates a wrapper for a ResourceFlavor. +func MakeResourceFlavor(name string) *ResourceFlavorWrapper { + return &ResourceFlavorWrapper{kueue.ResourceFlavor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Labels: map[string]string{}, + }} } -// Taint adds a taint to the flavor. -func (f *FlavorWrapper) Taint(t corev1.Taint) *FlavorWrapper { - f.Taints = append(f.Taints, t) - return f +// Obj returns the inner ResourceFlavor. +func (rf *ResourceFlavorWrapper) Obj() *kueue.ResourceFlavor { + return &rf.ResourceFlavor +} + +// Label adds a label to the ResourceFlavor. +func (rf *ResourceFlavorWrapper) Label(k, v string) *ResourceFlavorWrapper { + rf.Labels[k] = v + return rf +} + +// Taint adds a taint to the ResourceFlavor. +func (rf *ResourceFlavorWrapper) Taint(t corev1.Taint) *ResourceFlavorWrapper { + rf.Taints = append(rf.Taints, t) + return rf } diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go index 4cd99fc8d8..a2ad386d9f 100644 --- a/test/integration/controller/job/job_controller_test.go +++ b/test/integration/controller/job/job_controller_test.go @@ -42,8 +42,6 @@ const ( jobNamespace = "default" jobKey = jobNamespace + "/" + jobName labelKey = "cloud.provider.com/instance" - flavorOnDemand = "on-demand" - flavorSpot = "spot" priorityClassName = "test-priority-class" priorityValue = 10 ) @@ -114,17 +112,21 @@ var _ = ginkgo.Describe("Job controller", func() { }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) ginkgo.By("checking the job is unsuspended when workload is assigned") + onDemandFlavor := testing.MakeResourceFlavor("on-demand").Label(labelKey, "on-demand").Obj() + gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed()) + spotFlavor := testing.MakeResourceFlavor("spot").Label(labelKey, "spot").Obj() + gomega.Expect(k8sClient.Create(ctx, spotFlavor)).Should(gomega.Succeed()) clusterQueue := testing.MakeClusterQueue("cluster-queue"). Resource(testing.MakeResource(corev1.ResourceCPU). - Flavor(testing.MakeFlavor(flavorOnDemand, "5").Label(labelKey, flavorOnDemand).Obj()). - Flavor(testing.MakeFlavor(flavorSpot, "5").Label(labelKey, flavorSpot).Obj()). + Flavor(testing.MakeFlavor(onDemandFlavor.Name, "5").Obj()). + Flavor(testing.MakeFlavor(spotFlavor.Name, "5").Obj()). 
Obj()).Obj() gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) createdWorkload.Spec.Admission = &kueue.Admission{ ClusterQueue: kueue.ClusterQueueReference(clusterQueue.Name), PodSetFlavors: []kueue.PodSetFlavors{{ Flavors: map[corev1.ResourceName]string{ - corev1.ResourceCPU: flavorOnDemand, + corev1.ResourceCPU: onDemandFlavor.Name, }, }}, } @@ -140,7 +142,7 @@ var _ = ginkgo.Describe("Job controller", func() { return ok }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) gomega.Expect(len(createdJob.Spec.Template.Spec.NodeSelector)).Should(gomega.Equal(1)) - gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[labelKey]).Should(gomega.Equal(flavorOnDemand)) + gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[labelKey]).Should(gomega.Equal(onDemandFlavor.Name)) gomega.Consistently(func() bool { if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { return false @@ -178,7 +180,7 @@ var _ = ginkgo.Describe("Job controller", func() { ClusterQueue: kueue.ClusterQueueReference(clusterQueue.Name), PodSetFlavors: []kueue.PodSetFlavors{{ Flavors: map[corev1.ResourceName]string{ - corev1.ResourceCPU: flavorSpot, + corev1.ResourceCPU: spotFlavor.Name, }, }}, } @@ -190,7 +192,7 @@ var _ = ginkgo.Describe("Job controller", func() { return !*createdJob.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) gomega.Expect(len(createdJob.Spec.Template.Spec.NodeSelector)).Should(gomega.Equal(1)) - gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[labelKey]).Should(gomega.Equal(flavorSpot)) + gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[labelKey]).Should(gomega.Equal(spotFlavor.Name)) gomega.Consistently(func() bool { if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { return false diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 66e17a0d29..110b9e1345 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -92,6 +92,15 @@ func DeleteClusterQueue(ctx context.Context, c client.Client, cq *kueue.ClusterQ return nil } +func DeleteResourceFlavor(ctx context.Context, c client.Client, rf *kueue.ResourceFlavor) error { + if rf != nil { + if err := c.Delete(ctx, rf); err != nil && !apierrors.IsNotFound(err) { + return err + } + } + return nil +} + func DeleteQueue(ctx context.Context, c client.Client, q *kueue.Queue) error { if q != nil { if err := c.Delete(ctx, q); err != nil && !apierrors.IsNotFound(err) { diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index fa5ead8dce..159aca1332 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -34,17 +34,18 @@ import ( var _ = ginkgo.Describe("Scheduler", func() { const ( - instanceKey = "cloud.provider.com/instance" - onDemandFlavor = "on-demand" - spotFlavor = "spot" + instanceKey = "cloud.provider.com/instance" ) var ( - ns *corev1.Namespace - prodClusterQ *kueue.ClusterQueue - devClusterQ *kueue.ClusterQueue - prodQueue *kueue.Queue - devQueue *kueue.Queue + ns *corev1.Namespace + prodClusterQ *kueue.ClusterQueue + devClusterQ *kueue.ClusterQueue + prodQueue *kueue.Queue + devQueue *kueue.Queue + onDemandFlavor *kueue.ResourceFlavor + spotTaintedFlavor *kueue.ResourceFlavor + spotUntaintedFlavor *kueue.ResourceFlavor ) ginkgo.BeforeEach(func() { @@ -54,27 +55,35 @@ var _ = ginkgo.Describe("Scheduler", func() { }, } 
gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + onDemandFlavor = testing.MakeResourceFlavor("on-demand").Label(instanceKey, "on-demand").Obj() + gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed()) + + spotTaintedFlavor = testing.MakeResourceFlavor("spot-tainted"). + Label(instanceKey, "spot-tainted"). + Taint(corev1.Taint{ + Key: instanceKey, + Value: "spot-tainted", + Effect: corev1.TaintEffectNoSchedule, + }).Obj() + gomega.Expect(k8sClient.Create(ctx, spotTaintedFlavor)).Should(gomega.Succeed()) + + spotUntaintedFlavor = testing.MakeResourceFlavor("spot-untainted").Label(instanceKey, "spot-untainted").Obj() + gomega.Expect(k8sClient.Create(ctx, spotUntaintedFlavor)).Should(gomega.Succeed()) + prodClusterQ = testing.MakeClusterQueue("prod-cq"). Cohort("prod"). Resource(testing.MakeResource(corev1.ResourceCPU). - Flavor(testing.MakeFlavor(spotFlavor, "5"). - Taint(corev1.Taint{ - Key: instanceKey, - Value: spotFlavor, - Effect: corev1.TaintEffectNoSchedule, - }). - Label(instanceKey, spotFlavor).Obj()). - Flavor(testing.MakeFlavor(onDemandFlavor, "5"). - Ceiling("10"). - Label(instanceKey, onDemandFlavor).Obj()). + Flavor(testing.MakeFlavor(spotTaintedFlavor.Name, "5").Obj()). + Flavor(testing.MakeFlavor(onDemandFlavor.Name, "5").Ceiling("10").Obj()). Obj()). Obj() gomega.Expect(k8sClient.Create(ctx, prodClusterQ)).Should(gomega.Succeed()) devClusterQ = testing.MakeClusterQueue("dev-clusterqueue"). Resource(testing.MakeResource(corev1.ResourceCPU). - Flavor(testing.MakeFlavor(spotFlavor, "5").Label(instanceKey, spotFlavor).Obj()). - Flavor(testing.MakeFlavor(onDemandFlavor, "5").Label(instanceKey, onDemandFlavor).Obj()). + Flavor(testing.MakeFlavor(spotUntaintedFlavor.Name, "5").Obj()). + Flavor(testing.MakeFlavor(onDemandFlavor.Name, "5").Obj()). Obj()). 
Obj() gomega.Expect(k8sClient.Create(ctx, devClusterQ)).Should(gomega.Succeed()) @@ -84,12 +93,16 @@ var _ = ginkgo.Describe("Scheduler", func() { devQueue = testing.MakeQueue("dev-queue", ns.Name).ClusterQueue(devClusterQ.Name).Obj() gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed()) + }) ginkgo.AfterEach(func() { gomega.Expect(framework.DeleteNamespace(ctx, k8sClient, ns)).ToNot(gomega.HaveOccurred()) gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, prodClusterQ)).ToNot(gomega.HaveOccurred()) gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, devClusterQ)).ToNot(gomega.HaveOccurred()) + gomega.Expect(framework.DeleteResourceFlavor(ctx, k8sClient, onDemandFlavor)).ToNot(gomega.HaveOccurred()) + gomega.Expect(framework.DeleteResourceFlavor(ctx, k8sClient, spotTaintedFlavor)).ToNot(gomega.HaveOccurred()) + gomega.Expect(framework.DeleteResourceFlavor(ctx, k8sClient, spotUntaintedFlavor)).ToNot(gomega.HaveOccurred()) ns = nil }) @@ -103,7 +116,7 @@ var _ = ginkgo.Describe("Scheduler", func() { err := k8sClient.Get(ctx, lookupKey1, createdProdJob1) return err == nil && !*createdProdJob1.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdProdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor)) + gomega.Expect(createdProdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) ginkgo.By("checking a second no-fit prod job does not start") prodJob2 := testing.MakeJob("prod-job2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj() @@ -122,7 +135,7 @@ var _ = ginkgo.Describe("Scheduler", func() { key := types.NamespacedName{Name: devJob.Name, Namespace: devJob.Namespace} return k8sClient.Get(ctx, key, createdDevJob) == nil && !*createdDevJob.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdDevJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotFlavor)) + gomega.Expect(createdDevJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) ginkgo.By("checking the second prod job starts when the first finishes") createdProdJob1.Status.Conditions = append(createdProdJob1.Status.Conditions, @@ -136,7 +149,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Eventually(func() bool { return k8sClient.Get(ctx, lookupKey2, createdProdJob2) == nil && !*createdProdJob2.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdProdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor)) + gomega.Expect(createdProdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) }) ginkgo.It("Should schedule jobs on tolerated flavors", func() { @@ -148,7 +161,7 @@ var _ = ginkgo.Describe("Scheduler", func() { lookupKey := types.NamespacedName{Name: job1.Name, Namespace: job1.Namespace} return k8sClient.Get(ctx, lookupKey, createdJob1) == nil && !*createdJob1.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor)) + gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) // TODO(#8): uncomment the following once we have proper re-queueing. 
// ginkgo.By("checking a second job without toleration doesn't start") @@ -165,7 +178,7 @@ var _ = ginkgo.Describe("Scheduler", func() { Toleration(corev1.Toleration{ Key: instanceKey, Operator: corev1.TolerationOpEqual, - Value: spotFlavor, + Value: spotTaintedFlavor.Name, Effect: corev1.TaintEffectNoSchedule, }). Request(corev1.ResourceCPU, "5").Obj() @@ -175,7 +188,7 @@ var _ = ginkgo.Describe("Scheduler", func() { lookupKey := types.NamespacedName{Name: job3.Name, Namespace: job3.Namespace} return k8sClient.Get(ctx, lookupKey, createdJob3) == nil && !*createdJob3.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdJob3.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotFlavor)) + gomega.Expect(createdJob3.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotTaintedFlavor.Name)) }) ginkgo.It("Should schedule jobs using borrowed ClusterQueue", func() { @@ -192,7 +205,7 @@ var _ = ginkgo.Describe("Scheduler", func() { fallbackClusterQueue := testing.MakeClusterQueue("fallback-cq"). Cohort(prodClusterQ.Spec.Cohort). Resource(testing.MakeResource(corev1.ResourceCPU). - Flavor(testing.MakeFlavor(onDemandFlavor, "5").Ceiling("10").Label(instanceKey, onDemandFlavor).Obj()). + Flavor(testing.MakeFlavor(onDemandFlavor.Name, "5").Ceiling("10").Obj()). Obj()). Obj() gomega.Expect(k8sClient.Create(ctx, fallbackClusterQueue)).Should(gomega.Succeed()) @@ -202,7 +215,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Eventually(func() bool { return k8sClient.Get(ctx, lookupKey, createdJob) == nil && !*createdJob.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor)) + gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) }) ginkgo.It("Should schedule jobs with affinity to specific flavor", func() { @@ -214,11 +227,11 @@ var _ = ginkgo.Describe("Scheduler", func() { lookupKey := types.NamespacedName{Name: job1.Name, Namespace: job1.Namespace} return k8sClient.Get(ctx, lookupKey, createdJob1) == nil && !*createdJob1.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) - gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotFlavor)) + gomega.Expect(createdJob1.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) ginkgo.By("checking a second job with affinity to on-demand") job2 := testing.MakeJob("affinity-job", ns.Name).Queue(devQueue.Name). - NodeSelector(instanceKey, onDemandFlavor). + NodeSelector(instanceKey, onDemandFlavor.Name). NodeSelector("foo", "bar"). 
Request(corev1.ResourceCPU, "1").Obj() gomega.Expect(k8sClient.Create(ctx, job2)).Should(gomega.Succeed()) @@ -228,7 +241,7 @@ var _ = ginkgo.Describe("Scheduler", func() { return k8sClient.Get(ctx, lookupKey, createdJob2) == nil && !*createdJob2.Spec.Suspend }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) gomega.Expect(len(createdJob2.Spec.Template.Spec.NodeSelector)).Should(gomega.Equal(2)) - gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor)) + gomega.Expect(createdJob2.Spec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) }) ginkgo.It("Should schedule jobs from the selected namespaces", func() { @@ -242,7 +255,7 @@ var _ = ginkgo.Describe("Scheduler", func() { }, }, }). - Resource(testing.MakeResource(corev1.ResourceCPU).Flavor(testing.MakeFlavor(onDemandFlavor, "5").Obj()).Obj()). + Resource(testing.MakeResource(corev1.ResourceCPU).Flavor(testing.MakeFlavor(onDemandFlavor.Name, "5").Obj()).Obj()). Obj() gomega.Expect(k8sClient.Create(ctx, clusterQ)).Should(gomega.Succeed()) defer func() { diff --git a/test/integration/scheduler/suite_test.go b/test/integration/scheduler/suite_test.go index 785f8147cb..8aa9367d99 100644 --- a/test/integration/scheduler/suite_test.go +++ b/test/integration/scheduler/suite_test.go @@ -84,6 +84,9 @@ func managerAndSchedulerSetup(mgr manager.Manager) { err = kueuectrl.NewQueuedWorkloadReconciler(queues, cache).SetupWithManager(mgr) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = kueuectrl.NewResourceFlavorReconciler(cache).SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = workloadjob.NewReconciler(mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor(constants.JobControllerName)).SetupWithManager(mgr) gomega.Expect(err).NotTo(gomega.HaveOccurred())
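A minimal sketch of the resulting API shape (object names here are illustrative, and the top-level labels/taints field names on ResourceFlavor are assumed to serialize as the removed inline Flavor fields did): labels and taints now live on the cluster-scoped ResourceFlavor, and a ClusterQueue flavor references it by name while keeping only the quota.
apiVersion: kueue.x-k8s.io/v1alpha1
kind: ResourceFlavor
metadata:
  name: spot
labels:
  cloud.provider.com/instance: spot
taints:
- key: cloud.provider.com/preemptible
  value: "true"
  effect: NoSchedule
---
apiVersion: kueue.x-k8s.io/v1alpha1
kind: ClusterQueue
metadata:
  name: example-queue
spec:
  requestableResources:
  - name: "cpu"
    flavors:
    - resourceFlavor: spot
      quota:
        guaranteed: 9
        ceiling: 9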