From f228a662eb45565e7f7fe2aa46ccae2598f6c483 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Thu, 21 Sep 2023 12:13:00 +0900 Subject: [PATCH] Support kubeflow.org/paddlejob Signed-off-by: Yuki Iwai --- apis/config/v1beta1/configuration_types.go | 1 + .../templates/rbac/paddlejob_editor_role.yaml | 27 + .../templates/rbac/paddlejob_viewer_role.yaml | 23 + charts/kueue/templates/rbac/role.yaml | 17 + charts/kueue/values.yaml | 1 + cmd/kueue/main_test.go | 2 +- .../manager/controller_manager_config.yaml | 1 + config/components/rbac/kustomization.yaml | 2 + .../rbac/paddlejob_editor_role.yaml | 27 + .../rbac/paddlejob_viewer_role.yaml | 23 + config/components/rbac/role.yaml | 17 + config/components/webhook/manifests.yaml | 39 ++ pkg/controller/jobs/kubeflow/jobs/jobs.go | 1 + .../jobs/paddlejob/paddlejob_controller.go | 98 +++ .../paddlejob/paddlejob_controller_test.go | 384 ++++++++++++ .../jobs/paddlejob/paddlejob_webhook.go | 95 +++ .../jobs/paddlejob/paddlejob_webhook_test.go | 56 ++ pkg/util/testingjobs/paddlejob/wrappers.go | 168 ++++++ .../paddlejob/paddlejob_controller_test.go | 571 ++++++++++++++++++ .../controller/jobs/paddlejob/suite_test.go | 97 +++ 20 files changed, 1649 insertions(+), 1 deletion(-) create mode 100644 charts/kueue/templates/rbac/paddlejob_editor_role.yaml create mode 100644 charts/kueue/templates/rbac/paddlejob_viewer_role.yaml create mode 100644 config/components/rbac/paddlejob_editor_role.yaml create mode 100644 config/components/rbac/paddlejob_viewer_role.yaml create mode 100644 pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go create mode 100644 pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go create mode 100644 pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook.go create mode 100644 pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook_test.go create mode 100644 pkg/util/testingjobs/paddlejob/wrappers.go create mode 100644 test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go create mode 100644 test/integration/controller/jobs/paddlejob/suite_test.go diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index ad7f4f0ce9..45bcfdba4a 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -226,6 +226,7 @@ type Integrations struct { // - "kubeflow.org/mpijob" // - "ray.io/rayjob" // - "jobset.x-k8s.io/jobset" + // - "kubeflow.org/paddlejob" // - "kubeflow.org/pytorchjob" // - "kubeflow.org/tfjob" // - "kubeflow.org/xgboostjob" diff --git a/charts/kueue/templates/rbac/paddlejob_editor_role.yaml b/charts/kueue/templates/rbac/paddlejob_editor_role.yaml new file mode 100644 index 0000000000..7e6c789891 --- /dev/null +++ b/charts/kueue/templates/rbac/paddlejob_editor_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to edit paddlejobs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: '{{ include "kueue.fullname" . }}-paddlejob-editor-role' + labels: + rbac.kueue.x-k8s.io/batch-admin: "true" + rbac.kueue.x-k8s.io/batch-user: "true" +rules: + - apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get diff --git a/charts/kueue/templates/rbac/paddlejob_viewer_role.yaml b/charts/kueue/templates/rbac/paddlejob_viewer_role.yaml new file mode 100644 index 0000000000..801df2ec75 --- /dev/null +++ b/charts/kueue/templates/rbac/paddlejob_viewer_role.yaml @@ -0,0 +1,23 @@ +# permissions for end users to view paddlejobs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: '{{ include "kueue.fullname" . }}-paddlejob-viewer-role' + labels: + rbac.kueue.x-k8s.io/batch-admin: "true" + rbac.kueue.x-k8s.io/batch-user: "true" +rules: + - apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - get + - list + - watch + - apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 34c5b99af3..6ce2b1ba46 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -114,6 +114,23 @@ rules: verbs: - get - update + - apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - get + - list + - patch + - update + - watch + - apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get + - update - apiGroups: - kubeflow.org resources: diff --git a/charts/kueue/values.yaml b/charts/kueue/values.yaml index 55ac94cb6f..5ae7484a2e 100644 --- a/charts/kueue/values.yaml +++ b/charts/kueue/values.yaml @@ -87,6 +87,7 @@ managerConfig: - "batch/job" - "kubeflow.org/mpijob" - "ray.io/rayjob" + - "kubeflow.org/paddlejob" - "kubeflow.org/pytorchjob" - "kubeflow.org/tfjob" - "kubeflow.org/xgboostjob" diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index 7978edb30a..4aff834924 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -110,7 +110,7 @@ integrations: { name: "bad integrations config", configFile: badIntegrationsConfig, - wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"ray.io/rayjob\""), + wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/paddlejob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"ray.io/rayjob\""), }, } diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml index d59eaefc3d..ae1cb45cf5 100644 --- a/config/components/manager/controller_manager_config.yaml +++ b/config/components/manager/controller_manager_config.yaml @@ -34,6 +34,7 @@ integrations: - "kubeflow.org/mpijob" - "ray.io/rayjob" - "jobset.x-k8s.io/jobset" + - "kubeflow.org/paddlejob" - "kubeflow.org/pytorchjob" - "kubeflow.org/tfjob" - "kubeflow.org/xgboostjob" diff --git a/config/components/rbac/kustomization.yaml b/config/components/rbac/kustomization.yaml index ca2675c32d..828295432e 100644 --- a/config/components/rbac/kustomization.yaml +++ b/config/components/rbac/kustomization.yaml @@ -42,3 +42,5 @@ resources: - tfjob_viewer_role.yaml - xgboostjob_editor_role.yaml - xgboostjob_viewer_role.yaml +- paddlejob_editor_role.yaml +- paddlejob_viewer_role.yaml diff --git a/config/components/rbac/paddlejob_editor_role.yaml b/config/components/rbac/paddlejob_editor_role.yaml new file mode 100644 index 0000000000..c6f9c9111c --- /dev/null +++ b/config/components/rbac/paddlejob_editor_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to edit paddlejobs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: paddlejob-editor-role + labels: + rbac.kueue.x-k8s.io/batch-admin: "true" + rbac.kueue.x-k8s.io/batch-user: "true" +rules: +- apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get diff --git a/config/components/rbac/paddlejob_viewer_role.yaml b/config/components/rbac/paddlejob_viewer_role.yaml new file mode 100644 index 0000000000..1718b0317b --- /dev/null +++ b/config/components/rbac/paddlejob_viewer_role.yaml @@ -0,0 +1,23 @@ +# permissions for end users to view paddlejobs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: paddlejob-viewer-role + labels: + rbac.kueue.x-k8s.io/batch-admin: "true" + rbac.kueue.x-k8s.io/batch-user: "true" +rules: +- apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - get + - list + - watch +- apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 52df06619d..4307cfc4d8 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -115,6 +115,23 @@ rules: verbs: - get - update +- apiGroups: + - kubeflow.org + resources: + - paddlejobs + verbs: + - get + - list + - patch + - update + - watch +- apiGroups: + - kubeflow.org + resources: + - paddlejobs/status + verbs: + - get + - update - apiGroups: - kubeflow.org resources: diff --git a/config/components/webhook/manifests.yaml b/config/components/webhook/manifests.yaml index a17a0cfae9..75eb1c5d4a 100644 --- a/config/components/webhook/manifests.yaml +++ b/config/components/webhook/manifests.yaml @@ -42,6 +42,25 @@ webhooks: resources: - jobsets sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-kubeflow-org-v1-paddlejob + failurePolicy: Fail + name: mpaddlejob.kb.io + rules: + - apiGroups: + - kubeflow.org + apiVersions: + - v1 + operations: + - CREATE + resources: + - paddlejobs + sideEffects: None - admissionReviewVersions: - v1 clientConfig: @@ -241,6 +260,26 @@ webhooks: resources: - jobsets sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-kubeflow-org-v1-paddlejob + failurePolicy: Fail + name: vpaddlejob.kb.io + rules: + - apiGroups: + - kubeflow.org + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - paddlejobs + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/pkg/controller/jobs/kubeflow/jobs/jobs.go b/pkg/controller/jobs/kubeflow/jobs/jobs.go index a9a9509d6d..8e24d2a42d 100644 --- a/pkg/controller/jobs/kubeflow/jobs/jobs.go +++ b/pkg/controller/jobs/kubeflow/jobs/jobs.go @@ -18,6 +18,7 @@ package jobs // Reference the job framework integration packages to ensure linking. import ( + _ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" _ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" _ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" _ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob" diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go new file mode 100644 index 0000000000..098f2eb347 --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "context" + + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/kubeflowjob" +) + +var ( + gvk = kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind) + FrameworkName = "kubeflow.org/paddlejob" +) + +func init() { + utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupPaddleJobWebhook, + JobType: &kftraining.PaddleJob{}, + AddToScheme: kftraining.AddToScheme, + })) +} + +// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups=kubeflow.org,resources=paddlejobs,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=kubeflow.org,resources=paddlejobs/status,verbs=get;update +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch + +var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.GenericJob { + return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(&kftraining.PaddleJob{})} +}, nil) + +type JobControl kftraining.PaddleJob + +var _ kubeflowjob.KFJobControl = (*JobControl)(nil) + +func (j *JobControl) Object() client.Object { + return (*kftraining.PaddleJob)(j) +} + +func fromObject(o runtime.Object) *kubeflowjob.KubeflowJob { + return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(o.(*kftraining.PaddleJob))} +} + +func (j *JobControl) GVK() schema.GroupVersionKind { + return gvk +} + +func (j *JobControl) RunPolicy() *kftraining.RunPolicy { + return &j.Spec.RunPolicy +} + +func (j *JobControl) ReplicaSpecs() map[kftraining.ReplicaType]*kftraining.ReplicaSpec { + return j.Spec.PaddleReplicaSpecs +} + +func (j *JobControl) JobStatus() kftraining.JobStatus { + return j.Status +} + +func (j *JobControl) OrderedReplicaTypes() []kftraining.ReplicaType { + return []kftraining.ReplicaType{kftraining.PaddleJobReplicaTypeMaster, kftraining.PaddleJobReplicaTypeWorker} +} + +func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { + return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk) +} + +func GetWorkloadNameForPaddleJob(jobName string) string { + return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, gvk) +} diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go new file mode 100644 index 0000000000..a2ad455bc1 --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller_test.go @@ -0,0 +1,384 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" +) + +func TestPriorityClass(t *testing.T) { + testcases := map[string]struct { + job kftraining.PaddleJob + wantPriorityClassName string + }{ + "none priority class name specified": { + job: kftraining.PaddleJob{}, + wantPriorityClassName: "", + }, + "priority specified at runPolicy and replicas; use priority in runPolicy": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + RunPolicy: kftraining.RunPolicy{ + SchedulingPolicy: &kftraining.SchedulingPolicy{ + PriorityClass: "scheduling-priority", + }, + }, + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "master-priority", + }, + }, + }, + kftraining.PaddleJobReplicaTypeWorker: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "worker-priority", + }, + }, + }, + }, + }, + }, + wantPriorityClassName: "scheduling-priority", + }, + "runPolicy present, but without priority; fallback to master": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + RunPolicy: kftraining.RunPolicy{ + SchedulingPolicy: &kftraining.SchedulingPolicy{}, + }, + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "master-priority", + }, + }, + }, + }, + }, + }, + wantPriorityClassName: "master-priority", + }, + "specified on master takes precedence over worker": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "master-priority", + }, + }, + }, + kftraining.PaddleJobReplicaTypeWorker: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "worker-priority", + }, + }, + }, + }, + }, + }, + wantPriorityClassName: "master-priority", + }, + "master present, but without priority; fallback to worker": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{}, + }, + }, + kftraining.PaddleJobReplicaTypeWorker: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "worker-priority", + }, + }, + }, + }, + }, + }, + wantPriorityClassName: "worker-priority", + }, + "specified on worker only": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: {}, + kftraining.PaddleJobReplicaTypeWorker: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + PriorityClassName: "worker-priority", + }, + }, + }, + }, + }, + }, + wantPriorityClassName: "worker-priority", + }, + "worker present, but without priority; fallback to empty": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: {}, + kftraining.PaddleJobReplicaTypeWorker: { + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{}, + }, + }, + }, + }, + }, + wantPriorityClassName: "", + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + paddleJob := fromObject(&tc.job) + gotPriorityClassName := paddleJob.PriorityClass() + if tc.wantPriorityClassName != gotPriorityClassName { + t.Errorf("Unexpected response (want: %v, got: %v)", tc.wantPriorityClassName, gotPriorityClassName) + } + }) + } +} + +func TestOrderedReplicaTypes(t *testing.T) { + testcases := map[string]struct { + job kftraining.PaddleJob + wantReplicaTypes []kftraining.ReplicaType + }{ + "job has no replicas": { + job: kftraining.PaddleJob{}, + wantReplicaTypes: []kftraining.ReplicaType{}, + }, + "job has all replicas": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: {}, + kftraining.PaddleJobReplicaTypeWorker: {}, + }, + }, + }, + wantReplicaTypes: []kftraining.ReplicaType{ + kftraining.PaddleJobReplicaTypeMaster, + kftraining.PaddleJobReplicaTypeWorker, + }, + }, + "job only has the worker replica": { + job: kftraining.PaddleJob{ + Spec: kftraining.PaddleJobSpec{ + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeWorker: {}, + }, + }, + }, + wantReplicaTypes: []kftraining.ReplicaType{ + kftraining.PaddleJobReplicaTypeWorker, + }, + }, + } + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + paddleJob := fromObject(&tc.job) + gotReplicaTypes := paddleJob.OrderedReplicaTypes() + if diff := cmp.Diff(tc.wantReplicaTypes, gotReplicaTypes); len(diff) != 0 { + t.Errorf("Unexpected response (-want, +got): %v", diff) + } + }) + } +} + +var ( + jobCmpOpts = cmp.Options{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(kftraining.PaddleJob{}, "TypeMeta", "ObjectMeta"), + } + workloadCmpOpts = cmp.Options{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"), + cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"), + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), + cmpopts.IgnoreFields(kueue.PodSet{}, "Template"), + } +) + +func TestReconciler(t *testing.T) { + cases := map[string]struct { + reconcilerOptions []jobframework.Option + job *kftraining.PaddleJob + workloads []kueue.Workload + wantJob *kftraining.PaddleJob + wantWorkloads []kueue.Workload + wantErr error + }{ + "workload is created with podsets": { + reconcilerOptions: []jobframework.Option{ + jobframework.WithManageJobsWithoutQueueName(true), + }, + job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), + wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("paddlejob", "ns"). + PodSets( + *utiltesting.MakePodSet("master", 1).Obj(), + *utiltesting.MakePodSet("worker", 2).Obj(), + ). + Obj(), + }, + }, + "workload isn't created due to manageJobsWithoutQueueName=false": { + reconcilerOptions: []jobframework.Option{ + jobframework.WithManageJobsWithoutQueueName(false), + }, + job: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), + wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns").Parallelism(2).Obj(), + wantWorkloads: []kueue.Workload{}, + }, + "when workload is evicted, suspended is reset, restore node affinity": { + job: testingpaddlejob.MakePaddleJob("paddlejob", "ns"). + Image(""). + Args(nil). + Queue("foo"). + Suspend(false). + Parallelism(10). + Request(kftraining.PaddleJobReplicaTypeMaster, v1.ResourceCPU, "1"). + Request(kftraining.PaddleJobReplicaTypeWorker, v1.ResourceCPU, "5"). + NodeSelector("provisioning", "spot"). + Active(kftraining.PaddleJobReplicaTypeMaster, 1). + Active(kftraining.PaddleJobReplicaTypeWorker, 10). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns"). + PodSets( + *utiltesting.MakePodSet("master", 1).Request(v1.ResourceCPU, "1").Obj(), + *utiltesting.MakePodSet("worker", 10).Request(v1.ResourceCPU, "5").Obj(), + ). + ReserveQuota(utiltesting.MakeAdmission("cq"). + AssignmentPodCount(1). + AssignmentPodCount(10). + Obj()). + Admitted(true). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + }). + Obj(), + }, + wantJob: testingpaddlejob.MakePaddleJob("paddlejob", "ns"). + Image(""). + Args(nil). + Queue("foo"). + Suspend(true). + Parallelism(10). + Request(kftraining.PaddleJobReplicaTypeMaster, v1.ResourceCPU, "1"). + Request(kftraining.PaddleJobReplicaTypeWorker, v1.ResourceCPU, "5"). + Active(kftraining.PaddleJobReplicaTypeMaster, 1). + Active(kftraining.PaddleJobReplicaTypeWorker, 10). + Obj(), + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns"). + PodSets( + *utiltesting.MakePodSet("master", 1).Request(v1.ResourceCPU, "1").Obj(), + *utiltesting.MakePodSet("worker", 10).Request(v1.ResourceCPU, "5").Obj(), + ). + ReserveQuota(utiltesting.MakeAdmission("cq"). + AssignmentPodCount(1). + AssignmentPodCount(10). + Obj()). + Admitted(true). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + }). + Obj(), + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + kcBuilder := utiltesting.NewClientBuilder(kftraining.AddToScheme) + if err := SetupIndexes(ctx, utiltesting.AsIndexer(kcBuilder)); err != nil { + t.Fatalf("Failed to setup indexes: %v", err) + } + kcBuilder = kcBuilder.WithObjects(tc.job) + for i := range tc.workloads { + kcBuilder = kcBuilder.WithStatusSubresource(&tc.workloads[i]) + } + + kClient := kcBuilder.Build() + for i := range tc.workloads { + if err := ctrl.SetControllerReference(tc.job, &tc.workloads[i], kClient.Scheme()); err != nil { + t.Fatalf("Could not set controller reference: %v", err) + } + if err := kClient.Create(ctx, &tc.workloads[i]); err != nil { + t.Fatalf("Could not create Workload: %v", err) + } + } + recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), v1.EventSource{Component: "test"}) + reconciler := NewReconciler(kClient, recorder, tc.reconcilerOptions...) + + jobKey := client.ObjectKeyFromObject(tc.job) + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: jobKey, + }) + if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Reconcile returned error (-want,+got):\n%s", diff) + } + + var gotPaddleJob kftraining.PaddleJob + if err = kClient.Get(ctx, jobKey, &gotPaddleJob); err != nil { + t.Fatalf("Could not get Job after reconcile: %v", err) + } + if diff := cmp.Diff(tc.wantJob, &gotPaddleJob, jobCmpOpts...); diff != "" { + t.Errorf("Job after reconcile (-want,+got):\n%s", diff) + } + var gotWorkloads kueue.WorkloadList + if err = kClient.List(ctx, &gotWorkloads); err != nil { + t.Fatalf("Could not list Workloads after reconcile: %v", err) + } + if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, workloadCmpOpts...); diff != "" { + t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook.go new file mode 100644 index 0000000000..70cef83398 --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook.go @@ -0,0 +1,95 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "context" + + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "sigs.k8s.io/kueue/pkg/controller/jobframework" +) + +type PaddleJobWebhook struct { + manageJobsWithoutQueueName bool +} + +// SetupPaddleJobWebhook configures the webhook for kubeflow PaddleJob. +func SetupPaddleJobWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { + options := jobframework.DefaultOptions + for _, opt := range opts { + opt(&options) + } + wh := &PaddleJobWebhook{ + manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, + } + return ctrl.NewWebhookManagedBy(mgr). + For(&kftraining.PaddleJob{}). + WithDefaulter(wh). + WithValidator(wh). + Complete() +} + +// +kubebuilder:webhook:path=/mutate-kubeflow-org-v1-paddlejob,mutating=true,failurePolicy=fail,sideEffects=None,groups=kubeflow.org,resources=paddlejobs,verbs=create,versions=v1,name=mpaddlejob.kb.io,admissionReviewVersions=v1 + +var _ webhook.CustomDefaulter = &PaddleJobWebhook{} + +// Default implements webhook.CustomDefaulter so a webhook will be registered for the type +func (w *PaddleJobWebhook) Default(ctx context.Context, obj runtime.Object) error { + job := fromObject(obj) + log := ctrl.LoggerFrom(ctx).WithName("paddlejob-webhook") + log.V(5).Info("Applying defaults", "paddlejob", klog.KObj(job.Object())) + jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) + return nil +} + +// +kubebuilder:webhook:path=/validate-kubeflow-org-v1-paddlejob,mutating=false,failurePolicy=fail,sideEffects=None,groups=kubeflow.org,resources=paddlejobs,verbs=create;update,versions=v1,name=vpaddlejob.kb.io,admissionReviewVersions=v1 + +var _ webhook.CustomValidator = &PaddleJobWebhook{} + +// ValidateCreate implements webhook.CustomValidator so a webhook will be registered for the type +func (w *PaddleJobWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + job := fromObject(obj) + log := ctrl.LoggerFrom(ctx).WithName("paddlejob-webhook") + log.Info("Validating create", "paddlejob", klog.KObj(job.Object())) + return nil, validateCreate(job).ToAggregate() +} + +func validateCreate(job jobframework.GenericJob) field.ErrorList { + return jobframework.ValidateCreateForQueueName(job) +} + +// ValidateUpdate implements webhook.CustomValidator so a webhook will be registered for the type +func (w *PaddleJobWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + oldJob := fromObject(oldObj) + newJob := fromObject(newObj) + log := ctrl.LoggerFrom(ctx).WithName("paddlejob-webhook") + log.Info("Validating update", "paddlejob", klog.KObj(newJob.Object())) + allErrs := jobframework.ValidateUpdateForQueueName(oldJob, newJob) + return nil, allErrs.ToAggregate() +} + +// ValidateDelete implements webhook.CustomValidator so a webhook will be registered for the type +func (w *PaddleJobWebhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) { + return nil, nil +} diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook_test.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook_test.go new file mode 100644 index 0000000000..3470aab27e --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_webhook_test.go @@ -0,0 +1,56 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + + testingutil "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" +) + +func TestDefault(t *testing.T) { + testcases := map[string]struct { + job *kftraining.PaddleJob + manageJobsWithoutQueueName bool + want *kftraining.PaddleJob + }{ + "update the suspend field with 'manageJobsWithoutQueueName=false'": { + job: testingutil.MakePaddleJob("job", "default").Queue("queue").Suspend(false).Obj(), + want: testingutil.MakePaddleJob("job", "default").Queue("queue").Obj(), + }, + "update the suspend field 'manageJobsWithoutQueueName=true'": { + job: testingutil.MakePaddleJob("job", "default").Suspend(false).Obj(), + manageJobsWithoutQueueName: true, + want: testingutil.MakePaddleJob("job", "default").Obj(), + }, + } + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + w := &PaddleJobWebhook{manageJobsWithoutQueueName: tc.manageJobsWithoutQueueName} + if err := w.Default(context.Background(), tc.job); err != nil { + t.Errorf("set defaults to a kubeflow.org/paddlejob by a Defaulter") + } + if diff := cmp.Diff(tc.want, tc.job); len(diff) != 0 { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/util/testingjobs/paddlejob/wrappers.go b/pkg/util/testingjobs/paddlejob/wrappers.go new file mode 100644 index 0000000000..ee8152e030 --- /dev/null +++ b/pkg/util/testingjobs/paddlejob/wrappers.go @@ -0,0 +1,168 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + + "sigs.k8s.io/kueue/pkg/controller/constants" +) + +// PaddleJobWrapper wraps a Job. +type PaddleJobWrapper struct{ kftraining.PaddleJob } + +// MakePaddleJob creates a wrapper for a suspended job with a single container and parallelism=1. +func MakePaddleJob(name, ns string) *PaddleJobWrapper { + return &PaddleJobWrapper{kftraining.PaddleJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Annotations: make(map[string]string, 1), + }, + Spec: kftraining.PaddleJobSpec{ + RunPolicy: kftraining.RunPolicy{ + Suspend: ptr.To(true), + }, + PaddleReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ + kftraining.PaddleJobReplicaTypeMaster: { + Replicas: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Command: []string{}, + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, + }, + }, + NodeSelector: map[string]string{}, + }, + }, + }, + kftraining.PaddleJobReplicaTypeWorker: { + Replicas: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Command: []string{}, + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, + }, + }, + NodeSelector: map[string]string{}, + }, + }, + }, + }, + }, + }} +} + +// PriorityClass updates job priorityclass. +func (j *PaddleJobWrapper) PriorityClass(pc string) *PaddleJobWrapper { + if j.Spec.RunPolicy.SchedulingPolicy == nil { + j.Spec.RunPolicy.SchedulingPolicy = &kftraining.SchedulingPolicy{} + } + j.Spec.RunPolicy.SchedulingPolicy.PriorityClass = pc + return j +} + +// Obj returns the inner Job. +func (j *PaddleJobWrapper) Obj() *kftraining.PaddleJob { + return &j.PaddleJob +} + +// Queue updates the queue name of the job. +func (j *PaddleJobWrapper) Queue(queue string) *PaddleJobWrapper { + if j.Labels == nil { + j.Labels = make(map[string]string) + } + j.Labels[constants.QueueLabel] = queue + return j +} + +// Request adds a resource request to the default container. +func (j *PaddleJobWrapper) Request(replicaType kftraining.ReplicaType, r corev1.ResourceName, v string) *PaddleJobWrapper { + j.Spec.PaddleReplicaSpecs[replicaType].Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) + return j +} + +// Image updates images of the job. +func (j *PaddleJobWrapper) Image(image string) *PaddleJobWrapper { + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.Containers[0].Image = image + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.Containers[0].Image = image + return j +} + +// Args updates args of the job. +func (j *PaddleJobWrapper) Args(args []string) *PaddleJobWrapper { + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.Containers[0].Args = args + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.Containers[0].Args = args + return j +} + +// Parallelism updates job parallelism. +func (j *PaddleJobWrapper) Parallelism(p int32) *PaddleJobWrapper { + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Replicas = ptr.To(p) + return j +} + +// Suspend updates the suspend status of the job. +func (j *PaddleJobWrapper) Suspend(s bool) *PaddleJobWrapper { + j.Spec.RunPolicy.Suspend = &s + return j +} + +// UID updates the uid of the job. +func (j *PaddleJobWrapper) UID(uid string) *PaddleJobWrapper { + j.ObjectMeta.UID = types.UID(uid) + return j +} + +// NodeSelector updates the nodeSelector of job. +func (j *PaddleJobWrapper) NodeSelector(k, v string) *PaddleJobWrapper { + if j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector == nil { + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector = make(map[string]string) + } + if j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector == nil { + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector = make(map[string]string) + } + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector[k] = v + j.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector[k] = v + return j +} + +// Active updates the replicaStatus for Active of job. +func (j *PaddleJobWrapper) Active(rType kftraining.ReplicaType, c int32) *PaddleJobWrapper { + if j.Status.ReplicaStatuses == nil { + j.Status.ReplicaStatuses = make(map[kftraining.ReplicaType]*kftraining.ReplicaStatus) + } + j.Status.ReplicaStatuses[rType] = &kftraining.ReplicaStatus{ + Active: c, + } + return j +} diff --git a/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go new file mode 100644 index 0000000000..7d3e221945 --- /dev/null +++ b/test/integration/controller/jobs/paddlejob/paddlejob_controller_test.go @@ -0,0 +1,571 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "fmt" + + "github.com/google/go-cmp/cmp/cmpopts" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" + "sigs.k8s.io/kueue/pkg/util/testing" + testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" + "sigs.k8s.io/kueue/test/integration/framework" + "sigs.k8s.io/kueue/test/util" +) + +const ( + jobName = "test-job" + instanceKey = "cloud.provider.com/instance" + priorityClassName = "test-priority-class" + priorityValue = 10 +) + +var ( + ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime") +) + +// +kubebuilder:docs-gen:collapse=Imports + +var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + DepCRDPaths: []string{paddleCrdPath}, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithManageJobsWithoutQueueName(true))) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + var ( + ns *corev1.Namespace + wlLookupKey types.NamespacedName + ) + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "core-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + wlLookupKey = types.NamespacedName{Name: workloadpaddlejob.GetWorkloadNameForPaddleJob(jobName), Namespace: ns.Name} + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.It("Should reconcile PaddleJobs", func() { + ginkgo.By("checking the job gets suspended when created unsuspended") + priorityClass := testing.MakePriorityClass(priorityClassName). + PriorityValue(int32(priorityValue)).Obj() + gomega.Expect(k8sClient.Create(ctx, priorityClass)).Should(gomega.Succeed()) + + job := testingpaddlejob.MakePaddleJob(jobName, ns.Name).PriorityClass(priorityClassName).Obj() + err := k8sClient.Create(ctx, job) + gomega.Expect(err).To(gomega.Succeed()) + createdJob := &kftraining.PaddleJob{} + + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, types.NamespacedName{Name: jobName, Namespace: ns.Name}, createdJob); err != nil { + return false + } + return createdJob.Spec.RunPolicy.Suspend != nil && *createdJob.Spec.RunPolicy.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the workload is created without queue assigned") + createdWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal(""), "The Workload shouldn't have .spec.queueName set") + gomega.Expect(metav1.IsControlledBy(createdWorkload, createdJob)).To(gomega.BeTrue(), "The Workload should be owned by the Job") + + ginkgo.By("checking the workload is created with priority and priorityName") + gomega.Expect(createdWorkload.Spec.PriorityClassName).Should(gomega.Equal(priorityClassName)) + gomega.Expect(*createdWorkload.Spec.Priority).Should(gomega.Equal(int32(priorityValue))) + + ginkgo.By("checking the workload is updated with queue name when the job does") + jobQueueName := "test-queue" + createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} + gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return createdWorkload.Spec.QueueName == jobQueueName + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking a second non-matching workload is deleted") + secondWl := &kueue.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: workloadpaddlejob.GetWorkloadNameForPaddleJob("second-workload"), + Namespace: createdWorkload.Namespace, + }, + Spec: *createdWorkload.Spec.DeepCopy(), + } + gomega.Expect(ctrl.SetControllerReference(createdJob, secondWl, scheme.Scheme)).Should(gomega.Succeed()) + secondWl.Spec.PodSets[0].Count += 1 + + gomega.Expect(k8sClient.Create(ctx, secondWl)).Should(gomega.Succeed()) + gomega.Eventually(func() error { + wl := &kueue.Workload{} + key := types.NamespacedName{Name: secondWl.Name, Namespace: secondWl.Namespace} + return k8sClient.Get(ctx, key, wl) + }, util.Timeout, util.Interval).Should(testing.BeNotFoundError()) + // check the original wl is still there + gomega.Consistently(func() bool { + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) + return err == nil + }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the job is unsuspended when workload is assigned") + onDemandFlavor := testing.MakeResourceFlavor("on-demand").Label(instanceKey, "on-demand").Obj() + gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed()) + spotFlavor := testing.MakeResourceFlavor("spot").Label(instanceKey, "spot").Obj() + gomega.Expect(k8sClient.Create(ctx, spotFlavor)).Should(gomega.Succeed()) + clusterQueue := testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(), + *testing.MakeFlavorQuotas("spot").Resource(corev1.ResourceCPU, "5").Obj(), + ).Obj() + admission := testing.MakeAdmission(clusterQueue.Name). + PodSets( + kueue.PodSetAssignment{ + Name: "Master", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "on-demand", + }, + Count: ptr.To(createdWorkload.Spec.PodSets[0].Count), + }, + kueue.PodSetAssignment{ + Name: "Worker", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "spot", + }, + Count: ptr.To(createdWorkload.Spec.PodSets[1].Count), + }, + ). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name} + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdJob); err != nil { + return false + } + return !*createdJob.Spec.RunPolicy.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Eventually(func() bool { + ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name)) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(len(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) + gomega.Expect(len(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotFlavor.Name)) + gomega.Consistently(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return len(createdWorkload.Status.Conditions) == 2 + }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the job gets suspended when parallelism changes and the added node selectors are removed") + parallelism := ptr.Deref(job.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Replicas, 1) + newParallelism := parallelism + 1 + createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Replicas = &newParallelism + gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdJob); err != nil { + return false + } + return createdJob.Spec.RunPolicy.Suspend != nil && *createdJob.Spec.RunPolicy.Suspend && + len(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector) == 0 + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Eventually(func() bool { + ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String())) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the workload is updated with new count") + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return createdWorkload.Spec.PodSets[1].Count == newParallelism + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(createdWorkload.Status.Admission).Should(gomega.BeNil()) + + ginkgo.By("checking the job is unsuspended and selectors added when workload is assigned again") + admission = testing.MakeAdmission(clusterQueue.Name). + PodSets( + kueue.PodSetAssignment{ + Name: "Master", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "on-demand", + }, + Count: ptr.To(createdWorkload.Spec.PodSets[0].Count), + }, + kueue.PodSetAssignment{ + Name: "Worker", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "spot", + }, + Count: ptr.To(createdWorkload.Spec.PodSets[1].Count), + }, + ). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdJob); err != nil { + return false + } + return !*createdJob.Spec.RunPolicy.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(len(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) + gomega.Expect(len(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotFlavor.Name)) + gomega.Consistently(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return len(createdWorkload.Status.Conditions) == 2 + }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the workload is finished when job is completed") + createdJob.Status.Conditions = append(createdJob.Status.Conditions, + kftraining.JobCondition{ + Type: kftraining.JobSucceeded, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }) + gomega.Expect(k8sClient.Status().Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) + if err != nil || len(createdWorkload.Status.Conditions) == 2 { + return false + } + + return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadFinished) + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + }) +}) + +var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + type podsReadyTestSpec struct { + beforeJobStatus *kftraining.JobStatus + beforeCondition *metav1.Condition + jobStatus kftraining.JobStatus + suspended bool + wantCondition *metav1.Condition + } + + var ( + ns *corev1.Namespace + wlLookupKey types.NamespacedName + defaultFlavor = testing.MakeResourceFlavor("default").Label(instanceKey, "default").Obj() + ) + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + DepCRDPaths: []string{paddleCrdPath}, + } + cfg := fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerSetup(jobframework.WithWaitForPodsReady(true))) + + ginkgo.By("Create a resource flavor") + gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultFlavor, true) + fwk.Teardown() + }) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "core-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + wlLookupKey = types.NamespacedName{Name: workloadpaddlejob.GetWorkloadNameForPaddleJob(jobName), Namespace: ns.Name} + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.DescribeTable("Single job at different stages of progress towards completion", + func(podsReadyTestSpec podsReadyTestSpec) { + ginkgo.By("Create a job") + job := testingpaddlejob.MakePaddleJob(jobName, ns.Name).Parallelism(2).Obj() + jobQueueName := "test-queue" + job.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name} + createdJob := &kftraining.PaddleJob{} + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) + + ginkgo.By("Fetch the workload created for the job") + createdWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("Admit the workload created for the job") + admission := testing.MakeAdmission("foo"). + PodSets( + kueue.PodSetAssignment{ + Name: "Master", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "default", + }, + Count: ptr.To(createdWorkload.Spec.PodSets[0].Count), + }, + kueue.PodSetAssignment{ + Name: "Worker", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "default", + }, + Count: ptr.To(createdWorkload.Spec.PodSets[1].Count), + }, + ). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) + + ginkgo.By("Await for the job to be unsuspended") + gomega.Eventually(func() *bool { + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) + return createdJob.Spec.RunPolicy.Suspend + }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) + + if podsReadyTestSpec.beforeJobStatus != nil { + ginkgo.By("Update the job status to simulate its initial progress towards completion") + createdJob.Status = *podsReadyTestSpec.beforeJobStatus + gomega.Expect(k8sClient.Status().Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) + } + + if podsReadyTestSpec.beforeCondition != nil { + ginkgo.By("Update the workload status") + gomega.Eventually(func() *metav1.Condition { + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) + return apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadPodsReady) + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(podsReadyTestSpec.beforeCondition, ignoreConditionTimestamps)) + } + + ginkgo.By("Update the job status to simulate its progress towards completion") + createdJob.Status = podsReadyTestSpec.jobStatus + gomega.Expect(k8sClient.Status().Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) + + if podsReadyTestSpec.suspended { + ginkgo.By("Unset admission of the workload to suspend the job") + gomega.Eventually(func() error { + // the update may need to be retried due to a conflict as the workload gets + // also updated due to setting of the job status. + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return err + } + return util.SetQuotaReservation(ctx, k8sClient, createdWorkload, nil) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + } + + ginkgo.By("Verify the PodsReady condition is added") + gomega.Eventually(func() *metav1.Condition { + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) + return apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadPodsReady) + }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(podsReadyTestSpec.wantCondition, ignoreConditionTimestamps)) + }, + ginkgo.Entry("No progress", podsReadyTestSpec{ + wantCondition: &metav1.Condition{ + Type: kueue.WorkloadPodsReady, + Status: metav1.ConditionFalse, + Reason: "PodsReady", + Message: "Not all pods are ready or succeeded", + }, + }), + ginkgo.Entry("Running PaddleJob", podsReadyTestSpec{ + jobStatus: kftraining.JobStatus{ + Conditions: []kftraining.JobCondition{ + { + Type: kftraining.JobRunning, + Status: corev1.ConditionTrue, + Reason: "Running", + }, + }, + }, + wantCondition: &metav1.Condition{ + Type: kueue.WorkloadPodsReady, + Status: metav1.ConditionTrue, + Reason: "PodsReady", + Message: "All pods were ready or succeeded since the workload admission", + }, + }), + ginkgo.Entry("Running PaddleJob; PodsReady=False before", podsReadyTestSpec{ + beforeCondition: &metav1.Condition{ + Type: kueue.WorkloadPodsReady, + Status: metav1.ConditionFalse, + Reason: "PodsReady", + Message: "Not all pods are ready or succeeded", + }, + jobStatus: kftraining.JobStatus{ + Conditions: []kftraining.JobCondition{ + { + Type: kftraining.JobRunning, + Status: corev1.ConditionTrue, + Reason: "Running", + }, + }, + }, + wantCondition: &metav1.Condition{ + Type: kueue.WorkloadPodsReady, + Status: metav1.ConditionTrue, + Reason: "PodsReady", + Message: "All pods were ready or succeeded since the workload admission", + }, + }), + ginkgo.Entry("Job suspended; PodsReady=True before", podsReadyTestSpec{ + beforeJobStatus: &kftraining.JobStatus{ + Conditions: []kftraining.JobCondition{ + { + Type: kftraining.JobRunning, + Status: corev1.ConditionTrue, + Reason: "Running", + }, + }, + }, + beforeCondition: &metav1.Condition{ + Type: kueue.WorkloadPodsReady, + Status: metav1.ConditionTrue, + Reason: "PodsReady", + Message: "All pods were ready or succeeded since the workload admission", + }, + jobStatus: kftraining.JobStatus{ + Conditions: []kftraining.JobCondition{ + { + Type: kftraining.JobRunning, + Status: corev1.ConditionFalse, + Reason: "Suspended", + }, + }, + }, + suspended: true, + wantCondition: &metav1.Condition{ + Type: kueue.WorkloadPodsReady, + Status: metav1.ConditionFalse, + Reason: "PodsReady", + Message: "Not all pods are ready or succeeded", + }, + }), + ) +}) + +var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + var ( + ns *corev1.Namespace + onDemandFlavor *kueue.ResourceFlavor + spotUntaintedFlavor *kueue.ResourceFlavor + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + ) + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + DepCRDPaths: []string{paddleCrdPath}, + } + cfg := fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerAndSchedulerSetup()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "core-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + onDemandFlavor = testing.MakeResourceFlavor("on-demand").Label(instanceKey, "on-demand").Obj() + gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed()) + + spotUntaintedFlavor = testing.MakeResourceFlavor("spot-untainted").Label(instanceKey, "spot-untainted").Obj() + gomega.Expect(k8sClient.Create(ctx, spotUntaintedFlavor)).Should(gomega.Succeed()) + + clusterQueue = testing.MakeClusterQueue("dev-clusterqueue"). + ResourceGroup( + *testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(), + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(), + ).Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true) + gomega.Expect(util.DeleteResourceFlavor(ctx, k8sClient, spotUntaintedFlavor)).To(gomega.Succeed()) + }) + + ginkgo.It("Should schedule jobs as they fit in their ClusterQueue", func() { + ginkgo.By("creating localQueue") + localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + + ginkgo.By("checking a dev job starts") + job := testingpaddlejob.MakePaddleJob("dev-job", ns.Name).Queue(localQueue.Name). + Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceCPU, "3"). + Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceCPU, "4"). + Obj() + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + createdJob := &kftraining.PaddleJob{} + gomega.Eventually(func() *bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)). + Should(gomega.Succeed()) + return createdJob.Spec.RunPolicy.Suspend + }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) + gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeMaster].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) + gomega.Expect(createdJob.Spec.PaddleReplicaSpecs[kftraining.PaddleJobReplicaTypeWorker].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) + util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) + util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + }) +}) diff --git a/test/integration/controller/jobs/paddlejob/suite_test.go b/test/integration/controller/jobs/paddlejob/suite_test.go new file mode 100644 index 0000000000..3b7c8a9c54 --- /dev/null +++ b/test/integration/controller/jobs/paddlejob/suite_test.go @@ -0,0 +1,97 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package paddlejob + +import ( + "context" + "path/filepath" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/scheduler" + "sigs.k8s.io/kueue/test/integration/framework" +) + +var ( + cfg *rest.Config + k8sClient client.Client + ctx context.Context + fwk *framework.Framework + crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases") + paddleCrdPath = filepath.Join("..", "..", "..", "..", "..", "dep-crds", "training-operator") +) + +func TestAPIs(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.RunSpecs(t, + "PaddleJob Controller Suite", + ) +} + +func managerSetup(opts ...jobframework.Option) framework.ManagerSetup { + return func(mgr manager.Manager, ctx context.Context) { + reconciler := paddlejob.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err := paddlejob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = reconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = paddlejob.SetupPaddleJobWebhook(mgr, opts...) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } +} + +func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetup { + return func(mgr manager.Manager, ctx context.Context) { + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + err = paddlejob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = paddlejob.NewReconciler(mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), opts...).SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = paddlejob.SetupPaddleJobWebhook(mgr, opts...) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) + err = sched.Start(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } +}