diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index 65b2cccba9..2081cbf98c 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -62,6 +62,10 @@ type Configuration struct { // Integrations provide configuration options for AI/ML/Batch frameworks // integrations (including K8S job). Integrations *Integrations `json:"integrations,omitempty"` + + // QueueVisibility is the configuration to expose information about the top + // pending workloads. + QueueVisibility *QueueVisibility `json:"queueVisibility,omitempty"` } type ControllerManager struct { @@ -226,3 +230,24 @@ type Integrations struct { // - "kubeflow.org/tfjob" Frameworks []string `json:"frameworks,omitempty"` } + +type QueueVisibility struct { + // ClusterQueues is the configuration to expose information + // about the top pending workloads in the cluster queue. + ClusterQueues *ClusterQueueVisibility `json:"clusterQueues,omitempty"` + + // UpdateIntervalSeconds specifies the time interval for updates to the structure + // of the top pending workloads in the queues. + // The minimum value is 1. + // Defaults to 5. + UpdateIntervalSeconds int32 `json:"updateIntervalSeconds,omitempty"` +} + +type ClusterQueueVisibility struct { + // MaxCount indicates the maximal number of pending workloads exposed in the + // cluster queue status. When the value is set to 0, ClusterQueue + // visibility updates are disabled. + // The maximal value is 4000. + // Defaults to 10. + MaxCount int32 `json:"maxCount,omitempty"` +} diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index 02336f2dcd..b1cf6a0b15 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -29,16 +29,18 @@ import ( ) const ( - DefaultNamespace = "kueue-system" - DefaultWebhookServiceName = "kueue-webhook-service" - DefaultWebhookSecretName = "kueue-webhook-server-cert" - DefaultWebhookPort = 9443 - DefaultHealthProbeBindAddress = ":8081" - DefaultMetricsBindAddress = ":8080" - DefaultLeaderElectionID = "c1f6bfd2.kueue.x-k8s.io" - DefaultClientConnectionQPS float32 = 20.0 - DefaultClientConnectionBurst int32 = 30 - defaultPodsReadyTimeout = 5 * time.Minute + DefaultNamespace = "kueue-system" + DefaultWebhookServiceName = "kueue-webhook-service" + DefaultWebhookSecretName = "kueue-webhook-server-cert" + DefaultWebhookPort = 9443 + DefaultHealthProbeBindAddress = ":8081" + DefaultMetricsBindAddress = ":8080" + DefaultLeaderElectionID = "c1f6bfd2.kueue.x-k8s.io" + DefaultClientConnectionQPS float32 = 20.0 + DefaultClientConnectionBurst int32 = 30 + defaultPodsReadyTimeout = 5 * time.Minute + DefaultQueueVisibilityUpdateIntervalSeconds int32 = 5 + DefaultClusterQueuesMaxCount int32 = 10 ) func addDefaultingFuncs(scheme *runtime.Scheme) error { @@ -116,4 +118,15 @@ func SetDefaults_Configuration(cfg *Configuration) { if cfg.Integrations.Frameworks == nil { cfg.Integrations.Frameworks = []string{job.FrameworkName} } + if cfg.QueueVisibility == nil { + cfg.QueueVisibility = &QueueVisibility{} + } + if cfg.QueueVisibility.UpdateIntervalSeconds == 0 { + cfg.QueueVisibility.UpdateIntervalSeconds = DefaultQueueVisibilityUpdateIntervalSeconds + } + if cfg.QueueVisibility.ClusterQueues == nil { + cfg.QueueVisibility.ClusterQueues = &ClusterQueueVisibility{ + MaxCount: DefaultClusterQueuesMaxCount, + } + } } diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 0c79f73dce..b2ef277ea0
100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -55,6 +55,12 @@ func TestSetDefaults_Configuration(t *testing.T) { defaultIntegrations := &Integrations{ Frameworks: []string{job.FrameworkName}, } + defaultQueueVisibility := &QueueVisibility{ + UpdateIntervalSeconds: DefaultQueueVisibilityUpdateIntervalSeconds, + ClusterQueues: &ClusterQueueVisibility{ + MaxCount: 10, + }, + } podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute} @@ -76,6 +82,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "defaulting ControllerManager": { @@ -111,6 +118,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "should not default ControllerManager": { @@ -133,7 +141,8 @@ func TestSetDefaults_Configuration(t *testing.T) { InternalCertManagement: &InternalCertManagement{ Enable: ptr.To(false), }, - Integrations: defaultIntegrations, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, want: &Configuration{ Namespace: ptr.To(DefaultNamespace), @@ -157,6 +166,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "should not set LeaderElectionID": { @@ -191,6 +201,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "defaulting InternalCertManagement": { @@ -207,6 +218,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "should not default InternalCertManagement": { @@ -224,6 +236,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "should not default values in custom ClientConnection": { @@ -247,7 +260,8 @@ func TestSetDefaults_Configuration(t *testing.T) { QPS: ptr.To[float32](123.0), Burst: ptr.To[int32](456), }, - Integrations: defaultIntegrations, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "should default empty custom ClientConnection": { @@ -266,6 +280,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "defaulting waitForPodsReady.timeout": { @@ -290,6 +305,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "set waitForPodsReady.blockAdmission to false when enable is false": { @@ -314,6 +330,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "respecting provided waitForPodsReady.timeout": { @@ -339,6 +356,7 @@ func TestSetDefaults_Configuration(t *testing.T) { }, ClientConnection: defaultClientConnection, 
Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, }, "integrations": { @@ -360,6 +378,35 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: &Integrations{ Frameworks: []string{"a", "b"}, }, + QueueVisibility: defaultQueueVisibility, + }, + }, + "queue visibility": { + original: &Configuration{ + InternalCertManagement: &InternalCertManagement{ + Enable: ptr.To(false), + }, + QueueVisibility: &QueueVisibility{ + UpdateIntervalSeconds: 10, + ClusterQueues: &ClusterQueueVisibility{ + MaxCount: 0, + }, + }, + }, + want: &Configuration{ + Namespace: ptr.To(DefaultNamespace), + ControllerManager: defaultCtrlManagerConfigurationSpec, + InternalCertManagement: &InternalCertManagement{ + Enable: ptr.To(false), + }, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: &QueueVisibility{ + UpdateIntervalSeconds: 10, + ClusterQueues: &ClusterQueueVisibility{ + MaxCount: 0, + }, + }, }, }, } diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 12bc37a5a6..62b9bcde31 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -53,6 +53,21 @@ func (in *ClientConnection) DeepCopy() *ClientConnection { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterQueueVisibility) DeepCopyInto(out *ClusterQueueVisibility) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueueVisibility. +func (in *ClusterQueueVisibility) DeepCopy() *ClusterQueueVisibility { + if in == nil { + return nil + } + out := new(ClusterQueueVisibility) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Configuration) DeepCopyInto(out *Configuration) { *out = *in @@ -83,6 +98,11 @@ func (in *Configuration) DeepCopyInto(out *Configuration) { *out = new(Integrations) (*in).DeepCopyInto(*out) } + if in.QueueVisibility != nil { + in, out := &in.QueueVisibility, &out.QueueVisibility + *out = new(QueueVisibility) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration. @@ -258,6 +278,26 @@ func (in *InternalCertManagement) DeepCopy() *InternalCertManagement { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *QueueVisibility) DeepCopyInto(out *QueueVisibility) { + *out = *in + if in.ClusterQueues != nil { + in, out := &in.ClusterQueues, &out.ClusterQueues + *out = new(ClusterQueueVisibility) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueVisibility. +func (in *QueueVisibility) DeepCopy() *QueueVisibility { + if in == nil { + return nil + } + out := new(QueueVisibility) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *WaitForPodsReady) DeepCopyInto(out *WaitForPodsReady) { *out = *in diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index 22bdaa73a5..20cacab21d 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -208,6 +208,31 @@ type ClusterQueueStatus struct { // +patchStrategy=merge // +patchMergeKey=type Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` + + // PendingWorkloadsStatus contains the information exposed about the current + // status of the pending workloads in the cluster queue. + // +optional + PendingWorkloadsStatus *ClusterQueuePendingWorkloadsStatus `json:"pendingWorkloadsStatus"` +} + +type ClusterQueuePendingWorkloadsStatus struct { + // Head contains the list of top pending workloads. + // +listType=atomic + // +optional + Head []ClusterQueuePendingWorkload `json:"clusterQueuePendingWorkload"` + + // LastChangeTime indicates the time of the last change of the structure. + LastChangeTime metav1.Time `json:"lastChangeTime"` +} + +// ClusterQueuePendingWorkload contains the information identifying a pending workload +// in the cluster queue. +type ClusterQueuePendingWorkload struct { + // Name indicates the name of the pending workload. + Name string `json:"name"` + + // Namespace indicates the namespace of the pending workload. + Namespace string `json:"namespace"` } type FlavorUsage struct { diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index a77fdee540..ce250d0d02 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -222,6 +222,42 @@ func (in *ClusterQueueList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterQueuePendingWorkload) DeepCopyInto(out *ClusterQueuePendingWorkload) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueuePendingWorkload. +func (in *ClusterQueuePendingWorkload) DeepCopy() *ClusterQueuePendingWorkload { + if in == nil { + return nil + } + out := new(ClusterQueuePendingWorkload) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterQueuePendingWorkloadsStatus) DeepCopyInto(out *ClusterQueuePendingWorkloadsStatus) { + *out = *in + if in.Head != nil { + in, out := &in.Head, &out.Head + *out = make([]ClusterQueuePendingWorkload, len(*in)) + copy(*out, *in) + } + in.LastChangeTime.DeepCopyInto(&out.LastChangeTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueuePendingWorkloadsStatus. +func (in *ClusterQueuePendingWorkloadsStatus) DeepCopy() *ClusterQueuePendingWorkloadsStatus { + if in == nil { + return nil + } + out := new(ClusterQueuePendingWorkloadsStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ClusterQueuePreemption) DeepCopyInto(out *ClusterQueuePreemption) { *out = *in @@ -291,6 +327,11 @@ func (in *ClusterQueueStatus) DeepCopyInto(out *ClusterQueueStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.PendingWorkloadsStatus != nil { + in, out := &in.PendingWorkloadsStatus, &out.PendingWorkloadsStatus + *out = new(ClusterQueuePendingWorkloadsStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueueStatus. diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml index e5e3ee447d..3fc4ce14ac 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml @@ -440,6 +440,38 @@ spec: waiting to be admitted to this clusterQueue. format: int32 type: integer + pendingWorkloadsStatus: + description: PendingWorkloadsStatus contains the information exposed + about the current status of the pending workloads in the cluster + queue. + properties: + clusterQueuePendingWorkload: + description: Head contains the list of top pending workloads. + items: + description: ClusterQueuePendingWorkload contains the information + identifying a pending workload in the cluster queue. + properties: + name: + description: Name indicates the name of the pending workload. + type: string + namespace: + description: Namespace indicates the namespace of the pending + workload. + type: string + required: + - name + - namespace + type: object + type: array + x-kubernetes-list-type: atomic + lastChangeTime: + description: LastChangeTime indicates the time of the last change + of the structure. + format: date-time + type: string + required: + - lastChangeTime + type: object type: object type: object served: true diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuependingworkload.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuependingworkload.go new file mode 100644 index 0000000000..c875452d74 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuependingworkload.go @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +// ClusterQueuePendingWorkloadApplyConfiguration represents an declarative configuration of the ClusterQueuePendingWorkload type for use +// with apply. +type ClusterQueuePendingWorkloadApplyConfiguration struct { + Name *string `json:"name,omitempty"` + Namespace *string `json:"namespace,omitempty"` +} + +// ClusterQueuePendingWorkloadApplyConfiguration constructs an declarative configuration of the ClusterQueuePendingWorkload type for use with +// apply.
+func ClusterQueuePendingWorkload() *ClusterQueuePendingWorkloadApplyConfiguration { + return &ClusterQueuePendingWorkloadApplyConfiguration{} +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *ClusterQueuePendingWorkloadApplyConfiguration) WithName(value string) *ClusterQueuePendingWorkloadApplyConfiguration { + b.Name = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *ClusterQueuePendingWorkloadApplyConfiguration) WithNamespace(value string) *ClusterQueuePendingWorkloadApplyConfiguration { + b.Namespace = &value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuependingworkloadsstatus.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuependingworkloadsstatus.go new file mode 100644 index 0000000000..be6ee38a79 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuependingworkloadsstatus.go @@ -0,0 +1,56 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ClusterQueuePendingWorkloadsStatusApplyConfiguration represents an declarative configuration of the ClusterQueuePendingWorkloadsStatus type for use +// with apply. +type ClusterQueuePendingWorkloadsStatusApplyConfiguration struct { + Head []ClusterQueuePendingWorkloadApplyConfiguration `json:"clusterQueuePendingWorkload,omitempty"` + LastChangeTime *v1.Time `json:"lastChangeTime,omitempty"` +} + +// ClusterQueuePendingWorkloadsStatusApplyConfiguration constructs an declarative configuration of the ClusterQueuePendingWorkloadsStatus type for use with +// apply. +func ClusterQueuePendingWorkloadsStatus() *ClusterQueuePendingWorkloadsStatusApplyConfiguration { + return &ClusterQueuePendingWorkloadsStatusApplyConfiguration{} +} + +// WithHead adds the given value to the Head field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Head field. 
+func (b *ClusterQueuePendingWorkloadsStatusApplyConfiguration) WithHead(values ...*ClusterQueuePendingWorkloadApplyConfiguration) *ClusterQueuePendingWorkloadsStatusApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithHead") + } + b.Head = append(b.Head, *values[i]) + } + return b +} + +// WithLastChangeTime sets the LastChangeTime field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the LastChangeTime field is set to the value of the last call. +func (b *ClusterQueuePendingWorkloadsStatusApplyConfiguration) WithLastChangeTime(value v1.Time) *ClusterQueuePendingWorkloadsStatusApplyConfiguration { + b.LastChangeTime = &value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go index 921faec474..3992496da6 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuestatus.go @@ -24,10 +24,11 @@ import ( // ClusterQueueStatusApplyConfiguration represents an declarative configuration of the ClusterQueueStatus type for use // with apply. type ClusterQueueStatusApplyConfiguration struct { - FlavorsUsage []FlavorUsageApplyConfiguration `json:"flavorsUsage,omitempty"` - PendingWorkloads *int32 `json:"pendingWorkloads,omitempty"` - AdmittedWorkloads *int32 `json:"admittedWorkloads,omitempty"` - Conditions []v1.Condition `json:"conditions,omitempty"` + FlavorsUsage []FlavorUsageApplyConfiguration `json:"flavorsUsage,omitempty"` + PendingWorkloads *int32 `json:"pendingWorkloads,omitempty"` + AdmittedWorkloads *int32 `json:"admittedWorkloads,omitempty"` + Conditions []v1.Condition `json:"conditions,omitempty"` + PendingWorkloadsStatus *ClusterQueuePendingWorkloadsStatusApplyConfiguration `json:"pendingWorkloadsStatus,omitempty"` } // ClusterQueueStatusApplyConfiguration constructs an declarative configuration of the ClusterQueueStatus type for use with @@ -74,3 +75,11 @@ func (b *ClusterQueueStatusApplyConfiguration) WithConditions(values ...v1.Condi } return b } + +// WithPendingWorkloadsStatus sets the PendingWorkloadsStatus field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the PendingWorkloadsStatus field is set to the value of the last call. 
+func (b *ClusterQueueStatusApplyConfiguration) WithPendingWorkloadsStatus(value *ClusterQueuePendingWorkloadsStatusApplyConfiguration) *ClusterQueueStatusApplyConfiguration { + b.PendingWorkloadsStatus = value + return b +} diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index cc1c92e631..ab629db4ed 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -32,6 +32,10 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &kueuev1beta1.AdmissionApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueue"): return &kueuev1beta1.ClusterQueueApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("ClusterQueuePendingWorkload"): + return &kueuev1beta1.ClusterQueuePendingWorkloadApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("ClusterQueuePendingWorkloadsStatus"): + return &kueuev1beta1.ClusterQueuePendingWorkloadsStatusApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueuePreemption"): return &kueuev1beta1.ClusterQueuePreemptionApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueueSpec"): diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index dc75621fd4..afddd3d816 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -99,6 +99,12 @@ integrations: // therefore the batch/framework should be registered Frameworks: []string{job.FrameworkName}, }, + QueueVisibility: &config.QueueVisibility{ + UpdateIntervalSeconds: config.DefaultQueueVisibilityUpdateIntervalSeconds, + ClusterQueues: &config.ClusterQueueVisibility{ + MaxCount: config.DefaultClusterQueuesMaxCount, + }, + }, }, }, { diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index dbca92eedd..b8713d5364 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -427,6 +427,38 @@ spec: waiting to be admitted to this clusterQueue. format: int32 type: integer + pendingWorkloadsStatus: + description: PendingWorkloadsStatus contains the information exposed + about the current status of the pending workloads in the cluster + queue. + properties: + clusterQueuePendingWorkload: + description: Head contains the list of top pending workloads. + items: + description: ClusterQueuePendingWorkload contains the information + identifying a pending workload in the cluster queue. + properties: + name: + description: Name indicates the name of the pending workload. + type: string + namespace: + description: Namespace indicates the namespace of the pending + workload. + type: string + required: + - name + - namespace + type: object + type: array + x-kubernetes-list-type: atomic + lastChangeTime: + description: LastChangeTime indicates the time of the last change + of the structure.
+ format: date-time + type: string + required: + - lastChangeTime + type: object type: object type: object served: true diff --git a/pkg/config/config.go b/pkg/config/config.go index c8d9f7b51e..ef920014f3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -162,6 +162,9 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co return options, cfg, err } } + if err = validate(&cfg).ToAggregate(); err != nil { + return options, cfg, err + } addTo(&options, &cfg) return options, cfg, err } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index f9daeb57f8..45141b9c8b 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -222,6 +222,17 @@ integrations: `), os.FileMode(0600)); err != nil { t.Fatal(err) } + queueVisibilityConfig := filepath.Join(tmpDir, "queueVisibility.yaml") + if err := os.WriteFile(queueVisibilityConfig, []byte(` +apiVersion: config.kueue.x-k8s.io/v1beta1 +kind: Configuration +queueVisibility: + updateIntervalSeconds: 10 + clusterQueues: + maxCount: 0 +`), os.FileMode(0600)); err != nil { + t.Fatal(err) + } defaultControlOptions := ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -262,6 +273,13 @@ integrations: Frameworks: []string{job.FrameworkName}, } + defaultQueueVisibility := &configapi.QueueVisibility{ + UpdateIntervalSeconds: configapi.DefaultQueueVisibilityUpdateIntervalSeconds, + ClusterQueues: &configapi.ClusterQueueVisibility{ + MaxCount: 10, + }, + } + testcases := []struct { name string configFile string @@ -277,6 +295,7 @@ integrations: InternalCertManagement: enableDefaultInternalCertManagement, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -312,6 +331,7 @@ integrations: InternalCertManagement: enableDefaultInternalCertManagement, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: defaultControlOptions, }, @@ -328,6 +348,7 @@ integrations: InternalCertManagement: enableDefaultInternalCertManagement, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: ":38081", @@ -358,6 +379,7 @@ integrations: }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: defaultControlOptions, }, @@ -376,6 +398,7 @@ integrations: }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: defaultControlOptions, }, @@ -392,6 +415,7 @@ integrations: InternalCertManagement: enableDefaultInternalCertManagement, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -423,6 +447,7 @@ integrations: }, ClientConnection: defaultClientConnection, Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -449,7 +474,8 @@ integrations: QPS: ptr.To[float32](50), Burst: ptr.To[int32](100), }, - Integrations: defaultIntegrations, + Integrations: defaultIntegrations, + 
QueueVisibility: defaultQueueVisibility, }, wantOptions: defaultControlOptions, }, @@ -468,7 +494,8 @@ integrations: QPS: ptr.To[float32](50), Burst: ptr.To[int32](100), }, - Integrations: defaultIntegrations, + Integrations: defaultIntegrations, + QueueVisibility: defaultQueueVisibility, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -515,6 +542,37 @@ integrations: // therefore the batch/framework should be registered Frameworks: []string{job.FrameworkName}, }, + QueueVisibility: defaultQueueVisibility, + }, + wantOptions: ctrl.Options{ + HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, + MetricsBindAddress: configapi.DefaultMetricsBindAddress, + WebhookServer: &webhook.DefaultServer{ + Options: webhook.Options{ + Port: configapi.DefaultWebhookPort, + }, + }, + }, + }, + { + name: "queue visibility config", + configFile: queueVisibilityConfig, + wantConfiguration: configapi.Configuration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: configapi.GroupVersion.String(), + Kind: "Configuration", + }, + Namespace: ptr.To(configapi.DefaultNamespace), + ManageJobsWithoutQueueName: false, + InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, + Integrations: defaultIntegrations, + QueueVisibility: &configapi.QueueVisibility{ + UpdateIntervalSeconds: 10, + ClusterQueues: &configapi.ClusterQueueVisibility{ + MaxCount: 0, + }, + }, }, wantOptions: ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -610,6 +668,10 @@ func TestEncode(t *testing.T) { "integrations": map[string]any{ "frameworks": []any{"batch/job"}, }, + "queueVisibility": map[string]any{ + "updateIntervalSeconds": int64(configapi.DefaultQueueVisibilityUpdateIntervalSeconds), + "clusterQueues": map[string]any{"maxCount": int64(10)}, + }, }, }, } diff --git a/pkg/config/validation.go b/pkg/config/validation.go new file mode 100644 index 0000000000..1204cceb9b --- /dev/null +++ b/pkg/config/validation.go @@ -0,0 +1,31 @@ +package config + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/util/validation/field" + + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" +) + +const ( + queueVisibilityClusterQueuesMaxValue = 4000 + queueVisibilityClusterQueuesUpdateIntervalSeconds = 1 +) + +func validate(cfg *configapi.Configuration) field.ErrorList { + var allErrs field.ErrorList + if cfg.QueueVisibility != nil { + queueVisibilityPath := field.NewPath("queueVisibility") + if cfg.QueueVisibility.ClusterQueues != nil { + clusterQueues := queueVisibilityPath.Child("clusterQueues") + if cfg.QueueVisibility.ClusterQueues.MaxCount > queueVisibilityClusterQueuesMaxValue { + allErrs = append(allErrs, field.Invalid(clusterQueues.Child("maxCount"), cfg.QueueVisibility.ClusterQueues.MaxCount, fmt.Sprintf("must be less than or equal to %d", queueVisibilityClusterQueuesMaxValue))) + } + } + if cfg.QueueVisibility.UpdateIntervalSeconds < queueVisibilityClusterQueuesUpdateIntervalSeconds { + allErrs = append(allErrs, field.Invalid(queueVisibilityPath.Child("updateIntervalSeconds"), cfg.QueueVisibility.UpdateIntervalSeconds, fmt.Sprintf("must be greater than or equal to %d", queueVisibilityClusterQueuesUpdateIntervalSeconds))) + } + } + return allErrs +} diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go new file mode 100644 index 0000000000..afd6eb2227 --- /dev/null +++ b/pkg/config/validation_test.go @@ -0,0 +1,59 @@ +package config + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" +
"github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/util/validation/field" + + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" +) + +func TestValidate(t *testing.T) { + testcases := []struct { + name string + cfg *configapi.Configuration + wantErr field.ErrorList + }{ + + { + name: "empty", + cfg: &configapi.Configuration{}, + wantErr: nil, + }, + { + name: "invalid queue visibility UpdateIntervalSeconds", + cfg: &configapi.Configuration{ + QueueVisibility: &configapi.QueueVisibility{ + UpdateIntervalSeconds: 0, + }, + }, + wantErr: field.ErrorList{ + field.Invalid(field.NewPath("queueVisibility").Child("updateIntervalSeconds"), 0, fmt.Sprintf("must be greater than or equal to %d", queueVisibilityClusterQueuesUpdateIntervalSeconds)), + }, + }, + { + name: "invalid queue visibility cluster queue max count", + cfg: &configapi.Configuration{ + QueueVisibility: &configapi.QueueVisibility{ + ClusterQueues: &configapi.ClusterQueueVisibility{ + MaxCount: 4001, + }, + UpdateIntervalSeconds: 1, + }, + }, + wantErr: field.ErrorList{ + field.Invalid(field.NewPath("queueVisibility").Child("clusterQueues").Child("maxCount"), 4001, fmt.Sprintf("must be less than or equal to %d", queueVisibilityClusterQueuesMaxValue)), + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" { + t.Errorf("Unexpected returned error (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 1af0fbe9b2..c8af28e9aa 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -18,6 +18,7 @@ package core import ( "context" + "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -26,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -45,38 +47,89 @@ import ( "sigs.k8s.io/kueue/pkg/workload" ) +const snapshotWorkers = 5 + type ClusterQueueUpdateWatcher interface { NotifyClusterQueueUpdate(*kueue.ClusterQueue, *kueue.ClusterQueue) } // ClusterQueueReconciler reconciles a ClusterQueue object type ClusterQueueReconciler struct { - client client.Client - log logr.Logger - qManager *queue.Manager - cache *cache.Cache - wlUpdateCh chan event.GenericEvent - rfUpdateCh chan event.GenericEvent - watchers []ClusterQueueUpdateWatcher - reportResourceMetrics bool + client client.Client + log logr.Logger + qManager *queue.Manager + cache *cache.Cache + snapshotsQueue workqueue.Interface + wlUpdateCh chan event.GenericEvent + rfUpdateCh chan event.GenericEvent + watchers []ClusterQueueUpdateWatcher + reportResourceMetrics bool + queueVisibilityUpdateInterval time.Duration + queueVisibilityClusterQueuesMaxCount int32 +} + +type ClusterQueueReconcilerOptions struct { + Watchers []ClusterQueueUpdateWatcher + ReportResourceMetrics bool + QueueVisibilityUpdateInterval time.Duration + QueueVisibilityClusterQueuesMaxCount int32 +} + +// ClusterQueueReconcilerOption configures the reconciler.
+type ClusterQueueReconcilerOption func(*ClusterQueueReconcilerOptions) + +func WithWatchers(watchers ...ClusterQueueUpdateWatcher) ClusterQueueReconcilerOption { + return func(o *ClusterQueueReconcilerOptions) { + o.Watchers = watchers + } +} + +func WithReportResourceMetrics(report bool) ClusterQueueReconcilerOption { + return func(o *ClusterQueueReconcilerOptions) { + o.ReportResourceMetrics = report + } +} + +// WithQueueVisibilityUpdateInterval specifies the time interval for updates to the structure +// of the top pending workloads in the queues. +func WithQueueVisibilityUpdateInterval(interval time.Duration) ClusterQueueReconcilerOption { + return func(o *ClusterQueueReconcilerOptions) { + o.QueueVisibilityUpdateInterval = interval + } } +// WithQueueVisibilityClusterQueuesMaxCount specifies the maximal number of pending workloads exposed in the +// cluster queue status. +func WithQueueVisibilityClusterQueuesMaxCount(value int32) ClusterQueueReconcilerOption { + return func(o *ClusterQueueReconcilerOptions) { + o.QueueVisibilityClusterQueuesMaxCount = value + } +} + +var DefaultOptions = ClusterQueueReconcilerOptions{} + func NewClusterQueueReconciler( client client.Client, qMgr *queue.Manager, cache *cache.Cache, - resourceMetrics bool, - watchers ...ClusterQueueUpdateWatcher, + opts ...ClusterQueueReconcilerOption, ) *ClusterQueueReconciler { + options := DefaultOptions + for _, opt := range opts { + opt(&options) + } return &ClusterQueueReconciler{ - client: client, - log: ctrl.Log.WithName("cluster-queue-reconciler"), - qManager: qMgr, - cache: cache, - wlUpdateCh: make(chan event.GenericEvent, updateChBuffer), - rfUpdateCh: make(chan event.GenericEvent, updateChBuffer), - watchers: watchers, - reportResourceMetrics: resourceMetrics, + client: client, + log: ctrl.Log.WithName("cluster-queue-reconciler"), + qManager: qMgr, + cache: cache, + snapshotsQueue: workqueue.New(), + wlUpdateCh: make(chan event.GenericEvent, updateChBuffer), + rfUpdateCh: make(chan event.GenericEvent, updateChBuffer), + watchers: options.Watchers, + reportResourceMetrics: options.ReportResourceMetrics, + queueVisibilityUpdateInterval: options.QueueVisibilityUpdateInterval, + queueVisibilityClusterQueuesMaxCount: options.QueueVisibilityClusterQueuesMaxCount, } } @@ -218,6 +271,7 @@ func (r *ClusterQueueReconciler) Delete(e event.DeleteEvent) bool { r.log.V(2).Info("ClusterQueue delete event", "clusterQueue", klog.KObj(cq)) r.cache.DeleteClusterQueue(cq) r.qManager.DeleteClusterQueue(cq) + r.qManager.DeleteSnapshot(cq) metrics.ClearClusterQueueResourceMetrics(cq.Name) r.log.V(2).Info("Cleared resource metrics for deleted ClusterQueue.", "clusterQueue", klog.KObj(cq)) @@ -486,6 +540,7 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged( cq.Status.FlavorsUsage = usage cq.Status.AdmittedWorkloads = int32(workloads) cq.Status.PendingWorkloads = int32(pendingWorkloads) + cq.Status.PendingWorkloadsStatus = r.getWorkloadsStatus(cq) meta.SetStatusCondition(&cq.Status.Conditions, metav1.Condition{ Type: kueue.ClusterQueueActive, Status: conditionStatus, @@ -497,3 +552,77 @@ } return nil } + +// Taking a snapshot of the cluster queue is enabled when maxCount is non-zero. +func (r *ClusterQueueReconciler) isVisibilityEnabled() bool { + return r.queueVisibilityClusterQueuesMaxCount > 0 +} + +func (r *ClusterQueueReconciler) getWorkloadsStatus(cq *kueue.ClusterQueue) *kueue.ClusterQueuePendingWorkloadsStatus { + if !r.isVisibilityEnabled() { + return nil + } + if
cq.Status.PendingWorkloadsStatus == nil { + return &kueue.ClusterQueuePendingWorkloadsStatus{ + Head: r.qManager.GetSnapshot(cq.Name), + LastChangeTime: metav1.Time{Time: time.Now()}, + } + } + if time.Since(cq.Status.PendingWorkloadsStatus.LastChangeTime.Time) >= r.queueVisibilityUpdateInterval { + pendingWorkloads := r.qManager.GetSnapshot(cq.Name) + if !equality.Semantic.DeepEqual(cq.Status.PendingWorkloadsStatus.Head, pendingWorkloads) { + return &kueue.ClusterQueuePendingWorkloadsStatus{ + Head: pendingWorkloads, + LastChangeTime: metav1.Time{Time: time.Now()}, + } + } + } + return cq.Status.PendingWorkloadsStatus +} + +func (r *ClusterQueueReconciler) Start(ctx context.Context) error { + if !r.isVisibilityEnabled() { + return nil + } + + defer r.snapshotsQueue.ShutDown() + + for i := 0; i < snapshotWorkers; i++ { + go wait.UntilWithContext(ctx, r.takeSnapshot, r.queueVisibilityUpdateInterval) + } + + go wait.UntilWithContext(ctx, r.enqueueTakeSnapshot, r.queueVisibilityUpdateInterval) + + <-ctx.Done() + + return nil +} + +func (r *ClusterQueueReconciler) enqueueTakeSnapshot(ctx context.Context) { + for _, cq := range r.qManager.GetClusterQueueNames() { + r.snapshotsQueue.Add(cq) + } +} + +func (r *ClusterQueueReconciler) takeSnapshot(ctx context.Context) { + for r.processNextSnapshot(ctx) { + } +} + +func (r *ClusterQueueReconciler) processNextSnapshot(ctx context.Context) bool { + log := ctrl.LoggerFrom(ctx).WithName("processNextSnapshot") + + key, quit := r.snapshotsQueue.Get() + if quit { + return false + } + + startTime := time.Now() + defer func() { + log.V(5).Info("Finished snapshot job", "key", key, "elapsed", time.Since(startTime)) + }() + + defer r.snapshotsQueue.Done(key) + r.qManager.UpdateSnapshot(key.(string), r.queueVisibilityClusterQueuesMaxCount) + return true +} diff --git a/pkg/controller/core/clusterqueue_controller_test.go b/pkg/controller/core/clusterqueue_controller_test.go index 4a28c43cc2..72d19c1dca 100644 --- a/pkg/controller/core/clusterqueue_controller_test.go +++ b/pkg/controller/core/clusterqueue_controller_test.go @@ -17,13 +17,16 @@ limitations under the License. 
package core import ( + "context" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/ptr" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -200,9 +203,12 @@ func TestUpdateCqStatusIfChanged(t *testing.T) { if err != nil { t.Errorf("Updating ClusterQueueStatus: %v", err) } - if diff := cmp.Diff(tc.wantCqStatus, cq.Status, + configCmpOpts := []cmp.Option{ cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), - cmpopts.EquateEmpty()); len(diff) != 0 { + cmpopts.IgnoreFields(kueue.ClusterQueuePendingWorkloadsStatus{}, "LastChangeTime"), + cmpopts.EquateEmpty(), + } + if diff := cmp.Diff(tc.wantCqStatus, cq.Status, configCmpOpts...); len(diff) != 0 { t.Errorf("unexpected ClusterQueueStatus (-want,+got):\n%s", diff) } }) @@ -478,3 +484,82 @@ func TestRecordResourceMetrics(t *testing.T) { }) } } + +func TestClusterQueuePendingWorkloadsStatus(t *testing.T) { + cqName := "test-cq" + lqName := "test-lq" + const lowPrio, highPrio = 0, 100 + defaultWls := &kueue.WorkloadList{ + Items: []kueue.Workload{ + *utiltesting.MakeWorkload("one", "").Queue(lqName).Priority(highPrio).Obj(), + *utiltesting.MakeWorkload("two", "").Queue(lqName).Priority(lowPrio).Obj(), + }, + } + testCases := map[string]struct { + queueVisibilityUpdateInterval time.Duration + queueVisibilityClusterQueuesMaxCount int32 + wantPendingWorkloadsStatus *kueue.ClusterQueuePendingWorkloadsStatus + }{ + "taking snapshot of cluster queue is disabled": {}, + "taking snapshot of cluster queue is enabled": { + queueVisibilityClusterQueuesMaxCount: 2, + queueVisibilityUpdateInterval: 10 * time.Millisecond, + wantPendingWorkloadsStatus: &kueue.ClusterQueuePendingWorkloadsStatus{ + Head: []kueue.ClusterQueuePendingWorkload{ + {Name: "one"}, {Name: "two"}, + }, + }, + }, + "verify the head of pending workloads when the number of pending workloads exceeds MaxCount": { + queueVisibilityClusterQueuesMaxCount: 1, + queueVisibilityUpdateInterval: 10 * time.Millisecond, + wantPendingWorkloadsStatus: &kueue.ClusterQueuePendingWorkloadsStatus{ + Head: []kueue.ClusterQueuePendingWorkload{ + {Name: "one"}, + }, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + cq := utiltesting.MakeClusterQueue(cqName). + QueueingStrategy(kueue.StrictFIFO).Obj() + lq := utiltesting.MakeLocalQueue(lqName, ""). + ClusterQueue(cqName).Obj() + ctx := context.Background() + + cl := utiltesting.NewClientBuilder().WithLists(defaultWls).WithObjects(lq, cq).WithStatusSubresource(lq, cq). 
+ Build() + cCache := cache.New(cl) + qManager := queue.NewManager(cl, cCache) + if err := qManager.AddClusterQueue(ctx, cq); err != nil { + t.Fatalf("Inserting clusterQueue in manager: %v", err) + } + if err := qManager.AddLocalQueue(ctx, lq); err != nil { + t.Fatalf("Inserting localQueue in manager: %v", err) + } + + r := NewClusterQueueReconciler( + cl, + qManager, + cCache, + WithQueueVisibilityUpdateInterval(tc.queueVisibilityUpdateInterval), + WithQueueVisibilityClusterQueuesMaxCount(tc.queueVisibilityClusterQueuesMaxCount), + ) + + go func() { + if err := r.Start(ctx); err != nil { + t.Errorf("error starting the cluster queue reconciler: %v", err) + } + }() + + diff := "" + if err := wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) { + diff = cmp.Diff(tc.wantPendingWorkloadsStatus, r.getWorkloadsStatus(cq), cmpopts.IgnoreFields(kueue.ClusterQueuePendingWorkloadsStatus{}, "LastChangeTime")) + return diff == "", nil + }); err != nil { + t.Fatalf("Failed to get the expected pending workloads status, last diff=%s", diff) + } + }) + } +} diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index ae96536811..138d5c76bb 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -40,7 +40,18 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache return "LocalQueue", err } - cqRec := NewClusterQueueReconciler(mgr.GetClient(), qManager, cc, cfg.Metrics.EnableClusterQueueResources, rfRec) + cqRec := NewClusterQueueReconciler( + mgr.GetClient(), + qManager, + cc, + WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)), + WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)), + WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources), + WithWatchers(rfRec), + ) + if err := mgr.Add(cqRec); err != nil { + return "Unable to add ClusterQueue to manager", err + } rfRec.AddUpdateWatcher(cqRec) if err := cqRec.SetupWithManager(mgr); err != nil { return "ClusterQueue", err @@ -57,3 +68,17 @@ func podsReadyTimeout(cfg *config.Configuration) *time.Duration { } return nil } + +func queueVisibilityUpdateInterval(cfg *config.Configuration) time.Duration { + if cfg.QueueVisibility != nil { + return time.Duration(cfg.QueueVisibility.UpdateIntervalSeconds) * time.Second + } + return 0 +} + +func queueVisibilityClusterQueuesMaxCount(cfg *config.Configuration) int32 { + if cfg.QueueVisibility != nil && cfg.QueueVisibility.ClusterQueues != nil { + return cfg.QueueVisibility.ClusterQueues.MaxCount + } + return 0 +} diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go index 0769dc13f6..1a4170a66d 100644 --- a/pkg/queue/cluster_queue_impl.go +++ b/pkg/queue/cluster_queue_impl.go @@ -18,6 +18,8 @@ package queue import ( "context" + "sort" + "sync" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -51,6 +53,8 @@ type clusterQueueBase struct { // queueInadmissibleCycle stores the popId at the time when // QueueInadmissibleWorkloads is called. 
queueInadmissibleCycle int64 + + rwm sync.RWMutex } func newClusterQueueImpl(keyFunc func(obj interface{}) string, lessFunc func(a, b interface{}) bool) *clusterQueueBase { @@ -58,6 +62,7 @@ func newClusterQueueImpl(keyFunc func(obj interface{}) string, lessFunc func(a, heap: heap.New(keyFunc, lessFunc), inadmissibleWorkloads: make(map[string]*workload.Info), queueInadmissibleCycle: -1, + rwm: sync.RWMutex{}, } } @@ -76,6 +81,8 @@ func (c *clusterQueueBase) Cohort() string { } func (c *clusterQueueBase) AddFromLocalQueue(q *LocalQueue) bool { + c.rwm.Lock() + defer c.rwm.Unlock() added := false for _, info := range q.items { if c.heap.PushIfNotPresent(info) { @@ -86,6 +93,8 @@ func (c *clusterQueueBase) AddFromLocalQueue(q *LocalQueue) bool { } func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) { + c.rwm.Lock() + defer c.rwm.Unlock() key := workload.Key(wInfo.Obj) oldInfo := c.inadmissibleWorkloads[key] if oldInfo != nil { @@ -111,6 +120,8 @@ func (c *clusterQueueBase) Delete(w *kueue.Workload) { } func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) { + c.rwm.Lock() + defer c.rwm.Unlock() for _, w := range q.items { key := workload.Key(w.Obj) if wl := c.inadmissibleWorkloads[key]; wl != nil { @@ -128,6 +139,8 @@ func (c *clusterQueueBase) DeleteFromLocalQueue(q *LocalQueue) { // the workload will be pushed back to heap directly. Otherwise, the workload // will be put into the inadmissibleWorkloads. func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate bool) bool { + c.rwm.Lock() + defer c.rwm.Unlock() key := workload.Key(wInfo.Obj) if immediate || c.queueInadmissibleCycle >= c.popCycle { // If the workload was inadmissible, move it back into the queue. @@ -155,6 +168,8 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b // QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap. // If at least one workload is moved, returns true. Otherwise returns false. 
func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool { + c.rwm.Lock() + defer c.rwm.Unlock() c.queueInadmissibleCycle = c.popCycle if len(c.inadmissibleWorkloads) == 0 { return false @@ -177,6 +192,8 @@ func (c *clusterQueueBase) QueueInadmissibleWorkloads(ctx context.Context, clien } func (c *clusterQueueBase) Pending() int { + c.rwm.RLock() + defer c.rwm.RUnlock() return c.PendingActive() + c.PendingInadmissible() } @@ -189,6 +206,8 @@ func (c *clusterQueueBase) PendingInadmissible() int { } func (c *clusterQueueBase) Pop() *workload.Info { + c.rwm.Lock() + defer c.rwm.Unlock() c.popCycle++ if c.heap.Len() == 0 { return nil @@ -199,6 +218,8 @@ func (c *clusterQueueBase) Pop() *workload.Info { } func (c *clusterQueueBase) Dump() (sets.Set[string], bool) { + c.rwm.RLock() + defer c.rwm.RUnlock() if c.heap.Len() == 0 { return nil, false } @@ -211,6 +232,8 @@ func (c *clusterQueueBase) Dump() (sets.Set[string], bool) { } func (c *clusterQueueBase) DumpInadmissible() (sets.Set[string], bool) { + c.rwm.RLock() + defer c.rwm.RUnlock() if len(c.inadmissibleWorkloads) == 0 { return nil, false } @@ -221,7 +244,27 @@ func (c *clusterQueueBase) DumpInadmissible() (sets.Set[string], bool) { return elements, true } +func (c *clusterQueueBase) Snapshot() []*workload.Info { + c.rwm.RLock() + defer c.rwm.RUnlock() + totalLen := c.heap.Len() + len(c.inadmissibleWorkloads) + elements := make([]*workload.Info, 0, totalLen) + for _, e := range c.heap.List() { + info := e.(*workload.Info) + elements = append(elements, info) + } + for _, e := range c.inadmissibleWorkloads { + elements = append(elements, e) + } + sort.Slice(elements, func(i, j int) bool { + return queueOrdering(elements[i], elements[j]) + }) + return elements +} + func (c *clusterQueueBase) Info(key string) *workload.Info { + c.rwm.RLock() + defer c.rwm.RUnlock() info := c.heap.GetByKey(key) if info == nil { return nil diff --git a/pkg/queue/cluster_queue_interface.go b/pkg/queue/cluster_queue_interface.go index ab611f987e..ed6e27ccfd 100644 --- a/pkg/queue/cluster_queue_interface.go +++ b/pkg/queue/cluster_queue_interface.go @@ -92,6 +92,9 @@ type ClusterQueue interface { // Otherwise returns true. Dump() (sets.Set[string], bool) DumpInadmissible() (sets.Set[string], bool) + // Snapshot returns a copy of the current workloads in the heap of + // this ClusterQueue. + Snapshot() []*workload.Info // Info returns workload.Info for the workload key. // Users of this method should not modify the returned object. Info(string) *workload.Info diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index 5c9e4e8b33..ac73185b14 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -48,17 +48,22 @@ type Manager struct { clusterQueues map[string]ClusterQueue localQueues map[string]*LocalQueue + snapshotsMutex sync.RWMutex + snapshots map[string][]kueue.ClusterQueuePendingWorkload + // Key is cohort's name. Value is a set of associated ClusterQueue names. 
cohorts map[string]sets.Set[string] } func NewManager(client client.Client, checker StatusChecker) *Manager { m := &Manager{ - client: client, - statusChecker: checker, - localQueues: make(map[string]*LocalQueue), - clusterQueues: make(map[string]ClusterQueue), - cohorts: make(map[string]sets.Set[string]), + client: client, + statusChecker: checker, + localQueues: make(map[string]*LocalQueue), + clusterQueues: make(map[string]ClusterQueue), + cohorts: make(map[string]sets.Set[string]), + snapshotsMutex: sync.RWMutex{}, + snapshots: make(map[string][]kueue.ClusterQueuePendingWorkload, 0), } m.cond.L = &m.RWMutex return m @@ -547,3 +552,58 @@ func (m *Manager) reportPendingWorkloads(cqName string, cq ClusterQueue) { } metrics.ReportPendingWorkloads(cqName, active, inadmissible) } + +func (m *Manager) GetClusterQueueNames() []string { + m.RLock() + defer m.RUnlock() + clusterQueueNames := make([]string, 0, len(m.clusterQueues)) + for k := range m.clusterQueues { + clusterQueueNames = append(clusterQueueNames, k) + } + return clusterQueueNames +} + +func (m *Manager) getClusterQueue(cqName string) ClusterQueue { + m.RLock() + defer m.RUnlock() + return m.clusterQueues[cqName] +} + +func (m *Manager) UpdateSnapshot(cqName string, maxCount int32) { + cq := m.getClusterQueue(cqName) + if cq == nil { + return + } + workloads := make([]kueue.ClusterQueuePendingWorkload, 0) + for index, info := range cq.Snapshot() { + if int32(index) >= maxCount { + break + } + if info == nil { + continue + } + workloads = append(workloads, kueue.ClusterQueuePendingWorkload{ + Name: info.Obj.Name, + Namespace: info.Obj.Namespace, + }) + } + m.setSnapshot(cqName, workloads) +} + +func (m *Manager) setSnapshot(cqName string, workloads []kueue.ClusterQueuePendingWorkload) { + m.snapshotsMutex.Lock() + defer m.snapshotsMutex.Unlock() + m.snapshots[cqName] = workloads +} + +func (m *Manager) GetSnapshot(cqName string) []kueue.ClusterQueuePendingWorkload { + m.snapshotsMutex.RLock() + defer m.snapshotsMutex.RUnlock() + return m.snapshots[cqName] +} + +func (m *Manager) DeleteSnapshot(cq *kueue.ClusterQueue) { + m.snapshotsMutex.Lock() + defer m.snapshotsMutex.Unlock() + delete(m.snapshots, cq.Name) +} diff --git a/test/integration/controller/core/clusterqueue_controller_test.go b/test/integration/controller/core/clusterqueue_controller_test.go index 0952b413bb..0800b6ea8e 100644 --- a/test/integration/controller/core/clusterqueue_controller_test.go +++ b/test/integration/controller/core/clusterqueue_controller_test.go @@ -47,6 +47,8 @@ const ( ) var ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime") +var ignoreLastChangeTime = cmpopts.IgnoreFields(kueue.ClusterQueuePendingWorkloadsStatus{}, "LastChangeTime") +var ignorePendingWorkloadsStatus = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "PendingWorkloadsStatus") var _ = ginkgo.Describe("ClusterQueue controller", func() { var ( @@ -183,7 +185,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() { Message: "Can't admit new workloads; some flavors are not found", }, }, - }, ignoreConditionTimestamps)) + }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) // Workloads are inadmissible because ResourceFlavors don't exist here yet. 
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 5) util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0) @@ -269,7 +271,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() { Message: "Can admit new workloads", }, }, - }, ignoreConditionTimestamps)) + }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) util.ExpectPendingWorkloadsMetric(clusterQueue, 1, 0) util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 4) @@ -296,7 +298,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() { Message: "Can admit new workloads", }, }, - }, ignoreConditionTimestamps)) + }, ignoreConditionTimestamps, ignorePendingWorkloadsStatus)) util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 0) }) @@ -603,4 +605,141 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() { }, util.Timeout, util.Interval).Should(testing.BeNotFoundError()) }) }) + + ginkgo.When("Reconciling clusterQueue pending workload status", func() { + var ( + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + onDemandFlavor *kueue.ResourceFlavor + ) + + ginkgo.BeforeEach(func() { + onDemandFlavor = testing.MakeResourceFlavor(flavorOnDemand).Obj() + gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed()) + clusterQueue = testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas(flavorOnDemand). + Resource(corev1.ResourceCPU, "5", "5").Obj(), + ). + Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed()) + localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true) + }) + + ginkgo.It("Should update the pending workloads status when a new workload is scheduled", func() { + const lowPrio, midPrio, highPrio = 0, 10, 100 + workloadsFirstBatch := []*kueue.Workload{ + testing.MakeWorkload("one", ns.Name).Queue(localQueue.Name).Priority(highPrio). + Request(corev1.ResourceCPU, "2").Request(resourceGPU, "2").Obj(), + testing.MakeWorkload("two", ns.Name).Queue(localQueue.Name).Priority(midPrio).
+					Request(corev1.ResourceCPU, "3").Request(resourceGPU, "3").Obj(),
+			}
+
+			ginkgo.By("Verify pending workload status before adding workloads")
+			gomega.Eventually(func() *kueue.ClusterQueuePendingWorkloadsStatus {
+				var updatedCq kueue.ClusterQueue
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed())
+				return updatedCq.Status.PendingWorkloadsStatus
+			}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(&kueue.ClusterQueuePendingWorkloadsStatus{}, ignoreLastChangeTime))
+
+			ginkgo.By("Creating workloads")
+			for _, w := range workloadsFirstBatch {
+				gomega.Expect(k8sClient.Create(ctx, w)).To(gomega.Succeed())
+			}
+			gomega.Eventually(func() *kueue.ClusterQueuePendingWorkloadsStatus {
+				var updatedCq kueue.ClusterQueue
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed())
+				return updatedCq.Status.PendingWorkloadsStatus
+			}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(&kueue.ClusterQueuePendingWorkloadsStatus{
+				Head: []kueue.ClusterQueuePendingWorkload{
+					{
+						Name:      "one",
+						Namespace: ns.Name,
+					},
+					{
+						Name:      "two",
+						Namespace: ns.Name,
+					},
+				},
+			}, ignoreLastChangeTime))
+
+			ginkgo.By("Creating new workloads")
+			workloadsSecondBatch := []*kueue.Workload{
+				testing.MakeWorkload("three", ns.Name).Queue(localQueue.Name).Priority(midPrio).
+					Request(corev1.ResourceCPU, "2").Request(resourceGPU, "2").Obj(),
+				testing.MakeWorkload("four", ns.Name).Queue(localQueue.Name).Priority(lowPrio).
+					Request(corev1.ResourceCPU, "3").Request(resourceGPU, "3").Obj(),
+			}
+			for _, w := range workloadsSecondBatch {
+				gomega.Expect(k8sClient.Create(ctx, w)).To(gomega.Succeed())
+			}
+
+			ginkgo.By("Verify the head of pending workloads when the number of pending workloads exceeds MaxCount")
+			gomega.Eventually(func() *kueue.ClusterQueuePendingWorkloadsStatus {
+				var updatedCq kueue.ClusterQueue
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed())
+				return updatedCq.Status.PendingWorkloadsStatus
+			}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(&kueue.ClusterQueuePendingWorkloadsStatus{
+				Head: []kueue.ClusterQueuePendingWorkload{
+					{
+						Name:      "one",
+						Namespace: ns.Name,
+					},
+					{
+						Name:      "two",
+						Namespace: ns.Name,
+					},
+					{
+						Name:      "three",
+						Namespace: ns.Name,
+					},
+				},
+			}, ignoreLastChangeTime))
+
+			ginkgo.By("Admitting workloads")
+			for _, w := range workloadsFirstBatch {
+				gomega.Eventually(func() error {
+					var newWL kueue.Workload
+					gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(w), &newWL)).To(gomega.Succeed())
+					return util.SetQuotaReservation(ctx, k8sClient, &newWL, testing.MakeAdmission(clusterQueue.Name).Obj())
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			}
+
+			gomega.Eventually(func() *kueue.ClusterQueuePendingWorkloadsStatus {
+				var updatedCq kueue.ClusterQueue
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed())
+				return updatedCq.Status.PendingWorkloadsStatus
+			}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(&kueue.ClusterQueuePendingWorkloadsStatus{
+				Head: []kueue.ClusterQueuePendingWorkload{
+					{
+						Name:      "three",
+						Namespace: ns.Name,
+					},
+					{
+						Name:      "four",
+						Namespace: ns.Name,
+					},
+				},
+			}, ignoreLastChangeTime))
+
+			ginkgo.By("Finishing workloads", func() {
+				util.FinishWorkloads(ctx, k8sClient, workloadsFirstBatch...)
+				util.FinishWorkloads(ctx, k8sClient, workloadsSecondBatch...)
+			})
+
+			gomega.Eventually(func() *kueue.ClusterQueuePendingWorkloadsStatus {
+				var updatedCq kueue.ClusterQueue
+				gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(clusterQueue), &updatedCq)).To(gomega.Succeed())
+				return updatedCq.Status.PendingWorkloadsStatus
+			}, util.Timeout, util.Interval).Should(gomega.BeComparableTo(&kueue.ClusterQueuePendingWorkloadsStatus{
+				Head: []kueue.ClusterQueuePendingWorkload{},
+			}, ignoreLastChangeTime))
+		})
+	})
 })
diff --git a/test/integration/controller/core/suite_test.go b/test/integration/controller/core/suite_test.go
index e3d504d679..3316e226da 100644
--- a/test/integration/controller/core/suite_test.go
+++ b/test/integration/controller/core/suite_test.go
@@ -72,11 +72,17 @@ func managerSetup(mgr manager.Manager, ctx context.Context) {
 	failedWebhook, err := webhooks.Setup(mgr)
 	gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
 
-	cCache := cache.New(mgr.GetClient())
-	queues := queue.NewManager(mgr.GetClient(), cCache)
-
 	controllersCfg := &config.Configuration{}
 	controllersCfg.Metrics.EnableClusterQueueResources = true
+	controllersCfg.QueueVisibility = &config.QueueVisibility{
+		UpdateIntervalSeconds: 1,
+		ClusterQueues: &config.ClusterQueueVisibility{
+			MaxCount: 3,
+		},
+	}
+
+	cCache := cache.New(mgr.GetClient())
+	queues := queue.NewManager(mgr.GetClient(), cCache)
 
 	failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg)
 	gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
diff --git a/test/integration/scheduler/podsready/scheduler_test.go b/test/integration/scheduler/podsready/scheduler_test.go
index dc6ae1eb4b..d091186648 100644
--- a/test/integration/scheduler/podsready/scheduler_test.go
+++ b/test/integration/scheduler/podsready/scheduler_test.go
@@ -39,6 +39,7 @@ import (
 )
 
 var ignoreCQConditions = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "Conditions")
+var ignorePendingWorkloadsStatus = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "PendingWorkloadsStatus")
 
 // +kubebuilder:docs-gen:collapse=Imports
 
@@ -265,7 +266,7 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() {
 					Total: resource.MustParse("2"),
 				}},
 			}},
-		}, ignoreCQConditions))
+		}, ignoreCQConditions, ignorePendingWorkloadsStatus))
 
 		ginkgo.By("wait for the timeout to be exceeded")
 		time.Sleep(podsReadyTimeout)
@@ -294,7 +295,7 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() {
 					Total: resource.MustParse("0"),
 				}},
 			}},
-		}, ignoreCQConditions))
+		}, ignoreCQConditions, ignorePendingWorkloadsStatus))
 
 		ginkgo.By("verify the active workload metric is decreased for the cluster queue")
 		util.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 0)
diff --git a/test/integration/scheduler/workload_controller_test.go b/test/integration/scheduler/workload_controller_test.go
index 4872b86f7c..2cd6e818cd 100644
--- a/test/integration/scheduler/workload_controller_test.go
+++ b/test/integration/scheduler/workload_controller_test.go
@@ -36,6 +36,7 @@ import (
 // +kubebuilder:docs-gen:collapse=Imports
 
 var ignoreCqCondition = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "Conditions")
+var ignorePendingWorkloadsStatus = cmpopts.IgnoreFields(kueue.ClusterQueueStatus{}, "PendingWorkloadsStatus")
 
 var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 	var (
@@ -121,7 +122,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 					Total: resource.MustParse("2"),
 				}},
 			}},
-		}, ignoreCqCondition))
+		}, ignoreCqCondition, ignorePendingWorkloadsStatus))
 		})
 	})
 })
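The test hunks above repeat the same Eventually closure to fetch PendingWorkloadsStatus. A small helper along these lines could deduplicate them; this is a sketch only, and the name pendingWorkloadsStatus is an assumption, not part of this patch:

	// pendingWorkloadsStatus returns a polling function suitable for
	// gomega.Eventually: it re-reads the ClusterQueue and returns the
	// volatile status field under test.
	func pendingWorkloadsStatus(ctx context.Context, c client.Client, cq *kueue.ClusterQueue) func() *kueue.ClusterQueuePendingWorkloadsStatus {
		return func() *kueue.ClusterQueuePendingWorkloadsStatus {
			var updated kueue.ClusterQueue
			gomega.Expect(c.Get(ctx, client.ObjectKeyFromObject(cq), &updated)).To(gomega.Succeed())
			return updated.Status.PendingWorkloadsStatus
		}
	}

Usage would then collapse each assertion to a single call, e.g. gomega.Eventually(pendingWorkloadsStatus(ctx, k8sClient, clusterQueue), util.Timeout, util.Interval).Should(gomega.BeComparableTo(wantStatus, ignoreLastChangeTime)).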
@@ -176,7 +177,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { Total: resource.MustParse("1"), }}, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) }) }) @@ -232,7 +233,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { }, }, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) ginkgo.By("Check podSets spec", func() { @@ -274,7 +275,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { }, }, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) ginkgo.By("Check podSets spec", func() { @@ -333,7 +334,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { Total: resource.MustParse("1"), }}, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) ginkgo.By("Check podSets spec", func() { @@ -436,7 +437,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { Total: resource.MustParse("5"), }}, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) }) }) @@ -526,7 +527,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { Total: resource.MustParse("5"), }}, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) }) }) @@ -560,7 +561,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { Total: resource.MustParse("0"), }}, }}, - }, ignoreCqCondition)) + }, ignoreCqCondition, ignorePendingWorkloadsStatus)) }) gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
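For completeness, a sketch of how a consumer might read the exposed head of pending workloads once queue visibility is enabled. The function name printPendingHead and the client wiring are assumptions; the status fields are the ones introduced by this patch:

	import (
		"context"
		"fmt"

		"k8s.io/apimachinery/pkg/types"
		"sigs.k8s.io/controller-runtime/pkg/client"
		kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	)

	func printPendingHead(ctx context.Context, c client.Client, cqName string) error {
		// ClusterQueue is cluster-scoped, so only the name is needed.
		var cq kueue.ClusterQueue
		if err := c.Get(ctx, types.NamespacedName{Name: cqName}, &cq); err != nil {
			return err
		}
		// PendingWorkloadsStatus is nil until the controller publishes a snapshot.
		if pws := cq.Status.PendingWorkloadsStatus; pws != nil {
			for _, wl := range pws.Head {
				fmt.Printf("pending: %s/%s\n", wl.Namespace, wl.Name)
			}
		}
		return nil
	}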