Skip to content

Commit

Permalink
Distinguish different pod-delete scenarios
Browse files Browse the repository at this point in the history
Try to address issue volcano-sh#791
It's a draft solution and needs further discussion.

Signed-off-by: pengli <justdoit.pli@gmail.com>
  • Loading branch information
vincent-pli committed May 27, 2020
1 parent bab40f1 commit 0c71e04
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
10 changes: 9 additions & 1 deletion pkg/controllers/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ func JobKey(job *v1alpha1.Job) string {
}

func jobTerminated(job *apis.JobInfo) bool {
return job.Job == nil && len(job.Pods) == 0
for _, pods := range job.Pods {
for _, pod := range pods {
if pod.Status.Phase != v1.PodSucceeded {
return false
}
}
}

return job.Job == nil
}

func jobKeyOfPod(pod *v1.Pod) (string, error) {
Expand Down
21 changes: 14 additions & 7 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
_, retain := podRetainPhase[pod.Status.Phase]

if !retain {
err := cc.deleteJobPod(job.Name, pod)
existing, err := cc.deleteJobPod(job.Name, pod)
if err == nil {
terminating++
if existing {
terminating++
} else {
cc.cache.DeletePod(pod)
}
continue
}
// record the err, and then collect the pod info like retained pod
Expand Down Expand Up @@ -333,7 +337,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
for _, pod := range podToDelete {
go func(pod *v1.Pod) {
defer waitDeletionGroup.Done()
err := cc.deleteJobPod(job.Name, pod)
_, err := cc.deleteJobPod(job.Name, pod)
if err != nil {
// Failed to delete Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
Expand Down Expand Up @@ -530,16 +534,19 @@ func (cc *Controller) createOrUpdatePodGroup(job *batch.Job) error {
return nil
}

func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error {
func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) (bool, error) {
err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil && !apierrors.IsNotFound(err) {
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
klog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v",
pod.Namespace, pod.Name, jobName, err)

return fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err)
return true, fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err)
}

return nil
return true, nil
}

func (cc *Controller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func TestDeleteJobPod(t *testing.T) {
}
}

err := fakeController.deleteJobPod(testcase.Job.Name, testcase.DeletePod)
_, err := fakeController.deleteJobPod(testcase.Job.Name, testcase.DeletePod)
if err != testcase.ExpextVal {
t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpextVal, err)
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,20 +324,23 @@ func (cc *Controller) deletePod(obj interface{}) {
return
}

req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
TaskName: taskName,

Event: bus.PodEvictedEvent,
JobVersion: int32(dVersion),
if pod.Status.Phase == v1.PodSucceeded {
return
}

if err := cc.cache.DeletePod(pod); err != nil {
klog.Errorf("Failed to delete Pod <%s/%s>: %v in cache",
pod.Namespace, pod.Name, err)
}

req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
TaskName: taskName,
Event: bus.PodEvictedEvent,
JobVersion: int32(dVersion),
}

key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
Expand Down

0 comments on commit 0c71e04

Please sign in to comment.