Skip to content

Commit

Permalink
MXNet SDK with Status check fix (#1618)
Browse files: browse the repository at this point in the history
* Adding latest image tag

* Update manifests with latest image tag

* Add MXNet Job SDK

* Fix namespace
  • Loading branch information
johnugeorge authored Jun 20, 2022
1 parent fbcb6f3 commit 0a0bb0c
Show file tree
Hide file tree
Showing 8 changed files with 652 additions and 30 deletions.
1 change: 1 addition & 0 deletions hack/python-sdk/post_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def add_imports() -> None:
init_file.write("from kubeflow.training.api.py_torch_job_client import PyTorchJobClient\n")
init_file.write("from kubeflow.training.api.xgboost_job_client import XGBoostJobClient\n")
init_file.write("from kubeflow.training.api.mpi_job_client import MPIJobClient\n")
init_file.write("from kubeflow.training.api.mx_job_client import MXJobClient\n")
with open(os.path.join(sdk_dir, "kubeflow/__init__.py"), "a") as init_file:
init_file.write("__path__ = __import__('pkgutil').extend_path(__path__, __name__)")

Expand Down
63 changes: 33 additions & 30 deletions pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,16 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
return err
}

if mxjob.Status.StartTime == nil {
now := metav1.Now()
mxjob.Status.StartTime = &now
// enqueue a sync to check whether the job is past ActiveDeadlineSeconds
if mxjob.Spec.RunPolicy.ActiveDeadlineSeconds != nil {
logrus.Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *mxjob.Spec.RunPolicy.ActiveDeadlineSeconds)
r.WorkQueue.AddAfter(mxjobKey, time.Duration(*mxjob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second)
}
}

for rtype, spec := range replicas {
status := jobStatus.ReplicaStatuses[rtype]

Expand All @@ -338,39 +348,32 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
r.Log.Info(fmt.Sprintf("MXJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
mxjob.Name, rtype, expected, running, succeeded, failed))

if mxjob.Status.StartTime == nil {
now := metav1.Now()
mxjob.Status.StartTime = &now
// enqueue a sync to check if job past ActiveDeadlineSeconds
if mxjob.Spec.RunPolicy.ActiveDeadlineSeconds != nil {
logrus.Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *mxjob.Spec.RunPolicy.ActiveDeadlineSeconds)
r.WorkQueue.AddAfter(mxjobKey, time.Duration(*mxjob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second)
}
}

if running > 0 {
msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, mxJobRunningReason, msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
}
}
if expected == 0 {
msg := fmt.Sprintf("MXJob %s is successfully completed.", mxjob.Name)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobSucceededReason, msg)
if mxjob.Status.CompletionTime == nil {
now := metav1.Now()
mxjob.Status.CompletionTime = &now
if rtype == commonv1.ReplicaType(kubeflowv1.MXJobReplicaTypeScheduler) {
if running > 0 {
msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, mxJobRunningReason, msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
}
}
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, mxJobSucceededReason, msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
// When the scheduler has succeeded, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("MXJob %s is successfully completed.", mxjob.Name)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobSucceededReason, msg)
if mxjob.Status.CompletionTime == nil {
now := metav1.Now()
mxjob.Status.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, mxJobSucceededReason, msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
}
trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, kubeflowv1.MXJobFrameworkName)
return nil
}
trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, kubeflowv1.MXJobFrameworkName)
}

if failed > 0 {
if spec.RestartPolicy == commonv1.RestartPolicyExitCode {
msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/kubeflow/training/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@
from kubeflow.training.api.py_torch_job_client import PyTorchJobClient
from kubeflow.training.api.xgboost_job_client import XGBoostJobClient
from kubeflow.training.api.mpi_job_client import MPIJobClient
from kubeflow.training.api.mx_job_client import MXJobClient
Loading

0 comments on commit 0a0bb0c

Please sign in to comment.