Introduce ManagedBy field in RunPolicy #650

Merged on Oct 10, 2024 (11 commits)
11 changes: 11 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
@@ -7757,6 +7757,17 @@ spec:
    CleanPodPolicy defines the policy to kill pods after the job completes.
    Default to Running.
  type: string
managedBy:
  description: |-
    ManagedBy is used to indicate the controller or entity that manages a MPIJob.
    The value must be either an empty, 'kubeflow.org/mpi-operator' or
    'kueue.x-k8s.io/multikueue'.
    The mpi-operator reconciles a MPIJob which doesn't have this
    field at all or the field value is the reserved string
    'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
    with 'kueue.x-k8s.io/multikueue' to the Kueue.
    The field is immutable.
  type: string
schedulingPolicy:
  description: SchedulingPolicy defines the policy related to scheduling,
    e.g. gang-scheduling
11 changes: 11 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
@@ -7734,6 +7734,17 @@ spec:
    CleanPodPolicy defines the policy to kill pods after the job completes.
    Default to Running.
  type: string
managedBy:
  description: |-
    ManagedBy is used to indicate the controller or entity that manages a MPIJob.
    The value must be either an empty, 'kubeflow.org/mpi-operator' or
    'kueue.x-k8s.io/multikueue'.
    The mpi-operator reconciles a MPIJob which doesn't have this
    field at all or the field value is the reserved string
    'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
    with 'kueue.x-k8s.io/multikueue' to the Kueue.
    The field is immutable.
  type: string
schedulingPolicy:
  description: SchedulingPolicy defines the policy related to scheduling,
    e.g. gang-scheduling
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
@@ -241,6 +241,10 @@
    "description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.",
    "type": "string"
  },
  "managedBy": {
    "description": "ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.",
    "type": "string"
  },
  "schedulingPolicy": {
    "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
    "$ref": "#/definitions/v2beta1.SchedulingPolicy"
19 changes: 19 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
@@ -93,6 +93,14 @@ type SchedulingPolicy struct {
	ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
}

const (
	// KubeflowJobController represents the value of the default job controller
	KubeflowJobController = "kubeflow.org/mpi-operator"

	// MultiKueueController represents the value of the MultiKueue controller
	MultiKueueController = "kueue.x-k8s.io/multikueue"
)

// RunPolicy encapsulates various runtime policies of the distributed training
// job, for example how to clean up resources and how long the job can stay
// active.
@@ -131,6 +139,17 @@ type RunPolicy struct {
	// Defaults to false.
	// +kubebuilder:default:=false
	Suspend *bool `json:"suspend,omitempty"`

	// ManagedBy is used to indicate the controller or entity that manages a MPIJob.
	// The value must be either an empty, 'kubeflow.org/mpi-operator' or

Review comment (Collaborator), suggested change:
-	// The value must be either an empty, 'kubeflow.org/mpi-operator' or
+	// The value must be either empty, 'kubeflow.org/mpi-operator' or

Member: @mszadkow Could you address this comment in a separate PR?

Contributor Author: Sure, will be addressed asap

	// 'kueue.x-k8s.io/multikueue'.
	// The mpi-operator reconciles a MPIJob which doesn't have this
	// field at all or the field value is the reserved string
	// 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob
	// with 'kueue.x-k8s.io/multikueue' to the Kueue.
	// The field is immutable.
	// +optional
	ManagedBy *string `json:"managedBy,omitempty"`
}

type LauncherCreationPolicy string
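
Note on the new field's wire format: because `ManagedBy` is a `*string` tagged `omitempty`, an unset field is dropped from the serialized object, which is how the "doesn't have this field at all" case in the description arises. The following is a minimal sketch using only the standard library; `runPolicySketch` and `marshalPolicy` are hypothetical names for illustration, and only the JSON tag is taken from the diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runPolicySketch is a hypothetical, trimmed stand-in for RunPolicy that
// keeps only the managedBy field; the JSON tag is copied from the diff.
type runPolicySketch struct {
	ManagedBy *string `json:"managedBy,omitempty"`
}

// marshalPolicy renders the sketch struct as a JSON string.
func marshalPolicy(p runPolicySketch) string {
	b, err := json.Marshal(p)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	multiKueue := "kueue.x-k8s.io/multikueue"

	// Unset (nil) field: omitted from the serialized object.
	fmt.Println(marshalPolicy(runPolicySketch{}))
	// Set field: serialized under the "managedBy" key.
	fmt.Println(marshalPolicy(runPolicySketch{ManagedBy: &multiKueue}))
}
```

The pointer type lets the API distinguish "not set" from any concrete value, which a plain `string` with `omitempty` could not do for the empty string.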
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions pkg/apis/kubeflow/validation/validation.go
@@ -39,8 +39,11 @@ var (

 	validRestartPolicies = sets.NewString(
 		string(kubeflow.RestartPolicyNever),
-		string(kubeflow.RestartPolicyOnFailure),
-	)
+		string(kubeflow.RestartPolicyOnFailure))
+
+	validManagedBy = sets.NewString(
+		string(kubeflow.MultiKueueController),
+		string(kubeflow.KubeflowJobController))
 )

 func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList {
@@ -98,6 +101,11 @@ func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.Error
	if policy.BackoffLimit != nil {
		errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*policy.BackoffLimit), path.Child("backoffLimit"))...)
	}
	if policy.ManagedBy != nil {
		if !validManagedBy.Has(*policy.ManagedBy) {
			errs = append(errs, field.NotSupported(path.Child("managedBy"), *policy.ManagedBy, validManagedBy.List()))
		}
	}
	return errs
}
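
The allow-list check above can be tried in isolation. This is a rough sketch under stated assumptions: a plain map stands in for the `sets.NewString` value used in the PR, a standard `error` stands in for `field.NotSupported`, and `checkManagedBy` is a hypothetical helper name. The two controller strings are copied from the constants in types.go.

```go
package main

import (
	"fmt"
	"sort"
)

// Values copied from the constants added in types.go.
const (
	kubeflowJobController = "kubeflow.org/mpi-operator"
	multiKueueController  = "kueue.x-k8s.io/multikueue"
)

// validManagedBy is a plain map standing in for the string set in the PR.
var validManagedBy = map[string]struct{}{
	kubeflowJobController: {},
	multiKueueController:  {},
}

// checkManagedBy (hypothetical) accepts a nil pointer or an allow-listed
// value and returns an error naming the supported values otherwise.
func checkManagedBy(managedBy *string) error {
	if managedBy == nil {
		return nil
	}
	if _, ok := validManagedBy[*managedBy]; ok {
		return nil
	}
	supported := make([]string, 0, len(validManagedBy))
	for v := range validManagedBy {
		supported = append(supported, v)
	}
	sort.Strings(supported) // map iteration order is random, so sort for stable output
	return fmt.Errorf("managedBy: unsupported value %q, supported values: %v", *managedBy, supported)
}

func main() {
	operator, multiKueue := kubeflowJobController, multiKueueController
	other, empty := "invalid.com/controller", ""
	for _, v := range []*string{nil, &operator, &multiKueue, &other, &empty} {
		fmt.Println(checkManagedBy(v))
	}
}
```

As far as this hunk shows, a non-nil pointer to an empty string is not in the set, so the "empty" case in the field description corresponds to leaving the field unset rather than setting it to `""`.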

5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/validation/validation_test.go
@@ -193,6 +193,7 @@ func TestValidateMPIJob(t *testing.T) {
TTLSecondsAfterFinished: ptr.To[int32](-1),
ActiveDeadlineSeconds: ptr.To[int64](-1),
BackoffLimit: ptr.To[int32](-1),
ManagedBy: ptr.To("invalid.com/controller"),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: kubeflow.MPIImplementation("Unknown"),
@@ -239,6 +240,10 @@ func TestValidateMPIJob(t *testing.T) {
Type: field.ErrorTypeInvalid,
Field: "spec.runPolicy.backoffLimit",
},
{
Type: field.ErrorTypeNotSupported,
Field: "spec.runPolicy.managedBy",
},
{
Type: field.ErrorTypeNotSupported,
Field: "spec.mpiImplementation",
9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pkg/controller/mpi_job_controller.go
@@ -582,6 +582,11 @@ func (c *MPIJobController) syncHandler(key string) error {
	// Set default for the new mpiJob.
	scheme.Scheme.Default(mpiJob)

	if manager := managedByExternalController(mpiJob.Spec.RunPolicy.ManagedBy); manager != nil {
		klog.V(2).InfoS("Skipping MPIJob managed by a custom controller", "managed-by", *manager)
		return nil
	}

	// for mpi job that is terminating, just return.
	if mpiJob.DeletionTimestamp != nil {
		return nil
@@ -1722,3 +1727,10 @@ func truncateMessage(message string) string {
	suffix := "..."
	return message[:eventMessageLimit-len(suffix)] + suffix
}

func managedByExternalController(controllerName *string) *string {
	if controllerName != nil && *controllerName != kubeflow.KubeflowJobController {
		return controllerName
	}
	return nil
}
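
To make the skip decision concrete, here is a small self-contained sketch that copies `managedByExternalController` from the hunk above and runs it over the three cases named in the field description. The local constants mirror the values in types.go; the `main` driver and its case names are illustrative only.

```go
package main

import "fmt"

// Values copied from the constants added in types.go.
const (
	kubeflowJobController = "kubeflow.org/mpi-operator"
	multiKueueController  = "kueue.x-k8s.io/multikueue"
)

// managedByExternalController mirrors the helper in the diff: it returns the
// controller name when something other than the mpi-operator manages the job,
// and nil when the mpi-operator should reconcile it.
func managedByExternalController(controllerName *string) *string {
	if controllerName != nil && *controllerName != kubeflowJobController {
		return controllerName
	}
	return nil
}

func main() {
	own, external := kubeflowJobController, multiKueueController
	cases := []struct {
		name      string
		managedBy *string
	}{
		{"field unset", nil},
		{"mpi-operator", &own},
		{"multikueue", &external},
	}
	for _, tc := range cases {
		skip := managedByExternalController(tc.managedBy) != nil
		fmt.Printf("%s: skip reconcile = %v\n", tc.name, skip)
	}
}
```

Only the `multikueue` case reports `skip reconcile = true`; the unset field and the reserved `kubeflow.org/mpi-operator` value both fall through to normal reconciliation.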
20 changes: 20 additions & 0 deletions pkg/controller/mpi_job_controller_test.go
@@ -418,6 +418,11 @@ func (f *fixture) expectCreateSecretAction(d *corev1.Secret) {
	f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "secrets"}, d.Namespace, d))
}

func (f *fixture) expectNoKubeActions() bool {
	k8sActions := filterInformerActions(f.kubeClient.Actions())
	return len(k8sActions) == 0
}

func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) {
	action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob)
	action.Subresource = "status"
@@ -504,6 +509,21 @@ func TestDoNothingWithInvalidMPIJob(t *testing.T) {
	f.run(getKey(mpiJob, t))
}

func TestDoNothingWithMPIJobManagedExternally(t *testing.T) {
	f := newFixture(t, "")
	var replicas int32 = 1
	startTime := metav1.Now()
	completionTime := metav1.Now()
	mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime)
	mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationOpenMPI
	mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController)
	f.setUpMPIJob(mpiJob)
	f.run(getKey(mpiJob, t))
	if !f.expectNoKubeActions() {
		t.Fatalf("Expected no kubeActions (secrets, pods, services etc.)")
	}
}

func TestAllResourcesCreated(t *testing.T) {
	impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH}
	for _, implementation := range impls {
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1RunPolicy.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 29 additions & 1 deletion sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
