Skip to content

Commit

Permalink
Merge pull request #61 from volcano-sh/controller
Browse files Browse the repository at this point in the history
ignore already exist error
  • Loading branch information
Klaus Ma authored Apr 3, 2019
2 parents 3583b81 + c651258 commit 3632d36
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ const (
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
//CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"

// ResumeJobAction is the action to resume an aborted job.
Expand Down
50 changes: 31 additions & 19 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job
//Job version is bumped only when job is killed
// Job version is bumped only when job is killed
job.Status.Version = job.Status.Version + 1
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
if job.DeletionTimestamp != nil {
Expand Down Expand Up @@ -88,7 +88,14 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
}
terminating++
case v1.PodSucceeded:
succeeded++
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil {
succeeded++
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
errs = append(errs, err)
continue
}
case v1.PodFailed:
err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil)
if err != nil {
Expand Down Expand Up @@ -240,7 +247,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
terminating++
} else {
running++
} /**/
}
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
Expand Down Expand Up @@ -382,11 +389,12 @@ func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error {
},
}

if _, e := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); e != nil {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)

return e
if _, err := cc.kubeClients.CoreV1().Services(job.Namespace).Create(svc); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create Service for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
}
}

Expand Down Expand Up @@ -419,10 +427,10 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {

glog.V(3).Infof("Try to create input PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
return err
}
}
}
Expand All @@ -433,7 +441,7 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
//return err
return err
}

pvc := &v1.PersistentVolumeClaim{
Expand All @@ -449,10 +457,12 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {

glog.V(3).Infof("Try to create output PVC: %v", pvc)

if _, e := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); e != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return e
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
}
}
}
Expand Down Expand Up @@ -482,11 +492,13 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
},
}

if _, e := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); e != nil {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
if _, err := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Create(pg); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)

return e
return err
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
newJob.Namespace, newJob.Name, err)
}

//NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) {
glog.Infof("Job update event is ignored since no update in 'Spec'.")
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
return req.Action
}

//For all the requests triggered from discarded job resources will perform sync action instead
if req.JobVersion < job.Status.Version {
glog.Infof("Request %s is outdated, will perform sync instead.", req)
if req.Event == vkv1.OutOfSyncEvent {
return vkv1.SyncJobAction
}

if req.Event == vkv1.OutOfSyncEvent {
// For all the requests triggered from discarded job resources will perform sync action instead
if req.JobVersion < job.Status.Version {
glog.Infof("Request %s is outdated, will perform sync instead.", req)
return vkv1.SyncJobAction
}

Expand All @@ -186,6 +186,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
return policy.Action
}
}
break
}
}
}
Expand Down

0 comments on commit 3632d36

Please sign in to comment.