diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml
index 308f2e05..420710d1 100644
--- a/deploy/v2beta1/mpi-operator.yaml
+++ b/deploy/v2beta1/mpi-operator.yaml
@@ -7757,6 +7757,17 @@ spec:
                 CleanPodPolicy defines the policy to kill pods after the job completes.
                 Default to Running.
               type: string
+            managedBy:
+              description: |-
+                ManagedBy is used to indicate the controller or entity that manages a MPIJob.
+                The value must be either an empty string, 'kubeflow.org/mpi-operator' or
+                'kueue.x-k8s.io/multikueue'.
+                The mpi-operator reconciles a MPIJob which doesn't have this
+                field at all or the field value is the reserved string
+                'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
+                with 'kueue.x-k8s.io/multikueue' to the Kueue.
+                The field is immutable.
+              type: string
             schedulingPolicy:
               description: SchedulingPolicy defines the policy related to scheduling,
                 e.g. gang-scheduling
diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml
index e326aecf..38169bb1 100644
--- a/manifests/base/kubeflow.org_mpijobs.yaml
+++ b/manifests/base/kubeflow.org_mpijobs.yaml
@@ -7734,6 +7734,17 @@ spec:
                 CleanPodPolicy defines the policy to kill pods after the job completes.
                 Default to Running.
               type: string
+            managedBy:
+              description: |-
+                ManagedBy is used to indicate the controller or entity that manages a MPIJob.
+                The value must be either an empty string, 'kubeflow.org/mpi-operator' or
+                'kueue.x-k8s.io/multikueue'.
+                The mpi-operator reconciles a MPIJob which doesn't have this
+                field at all or the field value is the reserved string
+                'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
+                with 'kueue.x-k8s.io/multikueue' to the Kueue.
+                The field is immutable.
+              type: string
             schedulingPolicy:
              description: SchedulingPolicy defines the policy related to scheduling,
                e.g. gang-scheduling
diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json
index 9a7e3c45..61449c43 100644
--- a/pkg/apis/kubeflow/v2beta1/swagger.json
+++ b/pkg/apis/kubeflow/v2beta1/swagger.json
@@ -241,6 +241,10 @@
         "description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.",
         "type": "string"
       },
+      "managedBy": {
+        "description": "ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty string, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
+        "type": "string"
+      },
       "schedulingPolicy": {
         "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
gang-scheduling", "$ref": "#/definitions/v2beta1.SchedulingPolicy" diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 5480d842..f6831080 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -93,6 +93,14 @@ type SchedulingPolicy struct { ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"` } +const ( + // KubeflowJobController represents the value of the default job controller + KubeflowJobController = "kubeflow.org/mpi-operator" + + // MultiKueueController represents the vaue of the MultiKueue controller + MultiKueueController = "kueue.x-k8s.io/multikueue" +) + // RunPolicy encapsulates various runtime policies of the distributed training // job, for example how to clean up resources and how long the job can stay // active. @@ -131,6 +139,17 @@ type RunPolicy struct { // Defaults to false. // +kubebuilder:default:=false Suspend *bool `json:"suspend,omitempty"` + + // ManagedBy is used to indicate the controller or entity that manages a MPIJob. + // The value must be either an empty, 'kubeflow.org/mpi-operator' or + // 'kueue.x-k8s.io/multikueue'. + // The mpi-operator reconciles a MPIJob which doesn't have this + // field at all or the field value is the reserved string + // 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob + // with 'kueue.x-k8s.io/multikueue' to the Kueue. + // The field is immutable. + // +optional + ManagedBy *string `json:"managedBy,omitempty"` } type LauncherCreationPolicy string diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 75ef51aa..e85549f5 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -273,6 +273,11 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = new(bool) **out = **in } + if in.ManagedBy != nil { + in, out := &in.ManagedBy, &out.ManagedBy + *out = new(string) + **out = **in + } return } diff --git a/pkg/apis/kubeflow/validation/validation.go b/pkg/apis/kubeflow/validation/validation.go index 8658683b..bea4cd71 100644 --- a/pkg/apis/kubeflow/validation/validation.go +++ b/pkg/apis/kubeflow/validation/validation.go @@ -39,8 +39,11 @@ var ( validRestartPolicies = sets.NewString( string(kubeflow.RestartPolicyNever), - string(kubeflow.RestartPolicyOnFailure), - ) + string(kubeflow.RestartPolicyOnFailure)) + + validManagedBy = sets.NewString( + string(kubeflow.MultiKueueController), + string(kubeflow.KubeflowJobController)) ) func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList { @@ -98,6 +101,11 @@ func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.Error if policy.BackoffLimit != nil { errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*policy.BackoffLimit), path.Child("backoffLimit"))...) 
     }
+    if policy.ManagedBy != nil {
+        if !validManagedBy.Has(*policy.ManagedBy) {
+            errs = append(errs, field.NotSupported(path.Child("managedBy"), *policy.ManagedBy, validManagedBy.List()))
+        }
+    }
     return errs
 }
 
diff --git a/pkg/apis/kubeflow/validation/validation_test.go b/pkg/apis/kubeflow/validation/validation_test.go
index 4da4676c..5c6977ba 100644
--- a/pkg/apis/kubeflow/validation/validation_test.go
+++ b/pkg/apis/kubeflow/validation/validation_test.go
@@ -193,6 +193,7 @@ func TestValidateMPIJob(t *testing.T) {
                     TTLSecondsAfterFinished: ptr.To[int32](-1),
                     ActiveDeadlineSeconds:   ptr.To[int64](-1),
                     BackoffLimit:            ptr.To[int32](-1),
+                    ManagedBy:               ptr.To("invalid.com/controller"),
                 },
                 SSHAuthMountPath:  "/root/.ssh",
                 MPIImplementation: kubeflow.MPIImplementation("Unknown"),
@@ -239,6 +240,10 @@ func TestValidateMPIJob(t *testing.T) {
                     Type:  field.ErrorTypeInvalid,
                     Field: "spec.runPolicy.backoffLimit",
                 },
+                {
+                    Type:  field.ErrorTypeNotSupported,
+                    Field: "spec.runPolicy.managedBy",
+                },
                 {
                     Type:  field.ErrorTypeNotSupported,
                     Field: "spec.mpiImplementation",
diff --git a/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go b/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go
index 5370cddb..0481ee7e 100644
--- a/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go
+++ b/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go
@@ -29,6 +29,7 @@ type RunPolicyApplyConfiguration struct {
     BackoffLimit     *int32                              `json:"backoffLimit,omitempty"`
     SchedulingPolicy *SchedulingPolicyApplyConfiguration `json:"schedulingPolicy,omitempty"`
     Suspend          *bool                               `json:"suspend,omitempty"`
+    ManagedBy        *string                             `json:"managedBy,omitempty"`
 }
 
 // RunPolicyApplyConfiguration constructs an declarative configuration of the RunPolicy type for use with
@@ -84,3 +85,11 @@ func (b *RunPolicyApplyConfiguration) WithSuspend(value bool) *RunPolicyApplyCon
     b.Suspend = &value
     return b
 }
+
+// WithManagedBy sets the ManagedBy field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the ManagedBy field is set to the value of the last call.
+func (b *RunPolicyApplyConfiguration) WithManagedBy(value string) *RunPolicyApplyConfiguration {
+    b.ManagedBy = &value
+    return b
+}
diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go
index 80cb46ba..4ccc1666 100644
--- a/pkg/controller/mpi_job_controller.go
+++ b/pkg/controller/mpi_job_controller.go
@@ -582,6 +582,11 @@ func (c *MPIJobController) syncHandler(key string) error {
     // Set default for the new mpiJob.
     scheme.Scheme.Default(mpiJob)
 
+    if manager := managedByExternalController(mpiJob.Spec.RunPolicy.ManagedBy); manager != nil {
+        klog.V(2).Info("Skipping MPIJob managed by a custom controller", "managed-by", manager)
+        return nil
+    }
+
     // for mpi job that is terminating, just return.
     if mpiJob.DeletionTimestamp != nil {
         return nil
@@ -1722,3 +1727,10 @@ func truncateMessage(message string) string {
     suffix := "..."
     return message[:eventMessageLimit-len(suffix)] + suffix
 }
+
+func managedByExternalController(controllerName *string) *string {
+    if controllerName != nil && *controllerName != kubeflow.KubeflowJobController {
+        return controllerName
+    }
+    return nil
+}
diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go
index 95445d94..3594d354 100644
--- a/pkg/controller/mpi_job_controller_test.go
+++ b/pkg/controller/mpi_job_controller_test.go
@@ -418,6 +418,11 @@ func (f *fixture) expectCreateSecretAction(d *corev1.Secret) {
     f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "secrets"}, d.Namespace, d))
 }
 
+func (f *fixture) expectNoKubeActions() bool {
+    k8sActions := filterInformerActions(f.kubeClient.Actions())
+    return len(k8sActions) == 0
+}
+
 func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) {
     action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob)
     action.Subresource = "status"
@@ -504,6 +509,21 @@ func TestDoNothingWithInvalidMPIJob(t *testing.T) {
     f.run(getKey(mpiJob, t))
 }
 
+func TestDoNothingWithMPIJobManagedExternally(t *testing.T) {
+    f := newFixture(t, "")
+    var replicas int32 = 1
+    startTime := metav1.Now()
+    completionTime := metav1.Now()
+    mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime)
+    mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationOpenMPI
+    mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
+    f.setUpMPIJob(mpiJob)
+    f.run(getKey(mpiJob, t))
+    if !f.expectNoKubeActions() {
+        t.Fatalf("Expected no kubeActions (secrets, pods, services etc.)")
+    }
+}
+
 func TestAllResourcesCreated(t *testing.T) {
     impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH}
     for _, implementation := range impls {
diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md
index eeb1727f..8491e8ca 100644
--- a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md
+++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md
@@ -8,6 +8,7 @@ Name | Type | Description | Notes
 **active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional]
 **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional]
 **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. | [optional]
+**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty string, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional]
 **scheduling_policy** | [**V2beta1SchedulingPolicy**](V2beta1SchedulingPolicy.md) |  | [optional]
 **suspend** | **bool** | suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. | [optional]
 **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. | [optional]
diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py
index b8c25294..1860c6dc 100644
--- a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py
+++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py
@@ -36,6 +36,7 @@ class V2beta1RunPolicy(object):
         'active_deadline_seconds': 'int',
         'backoff_limit': 'int',
         'clean_pod_policy': 'str',
+        'managed_by': 'str',
         'scheduling_policy': 'V2beta1SchedulingPolicy',
         'suspend': 'bool',
         'ttl_seconds_after_finished': 'int'
@@ -45,12 +46,13 @@ class V2beta1RunPolicy(object):
         'active_deadline_seconds': 'activeDeadlineSeconds',
         'backoff_limit': 'backoffLimit',
         'clean_pod_policy': 'cleanPodPolicy',
+        'managed_by': 'managedBy',
         'scheduling_policy': 'schedulingPolicy',
         'suspend': 'suspend',
         'ttl_seconds_after_finished': 'ttlSecondsAfterFinished'
     }
 
-    def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None):  # noqa: E501
+    def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, managed_by=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None):  # noqa: E501
         """V2beta1RunPolicy - a model defined in OpenAPI"""  # noqa: E501
         if local_vars_configuration is None:
             local_vars_configuration = Configuration.get_default_copy()
@@ -59,6 +61,7 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p
         self._active_deadline_seconds = None
         self._backoff_limit = None
         self._clean_pod_policy = None
+        self._managed_by = None
         self._scheduling_policy = None
         self._suspend = None
         self._ttl_seconds_after_finished = None
@@ -70,6 +73,8 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p
             self.backoff_limit = backoff_limit
         if clean_pod_policy is not None:
             self.clean_pod_policy = clean_pod_policy
+        if managed_by is not None:
+            self.managed_by = managed_by
         if scheduling_policy is not None:
             self.scheduling_policy = scheduling_policy
         if suspend is not None:
@@ -146,6 +151,29 @@ def clean_pod_policy(self, clean_pod_policy):
 
         self._clean_pod_policy = clean_pod_policy
 
+    @property
+    def managed_by(self):
+        """Gets the managed_by of this V2beta1RunPolicy.  # noqa: E501
+
+        ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty string, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.  # noqa: E501
+
+        :return: The managed_by of this V2beta1RunPolicy.  # noqa: E501
+        :rtype: str
+        """
+        return self._managed_by
+
+    @managed_by.setter
+    def managed_by(self, managed_by):
+        """Sets the managed_by of this V2beta1RunPolicy.
+
+        ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty string, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.  # noqa: E501
+
+        :param managed_by: The managed_by of this V2beta1RunPolicy.  # noqa: E501
+        :type managed_by: str
+        """
+
+        self._managed_by = managed_by
+
     @property
     def scheduling_policy(self):
         """Gets the scheduling_policy of this V2beta1RunPolicy.  # noqa: E501
diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go
index b7f35815..5c1e9b61 100644
--- a/test/e2e/mpi_job_test.go
+++ b/test/e2e/mpi_job_test.go
@@ -18,10 +18,12 @@ import (
     "context"
     "fmt"
     "io"
+    "time"
 
     "github.com/google/go-cmp/cmp"
     "github.com/onsi/ginkgo"
     "github.com/onsi/gomega"
+    batchv1 "k8s.io/api/batch/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
@@ -33,6 +35,7 @@ import (
     schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
 
     kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
+    "github.com/kubeflow/mpi-operator/test/util"
 )
 
 var _ = ginkgo.Describe("MPIJob", func() {
@@ -164,8 +167,41 @@ var _ = ginkgo.Describe("MPIJob", func() {
                 mpiJob := createJobAndWaitForCompletion(mpiJob)
                 expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
             })
-        })
+            ginkgo.It("should not be updated when managed externally, only created", func() {
+                mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
+                ctx := context.Background()
+                mpiJob = createJob(ctx, mpiJob)
+
+                time.Sleep(util.SleepDurationControllerSyncDelay)
+                mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{})
+                gomega.Expect(err).To(gomega.BeNil())
+
+                // The job should be created, but its status should not be updated for create or for any other condition.
+                condition := getJobCondition(mpiJob, kubeflow.JobCreated)
+                gomega.Expect(condition).To(gomega.BeNil())
+                condition = getJobCondition(mpiJob, kubeflow.JobSucceeded)
+                gomega.Expect(condition).To(gomega.BeNil())
+                launcherJob, err := getLauncherJob(ctx, mpiJob)
+                gomega.Expect(err).To(gomega.BeNil())
+                gomega.Expect(launcherJob).To(gomega.BeNil())
+                launcherPods, err := getLauncherPods(ctx, mpiJob)
+                gomega.Expect(err).To(gomega.BeNil())
+                gomega.Expect(len(launcherPods.Items)).To(gomega.Equal(0))
+                workerPods, err := getWorkerPods(ctx, mpiJob)
+                gomega.Expect(err).To(gomega.BeNil())
+                gomega.Expect(len(workerPods.Items)).To(gomega.Equal(0))
+                secret, err := getSecretsForJob(ctx, mpiJob)
+                gomega.Expect(err).To(gomega.BeNil())
+                gomega.Expect(secret).To(gomega.BeNil())
+            })
+
+            ginkgo.It("should succeed when explicitly managed by mpi-operator", func() {
+                mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.KubeflowJobController)
+                mpiJob := createJobAndWaitForCompletion(mpiJob)
+                expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded)
+            })
+        })
     })
 
     ginkgo.Context("with Intel Implementation", func() {
@@ -536,7 +572,7 @@ func waitForCompletion(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.M
     return mpiJob
 }
 
-func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
+func getLauncherPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) {
     selector := metav1.LabelSelector{
         MatchLabels: map[string]string{
             kubeflow.OperatorNameLabel: kubeflow.OperatorName,
@@ -548,7 +584,45 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
         LabelSelector: metav1.FormatLabelSelector(&selector),
     })
     if err != nil {
-        return fmt.Errorf("getting launcher Pods: %w", err)
+        return &corev1.PodList{}, fmt.Errorf("getting launcher Pods: %w", err)
+    }
+    return launcherPods, nil
+}
+
+func getWorkerPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) {
+    selector := metav1.LabelSelector{
+        MatchLabels: map[string]string{
+            kubeflow.OperatorNameLabel: kubeflow.OperatorName,
+            kubeflow.JobNameLabel:      mpiJob.Name,
+            kubeflow.JobRoleLabel:      "worker",
+        },
+    }
+    workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
+        LabelSelector: metav1.FormatLabelSelector(&selector),
+    })
+    if err != nil {
+        return &corev1.PodList{}, fmt.Errorf("getting worker Pods: %w", err)
+    }
+    return workerPods, nil
+}
+
+func getSecretsForJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.Secret, error) {
+    result, err := k8sClient.CoreV1().Secrets(mpiJob.Namespace).List(ctx, metav1.ListOptions{})
+    if err != nil {
+        return nil, err
+    }
+    for _, obj := range result.Items {
+        if metav1.IsControlledBy(&obj, mpiJob) {
+            return &obj, nil
+        }
+    }
+    return nil, nil
+}
+
+func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
+    launcherPods, err := getLauncherPods(ctx, mpiJob)
+    if err != nil {
+        return err
     }
     if len(launcherPods.Items) == 0 {
         return fmt.Errorf("no launcher Pods found")
@@ -563,11 +637,7 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error {
     if err != nil {
         return fmt.Errorf("obtaining launcher logs: %w", err)
     }
-
-    selector.MatchLabels[kubeflow.JobRoleLabel] = "worker"
-    workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{
-        LabelSelector: metav1.FormatLabelSelector(&selector),
-    })
+    workerPods, err := getWorkerPods(ctx, mpiJob)
     if err != nil {
         return fmt.Errorf("getting worker Pods: %w", err)
     }
@@ -611,6 +681,19 @@ func getJobCondition(mpiJob *kubeflow.MPIJob, condType kubeflow.JobConditionType
     return nil
 }
 
+func getLauncherJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
+    result, err := k8sClient.BatchV1().Jobs(mpiJob.Namespace).List(ctx, metav1.ListOptions{})
+    if err != nil {
+        return nil, err
+    }
+    for _, j := range result.Items {
+        if metav1.IsControlledBy(&j, mpiJob) {
+            return &j, nil
+        }
+    }
+    return nil, nil
+}
+
 func createMPIJobWithOpenMPI(mpiJob *kubeflow.MPIJob) {
     mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{
         {
diff --git a/test/integration/main_test.go b/test/integration/main_test.go
index 5dd36b93..14419dc5 100644
--- a/test/integration/main_test.go
+++ b/test/integration/main_test.go
@@ -36,6 +36,7 @@ import (
     volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
 
     clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
+    "github.com/kubeflow/mpi-operator/test/util"
 )
 
 var (
@@ -191,7 +192,7 @@ func (c *eventChecker) run() {
 
 func (c *eventChecker) verify(t *testing.T) {
     t.Helper()
-    err := wait.PollUntilContextTimeout(context.Background(), waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+    err := wait.PollUntilContextTimeout(context.Background(), util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
         c.Lock()
         defer c.Unlock()
         return c.expected.Len() == 0, nil
diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go
index f465d077..f260d829 100644
--- a/test/integration/mpi_job_controller_test.go
+++ b/test/integration/mpi_job_controller_test.go
@@ -43,10 +43,7 @@ import (
     "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme"
     informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
     "github.com/kubeflow/mpi-operator/pkg/controller"
-)
-
-const (
-    waitInterval = 100 * time.Millisecond
+    "github.com/kubeflow/mpi-operator/test/util"
 )
 
 func TestMPIJobSuccess(t *testing.T) {
@@ -692,7 +689,7 @@ func TestMPIJobWithSchedulerPlugins(t *testing.T) {
     if err != nil {
         t.Errorf("Failed sending job to apiserver: %v", err)
     }
-    if err = wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+    if err = wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
         pg, err := getSchedPodGroup(ctx, gangSchedulerCfg.schedClient, mpiJob)
         if err != nil {
             return false, err
@@ -808,7 +805,7 @@ func TestMPIJobWithVolcanoScheduler(t *testing.T) {
     if err != nil {
         t.Errorf("Failed sending job to apiserver: %v", err)
     }
-    if err = wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+    if err = wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
         pg, err := getVolcanoPodGroup(ctx, gangSchedulerCfg.volcanoClient, mpiJob)
         if err != nil {
             return false, err
@@ -822,6 +819,88 @@ func TestMPIJobWithVolcanoScheduler(t *testing.T) {
     }
 }
 
+func TestMPIJobManagedExternally(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    t.Cleanup(cancel)
+    s := newTestSetup(ctx, t)
+    startController(ctx, s.kClient, s.mpiClient, nil)
+
+    mpiJob := &kubeflow.MPIJob{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "job",
+            Namespace: s.namespace,
+        },
+        Spec: kubeflow.MPIJobSpec{
+            SlotsPerWorker: ptr.To[int32](1),
+            RunPolicy: kubeflow.RunPolicy{
+                CleanPodPolicy: ptr.To(kubeflow.CleanPodPolicyRunning),
+                ManagedBy:      ptr.To(kubeflow.MultiKueueController),
+            },
+            MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
+                kubeflow.MPIReplicaTypeLauncher: {
+                    Template: corev1.PodTemplateSpec{
+                        Spec: corev1.PodSpec{
+                            Containers: []corev1.Container{
+                                {
+                                    Name:  "main",
+                                    Image: "mpi-image",
+                                },
+                            },
+                        },
+                    },
+                },
+                kubeflow.MPIReplicaTypeWorker: {
+                    Replicas: ptr.To[int32](2),
+                    Template: corev1.PodTemplateSpec{
+                        Spec: corev1.PodSpec{
+                            Containers: []corev1.Container{
+                                {
+                                    Name:  "main",
+                                    Image: "mpi-image",
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+        },
+    }
+
+    // 1. The job must be created
+    var err error
+    mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(s.namespace).Create(ctx, mpiJob, metav1.CreateOptions{})
+    if err != nil {
+        t.Fatalf("Failed sending job to apiserver: %v", err)
+    }
+
+    time.Sleep(util.SleepDurationControllerSyncDelay)
+    // 2. Status is not getting updated
+    mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, nil)
+    if mpiJob.Status.StartTime != nil {
+        t.Errorf("MPIJob should be missing startTime")
+    }
+    // 3. There should be no conditions, even the one for create
+    if mpiJobHasCondition(mpiJob, kubeflow.JobCreated) {
+        t.Errorf("MPIJob shouldn't have any condition")
+    }
+    // 4. No Jobs or Services created
+    lp, err := getLauncherJobForMPIJob(ctx, s.kClient, mpiJob)
+    if err != nil {
+        t.Fatalf("Failed getting launcher jobs: %v", err)
+    }
+    if lp != nil {
+        t.Fatalf("There should be no launcher jobs from job: %v", lp)
+    }
+    svcs, err := getServiceForJob(ctx, s.kClient, mpiJob)
+    if err != nil {
+        t.Fatalf("Failed getting services for the job: %v", err)
+    }
+    if svcs != nil {
+        t.Fatalf("There should be no services from job: %v", svcs)
+    }
+
+}
+
 func startController(
     ctx context.Context,
     kClient kubernetes.Interface,
@@ -892,7 +971,7 @@ func validateMPIJobDependencies(
         podGroup metav1.Object
     )
     var problems []string
-    if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+    if err := wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
         problems = nil
         var err error
         svc, err = getServiceForJob(ctx, kubeClient, job)
@@ -988,7 +1067,7 @@ func validateMPIJobStatus(ctx context.Context, t *testing.T, client clientset.In
         err error
         got map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus
     )
-    if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+    if err := wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
         newJob, err = client.KubeflowV2beta1().MPIJobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})
         if err != nil {
             return false, err
diff --git a/test/util/constants.go b/test/util/constants.go
new file mode 100644
index 00000000..17c2e82e
--- /dev/null
+++ b/test/util/constants.go
@@ -0,0 +1,23 @@
+// Copyright 2024 The Kubeflow Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import "time"
+
+const (
+    // Time duration used to ensure that subsequent controller syncs have occurred
+    SleepDurationControllerSyncDelay = 1 * time.Second
+    WaitInterval                     = 100 * time.Millisecond
+)
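For context, the new field is consumed from the MPIJob spec at `spec.runPolicy.managedBy`. The following is a minimal sketch of a manifest that delegates reconciliation to MultiKueue; the job name, image, and replica counts are illustrative and not part of this change:

```yaml
apiVersion: kubeflow.org/v2beta1
kind: MPIJob
metadata:
  name: pi-multikueue            # illustrative name
spec:
  slotsPerWorker: 1
  runPolicy:
    cleanPodPolicy: Running
    # With this value the mpi-operator skips the job entirely; omit the field
    # or set it to kubeflow.org/mpi-operator to keep the default behaviour.
    managedBy: kueue.x-k8s.io/multikueue
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: launcher
            image: mpioperator/mpi-pi   # illustrative image
    Worker:
      replicas: 2
      template:
        spec:
          containers:
          - name: worker
            image: mpioperator/mpi-pi   # illustrative image
```

Any other value is rejected by the webhook validation added in `validateRunPolicy`, which reports a `NotSupported` error for `spec.runPolicy.managedBy`.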