From 6748c47ba369e570ec0eea8c4e5c44b08d48bd3e Mon Sep 17 00:00:00 2001 From: congpeiqing Date: Fri, 17 Nov 2023 22:11:55 +0800 Subject: [PATCH] fix bug: can't get status when worker pod spec is invalid --- pkg/controller/mpi_job_controller.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index da3f16f0..16670650 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -961,8 +961,13 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 // If an error occurs during Get/Create, we'll requeue the item so we // can attempt processing again later. This could have been caused by a // temporary network failure, or any other transient reason. + // But if the error reports an invalid pod spec, retrying would be + // futile; the job status should turn to Failed. if err != nil { c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod created failed: %v", err) + if errors.IsInvalid(err) { + return workerPods, nil + } return nil, err } // If the worker is not controlled by this MPIJob resource, we should log @@ -1076,7 +1081,6 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher running = 0 evict = 0 ) - initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker) //spec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] for i := 0; i < len(worker); i++ { @@ -1100,7 +1104,19 @@ c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg) } - if isMPIJobSuspended(mpiJob) { + // When workerSpec != nil and workerSpec.Replicas != 0 and len(worker) == 0, + // the pod spec must be wrong, so the job has failed.
+ workerSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] + if workerSpec != nil && len(worker) == 0 && *workerSpec.Replicas != 0 { + msg := "invalid pod spec" + c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, msg) + if mpiJob.Status.CompletionTime == nil { + now := metav1.Now() + mpiJob.Status.CompletionTime = &now + } + updateMPIJobConditions(mpiJob, kubeflow.JobFailed, corev1.ConditionTrue, mpiJobFailedReason, msg) + mpiJobsFailureCount.Inc() + } else if isMPIJobSuspended(mpiJob) { msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name) updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobSuspendedReason, msg) } else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {