diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index 4cd901f391..6a523a70f2 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -17,6 +17,7 @@ package pytorch import ( "context" "fmt" + "time" "github.com/go-logr/logr" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" @@ -324,6 +325,25 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, return fmt.Errorf("%+v is not a type of PyTorchJob", job) } + pytorchjobKey, err := common.KeyFunc(pytorchjob) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for pytorchjob object %#v: %v", pytorchjob, err)) + return err + } + + logger := commonutil.LoggerForJob(pytorchjob) + + // Set StartTime. + if jobStatus.StartTime == nil { + now := metav1.Now() + jobStatus.StartTime = &now + // enqueue a sync to check if job past ActiveDeadlineSeconds + if pytorchjob.Spec.RunPolicy.ActiveDeadlineSeconds != nil { + logger.Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *pytorchjob.Spec.RunPolicy.ActiveDeadlineSeconds) + r.WorkQueue.AddAfter(pytorchjobKey, time.Duration(*pytorchjob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second) + } + } + for rtype, spec := range replicas { status := jobStatus.ReplicaStatuses[rtype] if status.LabelSelector == nil { diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 66ea4b64eb..40488d5645 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "k8s.io/client-go/informers" + "time" "github.com/kubeflow/training-operator/pkg/apis/xgboost/validation" @@ -330,6 +331,23 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com return fmt.Errorf("%+v is not a type of xgboostJob", xgboostJob) } + xgboostJobKey, err := common.KeyFunc(xgboostJob) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for xgboostjob object %#v: %v", xgboostJob, err)) + return err + } + + // Set StartTime. + if jobStatus.StartTime == nil { + now := metav1.Now() + jobStatus.StartTime = &now + // enqueue a sync to check if job past ActiveDeadlineSeconds + if xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds != nil { + logger.LoggerForJob(xgboostJob).Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds) + r.WorkQueue.AddAfter(xgboostJobKey, time.Duration(*xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second) + } + } + for rtype, spec := range replicas { status := jobStatus.ReplicaStatuses[rtype]