diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 12040ddf5d..6a5b11a2d8 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -278,6 +278,7 @@ rules: - pytorchjobs/status verbs: - get + - patch - update - apiGroups: - kubeflow.org diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 2d38be7220..3522d37a75 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -277,6 +277,7 @@ rules: - pytorchjobs/status verbs: - get + - patch - update - apiGroups: - kubeflow.org diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index 5dc7bc6cd5..09cdbcd5ee 100644 --- a/pkg/controller/jobframework/validation.go +++ b/pkg/controller/jobframework/validation.go @@ -40,7 +40,8 @@ var ( batchv1.SchemeGroupVersion.WithKind("Job").String(), jobset.SchemeGroupVersion.WithKind("JobSet").String(), kftraining.SchemeGroupVersion.WithKind(kftraining.TFJobKind).String(), - kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String()) + kftraining.SchemeGroupVersion.WithKind(kftraining.PaddleJobKind).String(), + kftraining.SchemeGroupVersion.WithKind(kftraining.PyTorchJobKind).String()) ) // ValidateJobOnCreate encapsulates all GenericJob validations that must be performed on a Create operation diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go new file mode 100644 index 0000000000..2976d79add --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pytorchjob + +import ( + "context" + "errors" + "fmt" + + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/util/api" + clientutil "sigs.k8s.io/kueue/pkg/util/client" +) + +type multikueueAdapter struct{} + +var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) + +func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { + localJob := kftraining.PyTorchJob{} + err := localClient.Get(ctx, key, &localJob) + if err != nil { + return err + } + + remoteJob := &kftraining.PyTorchJob{} + err = remoteClient.Get(ctx, key, remoteJob) + if client.IgnoreNotFound(err) != nil { + return err + } + + // if the remote exists, just copy the status + if err == nil { + return clientutil.PatchStatus(ctx, localClient, &localJob, func() (bool, error) { + localJob.Status = remoteJob.Status + return true, nil + }) + } + + remoteJob = &kftraining.PyTorchJob{ + ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), + Spec: *localJob.Spec.DeepCopy(), + } + + // add the prebuilt workload + if remoteJob.Labels == nil { + remoteJob.Labels = make(map[string]string, 2) + } + remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName + remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin + + return remoteClient.Create(ctx, remoteJob) +} + +func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { + job := kftraining.PyTorchJob{} + err := remoteClient.Get(ctx, key, &job) + if err != nil { + return client.IgnoreNotFound(err) + } + return client.IgnoreNotFound(remoteClient.Delete(ctx, &job)) +} + +func (b *multikueueAdapter) KeepAdmissionCheckPending() bool { + return false +} + +func (b *multikueueAdapter) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) { + return true, "", nil +} + +func (b *multikueueAdapter) GVK() schema.GroupVersionKind { + return gvk +} + +var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil) + +func (*multikueueAdapter) GetEmptyList() client.ObjectList { + return &kftraining.PyTorchJobList{} +} + +func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) { + pyTorchJob, isPyTorchJob := o.(*kftraining.PyTorchJob) + if !isPyTorchJob { + return types.NamespacedName{}, errors.New("not a PyTorchJob") + } + + prebuiltWl, hasPrebuiltWorkload := pyTorchJob.Labels[constants.PrebuiltWorkloadLabel] + if !hasPrebuiltWorkload { + return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for PyTorchJob: %s", klog.KObj(pyTorchJob)) + } + + return types.NamespacedName{Name: prebuiltWl, Namespace: pyTorchJob.Namespace}, nil +} diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go new file mode 100644 index 0000000000..8ae5db1a41 --- /dev/null +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorch_multikueue_adapter_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2024 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pytorchjob + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/util/slices" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + kfutiltesting "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" +) + +const ( + TestNamespace = "ns" +) + +func TestMultikueueAdapter(t *testing.T) { + objCheckOpts := []cmp.Option{ + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.EquateEmpty(), + } + + pyTorchJobBuilder := kfutiltesting.MakePyTorchJob("pytorchjob1", TestNamespace).Queue("queue").Suspend(false) + + cases := map[string]struct { + managersPyTorchJobs []kftraining.PyTorchJob + workerPyTorchJobs []kftraining.PyTorchJob + + operation func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error + + wantError error + wantManagersPyTorchJobs []kftraining.PyTorchJob + wantWorkerPyTorchJobs []kftraining.PyTorchJob + }{ + "sync creates missing remote pytorchjob": { + managersPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone().Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "pytorchjob1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone().Obj(), + }, + wantWorkerPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + Obj(), + }, + }, + "sync status from remote pytorchjob": { + managersPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone().Obj(), + }, + workerPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). + Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.SyncJob(ctx, managerClient, workerClient, types.NamespacedName{Name: "pytorchjob1", Namespace: TestNamespace}, "wl1", "origin1") + }, + + wantManagersPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone(). + StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). 
+ Obj(), + }, + wantWorkerPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue}). + Obj(), + }, + }, + "remote pytorchjob is deleted": { + workerPyTorchJobs: []kftraining.PyTorchJob{ + *pyTorchJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(kueuealpha.MultiKueueOriginLabel, "origin1"). + Obj(), + }, + operation: func(ctx context.Context, adapter *multikueueAdapter, managerClient, workerClient client.Client) error { + return adapter.DeleteRemoteObject(ctx, workerClient, types.NamespacedName{Name: "pytorchjob1", Namespace: TestNamespace}) + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + managerBuilder := utiltesting.NewClientBuilder(kftraining.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) + managerBuilder = managerBuilder.WithLists(&kftraining.PyTorchJobList{Items: tc.managersPyTorchJobs}) + managerBuilder = managerBuilder.WithStatusSubresource(slices.Map(tc.managersPyTorchJobs, func(w *kftraining.PyTorchJob) client.Object { return w })...) + managerClient := managerBuilder.Build() + + workerBuilder := utiltesting.NewClientBuilder(kftraining.AddToScheme).WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}) + workerBuilder = workerBuilder.WithLists(&kftraining.PyTorchJobList{Items: tc.workerPyTorchJobs}) + workerClient := workerBuilder.Build() + + ctx, _ := utiltesting.ContextWithLog(t) + + adapter := &multikueueAdapter{} + + gotErr := tc.operation(ctx, adapter, managerClient, workerClient) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("unexpected error (-want/+got):\n%s", diff) + } + + gotManagersPyTorchJob := &kftraining.PyTorchJobList{} + if err := managerClient.List(ctx, gotManagersPyTorchJob); err != nil { + t.Errorf("unexpected list manager's pytorchjobs error %s", err) + } else { + if diff := cmp.Diff(tc.wantManagersPyTorchJobs, gotManagersPyTorchJob.Items, objCheckOpts...); diff != "" { + t.Errorf("unexpected manager's pytorchjobs (-want/+got):\n%s", diff) + } + } + + gotWorkerPyTorchJobs := &kftraining.PyTorchJobList{} + if err := workerClient.List(ctx, gotWorkerPyTorchJobs); err != nil { + t.Errorf("unexpected list worker's pytorchjobs error %s", err) + } else { + if diff := cmp.Diff(tc.wantWorkerPyTorchJobs, gotWorkerPyTorchJobs.Items, objCheckOpts...); diff != "" { + t.Errorf("unexpected worker's pytorchjobs (-want/+got):\n%s", diff) + } + } + }) + } +} diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go index 2c9cfb83db..1bfc88cb95 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go @@ -45,13 +45,14 @@ func init() { JobType: &kftraining.PyTorchJob{}, AddToScheme: kftraining.AddToScheme, IsManagingObjectsOwner: isPyTorchJob, + MultiKueueAdapter: &multikueueAdapter{}, })) } // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs,verbs=get;list;watch;update;patch -// 
+kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs/status,verbs=get;update +// +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs/status,verbs=get;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs/finalizers,verbs=get;update // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch diff --git a/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go b/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go index ab2cfefe23..6f163b2a00 100644 --- a/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go +++ b/pkg/util/testingjobs/pytorchjob/wrappers_pytorchjob.go @@ -42,44 +42,87 @@ func MakePyTorchJob(name, ns string) *PyTorchJobWrapper { RunPolicy: kftraining.RunPolicy{ Suspend: ptr.To(true), }, - PyTorchReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{ - kftraining.PyTorchJobReplicaTypeMaster: { - Replicas: ptr.To[int32](1), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ - { - Name: "c", - Image: "pause", - Command: []string{}, - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, - }, - }, - NodeSelector: map[string]string{}, - }, + PyTorchReplicaSpecs: make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec), + }, + }} +} + +type PyTorchReplicaSpecRequirement struct { + ReplicaType kftraining.ReplicaType + Name string + ReplicaCount int32 + Annotations map[string]string + RestartPolicy kftraining.RestartPolicy +} + +func (j *PyTorchJobWrapper) PyTorchReplicaSpecs(replicaSpecs ...PyTorchReplicaSpecRequirement) *PyTorchJobWrapper { + j = j.PyTorchReplicaSpecsDefault() + for _, rs := range replicaSpecs { + j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount) + j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Name = rs.Name + j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy) + j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "pytorch" + + if rs.Annotations != nil { + j.Spec.PyTorchReplicaSpecs[rs.ReplicaType].Template.ObjectMeta.Annotations = rs.Annotations + } + } + + return j +} + +func (j *PyTorchJobWrapper) PyTorchReplicaSpecsDefault() *PyTorchJobWrapper { + j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeMaster] = &kftraining.ReplicaSpec{ + Replicas: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Command: []string{}, + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, }, }, - kftraining.PyTorchJobReplicaTypeWorker: { - Replicas: ptr.To[int32](1), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - RestartPolicy: "Never", - Containers: []corev1.Container{ - { - Name: "c", - Image: "pause", - Command: []string{}, - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, - }, - }, - NodeSelector: map[string]string{}, - }, + NodeSelector: map[string]string{}, + }, + }, + } + + j.Spec.PyTorchReplicaSpecs[kftraining.PyTorchJobReplicaTypeWorker] = &kftraining.ReplicaSpec{ + Replicas: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: "Never", + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Command: []string{}, + Resources: 
corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, }, }, + NodeSelector: map[string]string{}, }, }, - }} + } + + return j +} + +// Clone returns deep copy of the PyTorchJobWrapper. +func (j *PyTorchJobWrapper) Clone() *PyTorchJobWrapper { + return &PyTorchJobWrapper{PyTorchJob: *j.DeepCopy()} +} + +// Label sets the label key and value +func (j *PyTorchJobWrapper) Label(key, value string) *PyTorchJobWrapper { + if j.Labels == nil { + j.Labels = make(map[string]string) + } + j.Labels[key] = value + return j } // PriorityClass updates job priorityclass. @@ -155,3 +198,15 @@ func (j *PyTorchJobWrapper) PodLabel(replicaType kftraining.ReplicaType, k, v st j.Spec.PyTorchReplicaSpecs[replicaType].Template.Labels[k] = v return j } + +// Condition adds a condition +func (j *PyTorchJobWrapper) StatusConditions(c kftraining.JobCondition) *PyTorchJobWrapper { + j.Status.Conditions = append(j.Status.Conditions, c) + return j +} + +func (j *PyTorchJobWrapper) Image(replicaType kftraining.ReplicaType, image string, args []string) *PyTorchJobWrapper { + j.Spec.PyTorchReplicaSpecs[replicaType].Template.Spec.Containers[0].Image = image + j.Spec.PyTorchReplicaSpecs[replicaType].Template.Spec.Containers[0].Args = args + return j +} diff --git a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh index eb3224306f..a184b0fedc 100644 --- a/site/static/examples/multikueue/create-multikueue-kubeconfig.sh +++ b/site/static/examples/multikueue/create-multikueue-kubeconfig.sh @@ -117,6 +117,22 @@ rules: - paddlejobs/status verbs: - get + - apiGroups: + - kubeflow.org + resources: + - pytorchjobs + verbs: + - create + - delete + - get + - list + - watch +- apiGroups: + - kubeflow.org + resources: + - pytorchjobs/status + verbs: + - get --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index ff0401a925..49ec5f7f36 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -41,11 +41,13 @@ import ( workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" + workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" + testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" @@ -406,7 +408,6 @@ var _ = ginkgo.Describe("MultiKueue", func() { ginkgo.By("Creating the TfJob", func() { gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed()) }) - wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name} // the execution should be given to the worker @@ -545,6 +546,89 @@ var _ = ginkgo.Describe("MultiKueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) + + ginkgo.It("Should run a kubeflow PyTorchJob on worker if admitted", func() { + // Since it requires 1600M of memory, this job can only be 
admitted in worker 2. + pyTorchJob := testingpytorchjob.MakePyTorchJob("pytorchjob1", managerNs.Name). + Queue(managerLq.Name). + PyTorchReplicaSpecs( + testingpytorchjob.PyTorchReplicaSpecRequirement{ + ReplicaType: kftraining.PyTorchJobReplicaTypeMaster, + ReplicaCount: 1, + RestartPolicy: "Never", + }, + testingpytorchjob.PyTorchReplicaSpecRequirement{ + ReplicaType: kftraining.PyTorchJobReplicaTypeWorker, + ReplicaCount: 1, + RestartPolicy: "OnFailure", + }, + ). + Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "0.2"). + Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceMemory, "800M"). + Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "0.5"). + Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceMemory, "800M"). + Image(kftraining.PyTorchJobReplicaTypeMaster, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}). + Image(kftraining.PyTorchJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}). + Obj() + + ginkgo.By("Creating the PyTorchJob", func() { + gomega.Expect(k8sManagerClient.Create(ctx, pyTorchJob)).Should(gomega.Succeed()) + }) + + createdLeaderWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadpytorchjob.GetWorkloadNameForPyTorchJob(pyTorchJob.Name, pyTorchJob.UID), Namespace: managerNs.Name} + + // the execution should be given to the worker 2 + ginkgo.By("Waiting to be admitted in worker2 and manager", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: multiKueueAc.Name, + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker2"`, + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Waiting for the PyTorchJob to finish", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdPyTorchJob := &kftraining.PyTorchJob{} + g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(pyTorchJob), createdPyTorchJob)).To(gomega.Succeed()) + g.Expect(createdPyTorchJob.Status.ReplicaStatuses[kftraining.PyTorchJobReplicaTypeMaster]).To(gomega.BeComparableTo( + &kftraining.ReplicaStatus{ + Active: 0, + Succeeded: 1, + Selector: fmt.Sprintf("training.kubeflow.org/job-name=%s,training.kubeflow.org/operator-name=pytorchjob-controller,training.kubeflow.org/replica-type=master", createdPyTorchJob.Name), + }, + util.IgnoreConditionTimestampsAndObservedGeneration)) + + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed()) + g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: fmt.Sprintf("PyTorchJob %s is successfully completed.", pyTorchJob.Name), + }, 
util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Checking no objects are left in the worker clusters and the PyTorchJob is completed", func() { + gomega.Eventually(func(g gomega.Gomega) { + workerWl := &kueue.Workload{} + g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + workerPyTorchJob := &kftraining.PyTorchJob{} + g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(pyTorchJob), workerPyTorchJob)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(pyTorchJob), workerPyTorchJob)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { diff --git a/test/e2e/multikueue/suite_test.go b/test/e2e/multikueue/suite_test.go index 02fc6a43d8..9f2913634c 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -91,6 +91,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs/status", "get"), policyRule(kftraining.SchemeGroupVersion.Group, "paddlejobs", resourceVerbs...), policyRule(kftraining.SchemeGroupVersion.Group, "paddlejobs/status", "get"), + policyRule(kftraining.SchemeGroupVersion.Group, "pytorchjobs", resourceVerbs...), + policyRule(kftraining.SchemeGroupVersion.Group, "pytorchjobs/status", "get"), }, } err := c.Create(ctx, cr) diff --git a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go index e725a5b481..fa02d31fac 100644 --- a/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go +++ b/test/integration/controller/jobs/pytorchjob/pytorchjob_controller_test.go @@ -80,7 +80,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu }) ginkgo.It("Should reconcile PyTorchJobs", func() { - kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Obj())} + kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).PyTorchReplicaSpecsDefault().Obj())} createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(&kftraining.PyTorchJob{})} kftesting.ShouldReconcileJob(ctx, k8sClient, kfJob, createdJob, []kftesting.PodSetsResource{ { @@ -125,7 +125,9 @@ var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue ginkgo.It("Should reconcile jobs only when queue is set", func() { ginkgo.By("checking the workload is not created when queue name is not set") - job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Obj() + job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name). + PyTorchReplicaSpecsDefault(). 
+ Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name} createdJob := &kftraining.PyTorchJob{} @@ -182,6 +184,7 @@ var _ = ginkgo.Describe("Job controller for workloads when only jobs with queue createdJob := &kftraining.PyTorchJob{} createdWorkload := &kueue.Workload{} job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name). + PyTorchReplicaSpecsDefault(). PodAnnotation(kftraining.PyTorchJobReplicaTypeWorker, "old-ann-key", "old-ann-value"). PodLabel(kftraining.PyTorchJobReplicaTypeWorker, "old-label-key", "old-label-value"). Queue(localQueue.Name). @@ -359,7 +362,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", ginkgo.O ginkgo.DescribeTable("Single job at different stages of progress towards completion", func(podsReadyTestSpec kftesting.PodsReadyTestSpec) { - kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Parallelism(2).Obj())} + kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(testingpytorchjob.MakePyTorchJob(jobName, ns.Name).PyTorchReplicaSpecsDefault().Parallelism(2).Obj())} createdJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)(&kftraining.PyTorchJob{})} kftesting.JobControllerWhenWaitForPodsReadyEnabled(ctx, k8sClient, kfJob, createdJob, podsReadyTestSpec, []kftesting.PodSetsResource{ @@ -512,7 +515,9 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) kfJob := kubeflowjob.KubeflowJob{KFJobControl: (*workloadpytorchjob.JobControl)( - testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Queue(localQueue.Name). + testingpytorchjob.MakePyTorchJob(jobName, ns.Name). + PyTorchReplicaSpecsDefault(). + Queue(localQueue.Name). Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "3"). Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "4"). Obj(), @@ -534,7 +539,9 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde ginkgo.When("The workload's admission is removed", func() { ginkgo.It("Should restore the original node selectors", func() { localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() - job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name).Queue(localQueue.Name). + job := testingpytorchjob.MakePyTorchJob(jobName, ns.Name). + PyTorchReplicaSpecsDefault(). + Queue(localQueue.Name). Request(kftraining.PyTorchJobReplicaTypeMaster, corev1.ResourceCPU, "3"). Request(kftraining.PyTorchJobReplicaTypeWorker, corev1.ResourceCPU, "4"). 
Obj() diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index b38e180b93..aeea0dd71b 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -42,12 +42,14 @@ import ( workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" + workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" "sigs.k8s.io/kueue/pkg/features" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob" + testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob" testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" @@ -1063,6 +1065,144 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }) }) + ginkgo.It("Should run a PyTorchJob on worker if admitted", func() { + pyTorchJob := testingpytorchjob.MakePyTorchJob("pytorchjob1", managerNs.Name). + Queue(managerLq.Name). + PyTorchReplicaSpecs( + testingpytorchjob.PyTorchReplicaSpecRequirement{ + ReplicaType: kftraining.PyTorchJobReplicaTypeMaster, + ReplicaCount: 1, + Name: "pytorchjob-master", + RestartPolicy: "OnFailure", + }, + testingpytorchjob.PyTorchReplicaSpecRequirement{ + ReplicaType: kftraining.PyTorchJobReplicaTypeWorker, + ReplicaCount: 1, + Name: "pytorchjob-worker", + RestartPolicy: "Never", + }, + ). 
+ Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, pyTorchJob)).Should(gomega.Succeed()) + + wlLookupKey := types.NamespacedName{Name: workloadpytorchjob.GetWorkloadNameForPyTorchJob(pyTorchJob.Name, pyTorchJob.UID), Namespace: managerNs.Name} + admission := utiltesting.MakeAdmission(managerCq.Name).PodSets( + kueue.PodSetAssignment{ + Name: "master", + }, kueue.PodSetAssignment{ + Name: "worker", + }, + ).Obj() + + ginkgo.By("setting workload reservation in the management cluster", func() { + createdWorkload := &kueue.Workload{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + managerWl := &kueue.Workload{} + createdWorkload := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager and worker1 wl is removed", func() { + createdWorkload := &kueue.Workload{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name) + g.Expect(acs).NotTo(gomega.BeNil()) + g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady)) + g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker2"`)) + + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("changing the status of the PyTorchJob in the worker, updates the manager's PyTorchJob status", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdPyTorchJob := kftraining.PyTorchJob{} + 
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(pyTorchJob), &createdPyTorchJob)).To(gomega.Succeed())
+				createdPyTorchJob.Status.ReplicaStatuses = map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
+					kftraining.PyTorchJobReplicaTypeMaster: {
+						Active: 1,
+					},
+					kftraining.PyTorchJobReplicaTypeWorker: {
+						Active:    2,
+						Succeeded: 1,
+					},
+				}
+				g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdPyTorchJob)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdPyTorchJob := kftraining.PyTorchJob{}
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(pyTorchJob), &createdPyTorchJob)).To(gomega.Succeed())
+				g.Expect(createdPyTorchJob.Status.ReplicaStatuses).To(gomega.Equal(
+					map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
+						kftraining.PyTorchJobReplicaTypeMaster: {
+							Active: 1,
+						},
+						kftraining.PyTorchJobReplicaTypeWorker: {
+							Active:    2,
+							Succeeded: 1,
+						},
+					}))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("finishing the worker PyTorchJob, the manager's wl is marked as finished and the worker2 wl removed", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdPyTorchJob := kftraining.PyTorchJob{}
+				g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(pyTorchJob), &createdPyTorchJob)).To(gomega.Succeed())
+				createdPyTorchJob.Status.Conditions = append(createdPyTorchJob.Status.Conditions, kftraining.JobCondition{
+					Type:    kftraining.JobSucceeded,
+					Status:  corev1.ConditionTrue,
+					Reason:  "ByTest",
+					Message: "PyTorchJob finished successfully",
+				})
+				g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdPyTorchJob)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdWorkload := &kueue.Workload{}
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+				g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
+					Type:    kueue.WorkloadFinished,
+					Status:  metav1.ConditionTrue,
+					Reason:  string(kftraining.JobSucceeded),
+					Message: "PyTorchJob finished successfully",
+				}, util.IgnoreConditionTimestampsAndObservedGeneration))
+			}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+
+			gomega.Eventually(func(g gomega.Gomega) {
+				createdWorkload := &kueue.Workload{}
+				g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
+			}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+		})
+	})
+
 	ginkgo.It("Should remove the worker's workload and job after reconnect when the managers job and workload are deleted", func() {
 		job := testingjob.MakeJob("job", managerNs.Name).
 			Queue(managerLq.Name).
diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index 742e1dbffb..3a083efa03 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -42,6 +42,7 @@ import ( workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob" + workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob" workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/util/kubeversion" @@ -161,6 +162,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) { err = workloadpaddlejob.SetupPaddleJobWebhook(mgr) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = workloadpytorchjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + pyTorchJobReconciler := workloadpytorchjob.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName)) + err = pyTorchJobReconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = workloadpytorchjob.SetupPyTorchJobWebhook(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) {
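For reference, a minimal sketch (not part of the patch) of how the reworked test wrapper is intended to be used: MakePyTorchJob now starts with empty PyTorchReplicaSpecs, so tests either call PyTorchReplicaSpecsDefault() to restore the previous master/worker defaults or describe each replica explicitly with PyTorchReplicaSpecs(...). The snippet below mirrors the e2e and integration tests in this diff; the job, namespace, and queue names are placeholders.

// Sketch only: building a PyTorchJob with the reworked wrapper from pkg/util/testingjobs/pytorchjob.
package example

import (
	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"

	testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
)

// newTestPyTorchJob is a hypothetical helper; "pytorchjob1", "ns", and "queue" are placeholder names.
func newTestPyTorchJob() *kftraining.PyTorchJob {
	return testingpytorchjob.MakePyTorchJob("pytorchjob1", "ns").
		Queue("queue").
		// Explicit replica requirements; use PyTorchReplicaSpecsDefault() instead to keep the old defaults.
		PyTorchReplicaSpecs(
			testingpytorchjob.PyTorchReplicaSpecRequirement{
				ReplicaType:   kftraining.PyTorchJobReplicaTypeMaster,
				ReplicaCount:  1,
				RestartPolicy: "Never",
			},
			testingpytorchjob.PyTorchReplicaSpecRequirement{
				ReplicaType:   kftraining.PyTorchJobReplicaTypeWorker,
				ReplicaCount:  1,
				RestartPolicy: "OnFailure",
			},
		).
		Obj()
}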