From 82c141ebaf41878ccd7570f4074b8d84a3814414 Mon Sep 17 00:00:00 2001
From: ChanYiLin
Date: Tue, 28 Apr 2020 17:44:05 +0800
Subject: [PATCH] If the job has already terminated, we don't need to check
 activeDeadlineSeconds and backoffLimit

---
 pkg/controller.v1/mxnet/controller.go | 98 ++++++++++++++++-----------
 1 file changed, 57 insertions(+), 41 deletions(-)

diff --git a/pkg/controller.v1/mxnet/controller.go b/pkg/controller.v1/mxnet/controller.go
index 90291fee..a0800cde 100644
--- a/pkg/controller.v1/mxnet/controller.go
+++ b/pkg/controller.v1/mxnet/controller.go
@@ -21,7 +21,7 @@ import (
 	"time"
 
 	log "github.com/sirupsen/logrus"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -351,6 +351,29 @@ func (tc *MXController) reconcileMXJobs(mxjob *mxv1.MXJob) error {
 		return err
 	}
 
+	// If the MXJob is terminated, delete all pods and services.
+	if isSucceeded(mxjob.Status) || isFailed(mxjob.Status) {
+		if err := tc.deletePodsAndServices(mxjob, pods); err != nil {
+			return err
+		}
+
+		if err := tc.cleanupMXJob(mxjob); err != nil {
+			return err
+		}
+
+		if tc.Config.EnableGangScheduling {
+			if err := tc.DeletePodGroup(mxjob); err != nil {
+				return err
+			}
+		}
+
+		// Initialize the status.
+		initializeMXReplicaStatuses(mxjob, mxv1.MXReplicaTypeScheduler)
+		initializeMXReplicaStatuses(mxjob, mxv1.MXReplicaTypeWorker)
+		initializeMXReplicaStatuses(mxjob, mxv1.MXReplicaTypeServer)
+		return tc.updateStatusHandler(mxjob)
+	}
+
 	// retrieve the previous number of retry
 	previousRetry := tc.WorkQueue.NumRequeues(mxjobKey)
 
@@ -389,25 +412,13 @@ func (tc *MXController) reconcileMXJobs(mxjob *mxv1.MXJob) error {
 		mxJobExceedsLimit = true
 	}
 
-	// If the MXJob is terminated, delete all pods and services.
-	if isSucceeded(mxjob.Status) || isFailed(mxjob.Status) || mxJobExceedsLimit {
+	if mxJobExceedsLimit {
+		// If the MXJob exceeds the backoff limit or is past the active deadline,
+		// delete all pods and services, then set the status to failed.
 		if err := tc.deletePodsAndServices(mxjob, pods); err != nil {
 			return err
 		}
 
-		if mxJobExceedsLimit {
-			tc.Recorder.Event(mxjob, v1.EventTypeNormal, mxJobFailedReason, failureMessage)
-			if mxjob.Status.CompletionTime == nil {
-				now := metav1.Now()
-				mxjob.Status.CompletionTime = &now
-			}
-			err := updateMXJobConditions(mxjob, mxv1.MXJobFailed, mxJobFailedReason, failureMessage)
-			if err != nil {
-				mxlogger.LoggerForJob(mxjob).Infof("Append mxjob condition error: %v", err)
-				return err
-			}
-		}
-
 		if err := tc.cleanupMXJob(mxjob); err != nil {
 			return err
 		}
@@ -418,37 +429,42 @@ func (tc *MXController) reconcileMXJobs(mxjob *mxv1.MXJob) error {
 			}
 		}
 
-		// Initialize the status.
-		initializeMXReplicaStatuses(mxjob, mxv1.MXReplicaTypeScheduler)
-		initializeMXReplicaStatuses(mxjob, mxv1.MXReplicaTypeWorker)
-		initializeMXReplicaStatuses(mxjob, mxv1.MXReplicaTypeServer)
-		return tc.updateStatusHandler(mxjob)
-	}
-
-	if tc.Config.EnableGangScheduling {
-		minAvailableReplicas := getTotalReplicas(mxjob)
-		_, err := tc.SyncPodGroup(mxjob, minAvailableReplicas)
-		if err != nil {
-			logger.Warnf("Sync PodGroup %v: %v", mxjob.Name, err)
+		tc.Recorder.Event(mxjob, v1.EventTypeNormal, mxJobFailedReason, failureMessage)
+		if mxjob.Status.CompletionTime == nil {
+			now := metav1.Now()
+			mxjob.Status.CompletionTime = &now
 		}
-	}
-
-	// Save the current state of the replicas
-	replicasStatus := make(map[string]v1.PodPhase)
-
-	// Diff current active pods/services with replicas.
-	for rtype, spec := range mxjob.Spec.MXReplicaSpecs {
-		err = tc.reconcilePods(mxjob, pods, rtype, spec, replicasStatus)
+		err := updateMXJobConditions(mxjob, mxv1.MXJobFailed, mxJobFailedReason, failureMessage)
 		if err != nil {
-			logger.Warnf("reconcilePods error %v", err)
+			mxlogger.LoggerForJob(mxjob).Infof("Append mxjob condition error: %v", err)
 			return err
 		}
+	} else {
+		if tc.Config.EnableGangScheduling {
+			minAvailableReplicas := getTotalReplicas(mxjob)
+			_, err := tc.SyncPodGroup(mxjob, minAvailableReplicas)
+			if err != nil {
+				logger.Warnf("Sync PodGroup %v: %v", mxjob.Name, err)
+			}
+		}
+
+		// Save the current state of the replicas
+		replicasStatus := make(map[string]v1.PodPhase)
+
+		// Diff current active pods/services with replicas.
+		for rtype, spec := range mxjob.Spec.MXReplicaSpecs {
+			err = tc.reconcilePods(mxjob, pods, rtype, spec, replicasStatus)
+			if err != nil {
+				logger.Warnf("reconcilePods error %v", err)
+				return err
+			}
+
+			err = tc.reconcileServices(mxjob, services, rtype, spec)
+
+			if err != nil {
+				logger.Warnf("reconcileServices error %v", err)
+				return err
+			}
+		}
 	}
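
Note (editorial, not part of the patch): the sketch below is a minimal,
self-contained Go program summarizing the branch order this patch gives
reconcileMXJobs. The type jobState, the function reconcile, and the
backoffLimit parameter are hypothetical stand-ins, not the operator's real
API; the point is only that a job that has already succeeded or failed is
cleaned up and returns early, so backoffLimit and activeDeadlineSeconds are
evaluated only for live jobs.

	package main

	import "fmt"

	// jobState is a stand-in for MXJobStatus; the real controller reads
	// these facts via isSucceeded/isFailed and the work-queue retry count.
	type jobState struct {
		succeeded bool
		failed    bool
		retries   int32
	}

	// reconcile mirrors the branch order of reconcileMXJobs after this
	// patch and returns the action the controller would take.
	func reconcile(s jobState, backoffLimit int32) string {
		// Terminated jobs short-circuit: clean up and return before any
		// limit checks run.
		if s.succeeded || s.failed {
			return "delete pods/services, reset replica statuses, update status"
		}
		// Only live jobs are tested against their limits.
		if s.retries > backoffLimit {
			return "delete pods/services, mark job Failed"
		}
		// Otherwise reconcile pods and services per replica type.
		return "reconcile scheduler/server/worker pods and services"
	}

	func main() {
		fmt.Println(reconcile(jobState{succeeded: true, retries: 99}, 3)) // early return, limits ignored
		fmt.Println(reconcile(jobState{retries: 5}, 3))                   // exceeds backoff limit
		fmt.Println(reconcile(jobState{retries: 1}, 3))                   // normal reconcile path
	}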