Skip to content

Commit

Permalink
update status.startTime for pytorchjob and xgboostjob (#1567)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheimu authored Mar 30, 2022
1 parent 00710ba commit 8c43231
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pytorch
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
Expand Down Expand Up @@ -324,6 +325,25 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
return fmt.Errorf("%+v is not a type of PyTorchJob", job)
}

pytorchjobKey, err := common.KeyFunc(pytorchjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for pytorchjob object %#v: %v", pytorchjob, err))
return err
}

logger := commonutil.LoggerForJob(pytorchjob)

// Set StartTime.
if jobStatus.StartTime == nil {
now := metav1.Now()
jobStatus.StartTime = &now
// Enqueue a delayed sync to check whether the job has exceeded ActiveDeadlineSeconds.
if pytorchjob.Spec.RunPolicy.ActiveDeadlineSeconds != nil {
logger.Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *pytorchjob.Spec.RunPolicy.ActiveDeadlineSeconds)
r.WorkQueue.AddAfter(pytorchjobKey, time.Duration(*pytorchjob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second)
}
}

for rtype, spec := range replicas {
status := jobStatus.ReplicaStatuses[rtype]
if status.LabelSelector == nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"k8s.io/client-go/informers"
"time"

"github.com/kubeflow/training-operator/pkg/apis/xgboost/validation"

Expand Down Expand Up @@ -330,6 +331,23 @@ func (r *XGBoostJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
return fmt.Errorf("%+v is not a type of xgboostJob", xgboostJob)
}

xgboostJobKey, err := common.KeyFunc(xgboostJob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for xgboostjob object %#v: %v", xgboostJob, err))
return err
}

// Set StartTime.
if jobStatus.StartTime == nil {
now := metav1.Now()
jobStatus.StartTime = &now
// Enqueue a delayed sync to check whether the job has exceeded ActiveDeadlineSeconds.
if xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds != nil {
logger.LoggerForJob(xgboostJob).Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds)
r.WorkQueue.AddAfter(xgboostJobKey, time.Duration(*xgboostJob.Spec.RunPolicy.ActiveDeadlineSeconds)*time.Second)
}
}

for rtype, spec := range replicas {
status := jobStatus.ReplicaStatuses[rtype]

Expand Down

0 comments on commit 8c43231

Please sign in to comment.