
Support suspend semantics for MPIJob #511

Merged (2 commits) Feb 3, 2023

Conversation

@mimowo (Contributor) commented Jan 27, 2023

Fixes #504.

@mimowo (Contributor, Author) commented Jan 27, 2023

@alculquicondor @tenzen-y WIP, but ready for early feedback (which would be good, as this is my first PR in this repo). PTAL.

@alculquicondor (Collaborator) left a comment

FYI, the common repo is on its way to disappearing. I would suggest copying the RunPolicy struct here and adding the field.

Resolved review threads (outdated):
pkg/apis/kubeflow/v2beta1/types.go (2)
pkg/controller/mpi_job_controller.go (2)
Comment on lines +613 to +638
	if c.gangSchedulerName != "" {
		if err := c.deletePodGroups(mpiJob); err != nil {
			return err
		}
	}
Collaborator

@tenzen-y are you familiar with the volcano integration?

I wonder if we need to remove the pod group on suspension. Does it matter?

Contributor Author

Do podgroups allocate any node resources, like pods, or are they just API objects (like services)?

I do not delete services and other API objects on suspension. I also don't delete the launcher job, just suspend it.

Collaborator

AFAIK, a podgroup is just a declaration that some pods should be treated as a unit. But a podgroup doesn't create other objects.

@tenzen-y (Member) commented Jan 27, 2023

The scheduling.volcano.sh/v1beta1 PodGroup also has queueing logic. So we might need to delete PodGroup to re-queue Pods.

Member

Maybe we should create an E2E test with volcano in a separate PR.

Collaborator

probably someone with volcano experience should do it.

Contributor Author

I looked at the volcano code a little bit on my own, but could not conclude whether it may create pods under some scenarios.

Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT?

+1 for the idea of an e2e test employing podgroups. I guess we could create a follow-up issue and ask for people willing to do it.

Member

Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT?

That sounds good to me.
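The suspend-time cleanup being discussed can be sketched as follows. This is a simplified stand-in, not the actual controller code; `fakeCluster` and the helper names are invented for illustration. The point is that PodGroup deletion stays gated on a gang scheduler being configured, mirroring the quoted snippet:

```go
package main

import "fmt"

// fakeCluster stands in for the objects the controller manages.
type fakeCluster struct {
	workerPods        []string
	podGroups         []string
	launcherSuspended bool
}

func (c *fakeCluster) deleteWorkers()   { c.workerPods = nil }
func (c *fakeCluster) deletePodGroups() { c.podGroups = nil }
func (c *fakeCluster) suspendLauncher() { c.launcherSuspended = true }

// syncSuspended sketches the suspension branch: delete worker Pods, suspend
// the launcher Job (rather than deleting it), and, to be on the safe side,
// delete the PodGroup when a gang scheduler is in use.
func syncSuspended(c *fakeCluster, gangSchedulerName string) {
	c.deleteWorkers()
	c.suspendLauncher()
	if gangSchedulerName != "" { // mirrors the gated deletePodGroups call
		c.deletePodGroups()
	}
}

func main() {
	c := &fakeCluster{workerPods: []string{"w-0", "w-1"}, podGroups: []string{"pg"}}
	syncSuspended(c, "volcano")
	fmt.Println(len(c.workerPods), len(c.podGroups), c.launcherSuspended)
}
```

Services and other pure API objects are left in place, since they hold no node resources.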

	if err != nil {
		return err
	}
	if !isMPIJobSuspended(mpiJob) {
		worker, err = c.getOrCreateWorker(mpiJob)
	}
Collaborator

also create the podgroup conditionally?

Contributor Author

Now I skip creating all resources for as long as the MPIJob is suspended. This is the common workflow for Kueue, where the Job is created suspended. In some cases, when the job never gets unsuspended (for whatever reason), we save on creating the objects.

Contributor Author

For now I reverted the change (sorry for the back and forth); this is just to minimize the diff, as the unit tests currently verify that the objects are created. Let me know what you think.

	}
	return nil
}

  // first set StartTime.
- if mpiJob.Status.StartTime == nil {
+ if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
Collaborator

if the job was suspended, we should reset the StartTime. Double check how we do it in the job controller.

Contributor Author

Done - I've added the JobSuspended condition and replicated the semantics around StartTime.

However, the .spec.runPolicy.activeDeadlineSeconds is actually respected for MPIJobs via batch.Job:

ActiveDeadlineSeconds: mpiJob.Spec.RunPolicy.ActiveDeadlineSeconds,

This means that the changes aren't strictly required to enforce the timeout. WDYT?

Collaborator

Yay for using Job!

@mimowo (Contributor, Author) commented Jan 27, 2023

FYI, the common repo is on its way to disappearing. I would suggest copying the RunPolicy struct here and adding the field.

I see, but common is reused by other subprojects too, right? So we would also need to copy the contents of common into those repos. That sounds like a lot of work; maybe simple, but the diffs will be big and one needs to be careful, so I'm not sure we want to block the suspend work on that. Also, is this effort already planned, or in progress @alculquicondor @tenzen-y ?

@tenzen-y (Member)

FYI, the common repo is on its way to disappearing. I would suggest copying the RunPolicy struct here and adding the field.

I see, but common is reused by other subprojects too, right? So we would also need to copy the contents of common into those repos. That sounds like a lot of work; maybe simple, but the diffs will be big and one needs to be careful, so I'm not sure we want to block the suspend work on that. Also, is this effort already planned, or in progress @alculquicondor @tenzen-y ?

Yes, that's right. We are using the common repo in training-operator. However, we are planning to consolidate the common code into the training-operator repo.

kubeflow/trainer#1714

@tenzen-y (Member)

FYI, the common repo is on its way to disappearing. I would suggest copying the RunPolicy struct here and adding the field.

@alculquicondor I agree with adding a suspend member to RunPolicy. However, can we copy RunPolicy in a separate PR? I think copying RunPolicy to this repo is a separate concern from this PR.

@mimowo (Contributor, Author) commented Jan 27, 2023

@alculquicondor I agree with adding a suspend member to RunPolicy. However, can we copy RunPolicy in a separate PR? I think copying RunPolicy to this repo is a separate concern from this PR.

What about the other constants, like the ones defining conditions? I guess we could have a PR to just copy RunPolicy to mpi-operator to unblock this work, but keep the dependency on common@0.4.6 for the condition constants. Then, we can extend the set of MPIJob conditions with JobSuspended just in the mpi-operator. If this sounds good I can open a preparatory PR just to copy RunPolicy.

@mimowo mimowo force-pushed the mpijob-add-susped branch 2 times, most recently from 77e56c5 to ed0bcd7 on January 27, 2023 17:47
@tenzen-y (Member)

@alculquicondor I agree with adding a suspend member to RunPolicy. However, can we copy RunPolicy in a separate PR? I think copying RunPolicy to this repo is a separate concern from this PR.

What about the other constants, like the ones defining conditions? I guess we could have a PR to just copy RunPolicy to mpi-operator to unblock this work, but keep the dependency on common@0.4.6 for the condition constants. Then, we can extend the set of MPIJob conditions with JobSuspended just in the mpi-operator. If this sounds good I can open a preparatory PR just to copy RunPolicy.

Sounds good to me. Although, let me know what other members think.

cc @alculquicondor @terrytangyuan

@alculquicondor (Collaborator)

sgtm

@terrytangyuan (Member)

Sounds good

@mimowo (Contributor, Author) commented Jan 30, 2023

@tenzen-y @alculquicondor I've opened the preparatory PR here: #513. Please review.

@mimowo mimowo force-pushed the mpijob-add-susped branch 2 times, most recently from aaf12e6 to 7634c74 on January 30, 2023 17:47
@mimowo (Contributor, Author) commented Jan 31, 2023

@terrytangyuan Please approve CI

@mimowo mimowo force-pushed the mpijob-add-susped branch from 7634c74 to fd68512 on January 31, 2023 10:56
@google-oss-prow google-oss-prow bot added size/XL and removed size/L labels Jan 31, 2023
@mimowo mimowo force-pushed the mpijob-add-susped branch 4 times, most recently from 13c54ea to 4e96e9a on January 31, 2023 16:10
@google-oss-prow google-oss-prow bot added the lgtm label Feb 1, 2023
@alculquicondor (Collaborator) left a comment

Can you add a unit test?

Also, I'm not convinced of the value of an E2E test over unit+integration. Do you have a particular justification?

@@ -94,7 +96,7 @@ spec:
          properties:
            type:
              type: string
-             enum: ["Created", "Running", "Restarting", "Succeeded", "Failed"]
+             enum: ["Created", "Running", "Restarting", "Succeeded", "Suspended", "Failed"]
Collaborator

Was this autogenerated?
If so, I'm curious to know how it worked.

Maybe this? https://github.com/kubeflow/common/blob/9ec55d141f90faaf52fd6df271e987e5a6781945/pkg/apis/common/v1/types.go#L112

We should probably use it in Kueue, where applicable

Contributor Author

Nope, I updated it manually.

Collaborator

Uhm... then make sure that make generate doesn't override it.

Contributor Author

It doesn't; I checked.

Collaborator

@tenzen-y is this expected?
Or is this related to your other PR?

Collaborator

We can leave this to #510

Member

Yes, this is a known issue, and this will be fixed by #510.

Contributor Author

Yeah, but it is needed for the e2e test (which I believe is worth adding).

Collaborator

Yeah, keep the manual change; #510 should automate it.

	}
	return nil
}

  // first set StartTime.
- if mpiJob.Status.StartTime == nil {
+ if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
Collaborator

Yay for using Job!

@@ -905,6 +936,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
	if err != nil {
		return fmt.Errorf("checking launcher pods running: %w", err)
	}
	if isMPIJobSuspended(mpiJob) {
		// it is suspended now
		if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
Collaborator

Suggested change:
- if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
+ if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "Suspended", "MPIJob suspended") {

I don't think we need the redundancy.

Contributor Author

Yeah, I thought so too, but this seems to be the convention here. For now I stick to the convention, but I added the reason to the list.

@@ -1304,7 +1351,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
}

func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
-	return &batchv1.Job{
+	job := &batchv1.Job{
Collaborator

Food for thought:
We will probably need some kind of Job annotation that tells kueue that this Job is already queued as part of a higher level object (MPIJob in this case), so that we simply ignore it.

Contributor Author

Good point. IIUC, this affects only Kueue configurations with ManageJobsWithoutQueueName=true, which is non-default. We could have an annotation, yes, or just have Kueue not manage any Job objects that have an OwnerReference to another object managed by Kueue.

Collaborator

or just have Kueue not manage any Job objects that have an OwnerReference to another object managed by Kueue.

The dependency could be indirect. And we don't want to spend a GET call to obtain such information.

Contributor Author

I see, but we should still strive to keep the integration interface as small as possible: every extension to the surface of the interface is multiplied by the number of projects. But maybe a new annotation is not that bad. Also, maybe we can have a hybrid approach.
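The annotation idea can be sketched as follows. The annotation key here is invented purely for illustration and is not an actual Kueue API; the real mechanism would be decided in Kueue itself:

```go
package main

import "fmt"

// parentManagedAnnotation is a hypothetical key a parent controller (e.g. the
// MPIJob controller) could set on the launcher Job to tell Kueue the Job is
// already queued as part of a higher-level object. The name is assumed.
const parentManagedAnnotation = "kueue.example/parent-managed"

// shouldManage returns false when the Job declares a managing parent, so a
// queue controller would skip it even with manageJobsWithoutQueueName enabled.
func shouldManage(annotations map[string]string) bool {
	return annotations[parentManagedAnnotation] != "true"
}

func main() {
	fmt.Println(shouldManage(map[string]string{parentManagedAnnotation: "true"}))
	fmt.Println(shouldManage(nil))
}
```

An annotation check like this avoids the extra GET call that resolving an indirect OwnerReference chain would require.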

@mimowo (Contributor, Author) commented Feb 2, 2023

Can you add a unit test?

Done; I ended up adding 3 actually: for creating a suspended MPIJob, suspending a running one, and resuming.
One thing is that, to write the unit test for resuming, I had to refactor the code a little bit to inject a fake clock in tests.
Also, the test for suspending a running MPIJob revealed that I was requiring two syncs: one to clean up the worker pods
and one to update the MPIJob status. Now I do these steps in one sync.

Also, I'm not convinced of the value of an E2E test over unit+integration. Do you have a particular justification?

I think of two reasons:

  1. the test abstracts out the implementation details, and thus is better at documenting what the feature is about. For example, it abstracts out when the service and other auxiliary objects are created. Having such a test allows us to do refactoring and have confidence that the feature works before the unit or integration tests are adjusted.
  2. it makes us more confident about race conditions. For example, suspending or resuming an MPIJob triggers
    suspending or resuming the Launcher Job which happens asynchronously. We don't really test the interaction
    between the launcher job and the MPIJob at other layers of testing.

@@ -94,7 +96,7 @@ spec:
          properties:
            type:
              type: string
-             enum: ["Created", "Running", "Restarting", "Succeeded", "Failed"]
+             enum: ["Created", "Running", "Restarting", "Succeeded", "Suspended", "Failed"]
Collaborator

We can leave this to #510

updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
f.expectUpdateMPIJobStatusAction(mpiJobCopy)
Collaborator

expect zero workers

Contributor Author

The code above checks that already:

			mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
				common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {},
				common.ReplicaType(kubeflow.MPIReplicaTypeWorker):   {},
			}

It does not specify Active, meaning it checks the value is 0.

The workers were never created in this scenario, so I cannot assert on delete actions.

Collaborator

But you can assert that no pods were created.

The status is not necessarily the same as pods being created.

Contributor Author

But if there were some actions to create a pod, and we didn't expect them, the test would fail (IIUC). For example, when I comment out expecting the status update, the test fails as follows:
1 unexpected actions: [{ActionImpl:{Namespace:default Verb:update Resource:kubeflow.org/v2beta1, Resource=mpijobs

Collaborator

oh I see, so it's implicitly checked

@@ -211,12 +231,31 @@ var _ = ginkgo.Describe("MPIJob", func() {
})
})

func resumeJob(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
Collaborator

For follow up: accept contexts in all these functions

Contributor Author

I will open a PR directly after this; it doesn't seem to require an issue.

Contributor Author

Similarly, going to open a PR to copy the MPIJob conditions from common.

Member

@mimowo I opened the PR to copy the MPIJob conditions to this repo in #514 since I faced issues caused by the MPIJob conditions in #510.

@alculquicondor (Collaborator)

/lgtm
/assign @terrytangyuan

@terrytangyuan (Member) left a comment

Thanks!

/lgtm
/approve

@google-oss-prow

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: terrytangyuan


@terrytangyuan (Member)

The other PR got merged first, so this one will need to resolve conflicts :-)

# Conflicts:
#	pkg/apis/kubeflow/v2beta1/types.go
#	pkg/controller/mpi_job_controller.go
#	pkg/controller/mpi_job_controller_status.go
#	pkg/controller/mpi_job_controller_test.go
#	test/integration/mpi_job_controller_test.go
@mimowo mimowo force-pushed the mpijob-add-susped branch from 7357382 to f600aa4 on February 3, 2023 15:09
@google-oss-prow google-oss-prow bot removed the lgtm label Feb 3, 2023
- add unit tests for creating suspended, suspending and resuming
- use fake clock for unit tests
- do not return from the syncHandler after worker pods cleanup on suspend; this allows continuing with the MPIJob update in the same sync

# Conflicts:
#	pkg/controller/mpi_job_controller.go
@mimowo mimowo force-pushed the mpijob-add-susped branch from f600aa4 to 11e368e on February 3, 2023 15:19
@terrytangyuan (Member)

/lgtm

@google-oss-prow google-oss-prow bot added the lgtm label Feb 3, 2023
@alculquicondor (Collaborator)

Still lgtm

@google-oss-prow google-oss-prow bot merged commit 92e491e into kubeflow:master Feb 3, 2023
@mimowo mimowo deleted the mpijob-add-susped branch March 18, 2023 18:57
architkulkarni pushed a commit to ray-project/kuberay that referenced this pull request May 16, 2023
Native Kubernetes Jobs have a suspend flag that allows temporarily suspending a Job's execution and resuming it later, or starting Jobs in a suspended state and having a custom controller, such as Kueue, decide later when to start them.

So we are adding it to the RayJob spec for consistency. Moreover, some frameworks like Kubeflow are adding it, so it is becoming standard functionality. An example implementation for MPIJob: kubeflow/mpi-operator#511

Implementation details
If a RayJob is created with a spec.suspend == true, then RayCluster instance (with corresponding Kubernetes resources) is not created and the Ray job is not submitted to the cluster. The JobDeploymentStatus is set to Suspended and the corresponding event is issued. The RayJob remains in this state until somebody unsuspends the job.

If suspend flips from true to false, then the RayJob controller immediately creates a RayCluster instance and submits the job.

If suspend flips from false to true while Job is running, then the RayJob controller tries to gracefully stop the job and deletes the RayCluster instance (with underlying Kubernetes resources). The JobDeploymentStatus is set to Suspended; JobStatus is set to STOPPED and the corresponding event is issued.

Edge case: suspend flag is ignored if a RayJob is submitted against an existing RayCluster instance (matched with ClusterSelector) since we can't delete a RayCluster created by somebody else.

No Kueue-specific code leaked to Kuberay implementation

Contributors from Kueue/Kubernetes cc'ed:

@alculquicondor
@mwielgus
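The RayJob suspend transitions described in the commit message above can be sketched as a small state machine. Types and names here are simplified stand-ins, not the actual KubeRay code:

```go
package main

import "fmt"

// rayJob stands in for the relevant slice of the RayJob object.
type rayJob struct {
	Suspend          bool
	ClusterSelector  string // non-empty: job targets a pre-existing RayCluster
	DeploymentStatus string
	ClusterExists    bool
}

// reconcile sketches the transitions: suspend deletes the RayCluster and sets
// JobDeploymentStatus to Suspended; unsuspend recreates the cluster and
// submits the job; suspend is ignored when a ClusterSelector matched an
// existing cluster that we cannot delete.
func reconcile(j *rayJob) {
	if j.ClusterSelector != "" {
		return // edge case: suspend ignored for pre-existing clusters
	}
	if j.Suspend {
		j.ClusterExists = false // delete RayCluster and its resources
		j.DeploymentStatus = "Suspended"
		return
	}
	j.ClusterExists = true // create RayCluster and submit the job
	j.DeploymentStatus = "Running"
}

func main() {
	j := &rayJob{Suspend: true}
	reconcile(j)
	fmt.Println(j.DeploymentStatus, j.ClusterExists)
	j.Suspend = false
	reconcile(j)
	fmt.Println(j.DeploymentStatus, j.ClusterExists)
}
```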
lowang-bh pushed a commit to lowang-bh/kuberay that referenced this pull request Sep 24, 2023 (same commit message as above)