-
Notifications
You must be signed in to change notification settings - Fork 700
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement suspend semantics #1859
Implement suspend semantics #1859
Conversation
383e6b7
to
31154ff
Compare
Pull Request Test Coverage Report for Build 5613234102
💛 - Coveralls |
/assign @johnugeorge cc: @alculquicondor @mimowo This PR is the first step to support suspend semantics in the kubeflow/training-operator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the core logic for suspend semantics.
cc @trasc |
pkg/util/status.go
Outdated
} | ||
|
||
func IsSuspend(status apiv1.JobStatus) bool { | ||
return hasCondition(status, apiv1.JobSuspended) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hasCondition
name now is misleading and it looks to be doing the same thing as apimachinery/pkg/api/meta.IsStatusConditionTrue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, our condition.status
is typed corev1.ConditionStatus
.
Status v1.ConditionStatus `json:"status"` |
So apimachinery/pkg/api/meta.IsStatusConditionTrue
doesn't work :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could still rename the function to isConditionTrue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I misunderstood @trasc 's comment.
I will replace all functions with IsStatusConditionTrue
. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
42b1cc9
to
1ab6390
Compare
1ab6390
to
c1af716
Compare
/hold for @alculquicondor's comment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally lgtm, but I would prefer to make it more similar to Job & MPIJob.
return err | ||
} | ||
for rType := range jobStatus.ReplicaStatuses { | ||
jobStatus.ReplicaStatuses[rType].Active = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this would be set anyway by the other code once the replica pods are cleaned up. This is the approach we take in MPIJob and Job. I would like if we could apply here the same approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have any other codes to reset the Active field, and the replicaStatus[*].Active
is reset only by
training-operator/pkg/controller.v1/common/job.go
Lines 143 to 148 in 72f2512
if commonutil.IsSucceeded(jobStatus) { | |
for rtype := range jobStatus.ReplicaStatuses { | |
jobStatus.ReplicaStatuses[rtype].Succeeded += jobStatus.ReplicaStatuses[rtype].Active | |
jobStatus.ReplicaStatuses[rtype].Active = 0 | |
} | |
} |
So we need to reset the Active
field here if the job is suspended.
However, since I think we should reset the Active
field when cleaning up replica pods, I would do the refactoring in the follow-ups.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any code that sets Active to non zero?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any code that sets Active to non zero?
Yes, here:
updateJobReplicaStatuses(jobStatus, rType, pod) |
func updateJobReplicaStatuses(jobStatus *apiv1.JobStatus, rtype apiv1.ReplicaType, pod *corev1.Pod) { |
training-operator/pkg/core/status.go
Lines 34 to 50 in 72f2512
func UpdateJobReplicaStatuses(jobStatus *apiv1.JobStatus, rtype apiv1.ReplicaType, pod *corev1.Pod) { | |
switch pod.Status.Phase { | |
case corev1.PodRunning: | |
if pod.DeletionTimestamp != nil { | |
// when node is not ready, the pod will be in terminating state. | |
// Count deleted Pods as failures to account for orphan Pods that | |
// never have a chance to reach the Failed phase. | |
jobStatus.ReplicaStatuses[rtype].Failed++ | |
} else { | |
jobStatus.ReplicaStatuses[rtype].Active++ | |
} | |
case corev1.PodSucceeded: | |
jobStatus.ReplicaStatuses[rtype].Succeeded++ | |
case corev1.PodFailed: | |
jobStatus.ReplicaStatuses[rtype].Failed++ | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so those functions wouldn't be called in the next reconcile, essentially resetting the number of active pods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so those functions wouldn't be called in the next reconcile
Yes. If the job is suspended, JobController never calls ReconcilePods()
: https://github.com/tenzen-y/training-operator/blob/ce7259ecfaacbd529b6b1095dd6b632517dac0d0/pkg/controller.v1/common/job.go#L147-L173
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of setting counts manually, why can't it be derived from the status of all pods? Since we have already cleaned up, active pods will be zero. We can do this refactoring separately as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have functions to decrease Active count, only have functions to reset the count. So I guess we should refactor ReconcileJob(). However, the refactor will affect Succeeded and Failed conditions, too. So I would like to work on another PR.
} | ||
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobSuspendedReason), msg) | ||
if !reflect.DeepEqual(*oldStatus, jobStatus) { | ||
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to let the reconcile function continue, so that other status fields are updated, such as Active
(mentioned above)? I think it is preferable not to update here but let other fields be updated too, so that we can update as much as possible in a single reconciliation run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is preferable not to update here but let other fields be updated too, so that we can update as much as possible in a single reconciliation run.
That makes sense. As I say above, we need to refactor the ReconcilePods
:
func (jc *JobController) ReconcilePods( |
So I would do your suggestion in follow-ups.
@mimowo I updated this PR. PTAL. |
ffb736e
to
1ed3e8e
Compare
I have rebased. |
@mimowo: changing LGTM is restricted to collaborators In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
Is the title accurate? It says PyTorchJob, but I see API updates in every CRD. |
Sure. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall
I just have side questions and a nit
continue | ||
} | ||
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, job.(runtime.Object)); err != nil { | ||
if err := jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject); err != nil { | ||
return err | ||
} | ||
// Pod and service have the same name, thus the service could be deleted using pod's name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side question: why is there a service per pod? That sounds like unnecessary load.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, ml framework configs need different FQDN for each pod.
For example, tensorflow ClusterSpec
: https://www.tensorflow.org/api_docs/python/tf/train/ClusterSpec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't a single headless Service allow that? similar to this https://kubernetes.io/docs/tasks/job/job-with-pod-to-pod-communication/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right.
Actually, using a single headless service is planning, although it is closed :(
return err | ||
} | ||
for rType := range jobStatus.ReplicaStatuses { | ||
jobStatus.ReplicaStatuses[rType].Active = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any code that sets Active to non zero?
pkg/util/status.go
Outdated
} | ||
|
||
func IsSuspend(status apiv1.JobStatus) bool { | ||
return hasCondition(status, apiv1.JobSuspended) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could still rename the function to isConditionTrue
77ec73e
to
4c28296
Compare
LGTM |
Thanks everyone! /hold cancel |
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
4c28296
to
e4bf325
Compare
@johnugeorge I addressed your comments and squashed commits into one. PTAL. |
Thanks for this awesome feature! |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: johnugeorge, tenzen-y The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
What this PR does / why we need it:
I implemented the suspend semantics like batch/job and MPIJob v2beta1 to PyTorchJob. The semantics enables the external controller can stop creating pods. For example, this is useful for adapting Kubeflow TrainingJob to the job queueing system.
The training operator removes the following resources regardless of
runPolicy.cleanPodPolicy
when therunPolicy.suspend
is true:Which issue(s) this PR fixes (optional, in
Fixes #<issue number>, #<issue number>, ...
format, will close the issue(s) when PR gets merged):Part-of #1519
Related to: #1853
Checklist: