diff --git a/pkg/controllers/cache/cache.go b/pkg/controllers/cache/cache.go index c2d45eaa760..0c07f884c35 100644 --- a/pkg/controllers/cache/cache.go +++ b/pkg/controllers/cache/cache.go @@ -59,7 +59,15 @@ func JobKey(job *v1alpha1.Job) string { } func jobTerminated(job *apis.JobInfo) bool { - return job.Job == nil && len(job.Pods) == 0 + for _, pods := range job.Pods { + for _, pod := range pods { + if pod.Status.Phase != v1.PodSucceeded { + return false + } + } + } + + return job.Job == nil } func jobKeyOfPod(pod *v1.Pod) (string, error) { diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 5e3d7392544..3e9a4b2fb97 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -65,9 +65,13 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM _, retain := podRetainPhase[pod.Status.Phase] if !retain { - err := cc.deleteJobPod(job.Name, pod) + existing, err := cc.deleteJobPod(job.Name, pod) if err == nil { - terminating++ + if existing { + terminating++ + } else { + cc.cache.DeletePod(pod) + } continue } // record the err, and then collect the pod info like retained pod @@ -333,7 +337,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt for _, pod := range podToDelete { go func(pod *v1.Pod) { defer waitDeletionGroup.Done() - err := cc.deleteJobPod(job.Name, pod) + _, err := cc.deleteJobPod(job.Name, pod) if err != nil { // Failed to delete Pod, waitCreationGroup a moment and then create it again // This is to ensure all podsMap under the same Job created @@ -530,16 +534,19 @@ func (cc *Controller) createOrUpdatePodGroup(job *batch.Job) error { return nil } -func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error { +func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) (bool, error) { err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil) - if err != nil && !apierrors.IsNotFound(err) { + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } klog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v", pod.Namespace, pod.Name, jobName, err) - return fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err) + return true, fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err) } - return nil + return true, nil } func (cc *Controller) calcPGMinResources(job *batch.Job) *v1.ResourceList { diff --git a/pkg/controllers/job/job_controller_actions_test.go b/pkg/controllers/job/job_controller_actions_test.go index c07b2f6eec1..59c793068f0 100644 --- a/pkg/controllers/job/job_controller_actions_test.go +++ b/pkg/controllers/job/job_controller_actions_test.go @@ -461,7 +461,7 @@ func TestDeleteJobPod(t *testing.T) { } } - err := fakeController.deleteJobPod(testcase.Job.Name, testcase.DeletePod) + _, err := fakeController.deleteJobPod(testcase.Job.Name, testcase.DeletePod) if err != testcase.ExpextVal { t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpextVal, err) } diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index f15b74c9592..cc90f9e2d4f 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -324,13 +324,8 @@ func (cc *Controller) deletePod(obj interface{}) { return } - req := apis.Request{ - Namespace: pod.Namespace, - JobName: jobName, - TaskName: taskName, - - Event: bus.PodEvictedEvent, - JobVersion: int32(dVersion), + if pod.Status.Phase == v1.PodSucceeded { + return } if err := cc.cache.DeletePod(pod); err != nil { @@ -338,6 +333,14 @@ func (cc *Controller) deletePod(obj interface{}) { pod.Namespace, pod.Name, err) } + req := apis.Request{ + Namespace: pod.Namespace, + JobName: jobName, + TaskName: taskName, + Event: bus.PodEvictedEvent, + JobVersion: int32(dVersion), + } + key := jobhelpers.GetJobKeyByReq(&req) queue := cc.getWorkerQueue(key) queue.Add(req)