Skip to content

Commit

Permalink
fix bug cant get status when worker pod spec is invalid
Browse files Browse the repository at this point in the history
  • Loading branch information
congpeiqing committed Nov 17, 2023
1 parent 4a63d3c commit 6748c47
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions pkg/controller/mpi_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,8 +961,13 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
// If an error occurs during Get/Create, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
// But, if err is about pod spec invalid, retrying would be
// futile, the status of job should turn to failed.
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod created failed: %v", err)
if errors.IsInvalid(err) {
return workerPods, nil
}
return nil, err
}
// If the worker is not controlled by this MPIJob resource, we should log
Expand Down Expand Up @@ -1076,7 +1081,6 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
running = 0
evict = 0
)

initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
//spec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
for i := 0; i < len(worker); i++ {
Expand All @@ -1100,7 +1104,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
}

if isMPIJobSuspended(mpiJob) {
// When workerSpec != nil and workerSpec.Replicas != 0 and len(worker) == 0,
// pod spec must be wrong, job failed.
workerSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
if workerSpec != nil && len(worker) == 0 && *workerSpec.Replicas != 0 {
msg := "invalid pod spec"
c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, msg)
if mpiJob.Status.CompletionTime == nil {
now := metav1.Now()
mpiJob.Status.CompletionTime = &now
}
updateMPIJobConditions(mpiJob, kubeflow.JobFailed, corev1.ConditionTrue, mpiJobFailedReason, msg)
mpiJobsFailureCount.Inc()
} else if isMPIJobSuspended(mpiJob) {
msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobSuspendedReason, msg)
} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
Expand Down

0 comments on commit 6748c47

Please sign in to comment.