Skip to content

Commit

Permalink
Merge pull request #284 from k82cn/del_job
Browse files Browse the repository at this point in the history
Enhanced job deletion.
  • Loading branch information
k82cn authored Jul 8, 2018
2 parents ccf7a4d + 49e8af2 commit 9b7ae56
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
5 changes: 5 additions & 0 deletions pkg/scheduler/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,8 @@ func MergeErrors(errs ...error) error {

return nil
}

// JobTerminated checkes whether job was terminated.
func JobTerminated(job *JobInfo) bool {
return job.SchedSpec == nil && len(job.Tasks) == 0
}
32 changes: 15 additions & 17 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,35 +319,33 @@ func (sc *SchedulerCache) Bind(taskInfo *arbapi.TaskInfo, hostname string) error
}

func (sc *SchedulerCache) deleteJob(job *arbapi.JobInfo) {
glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name)

time.AfterFunc(5*time.Second, func() {
sc.deletedJobs.AddIfNotPresent(job)
})
}

func (sc *SchedulerCache) cleanupJob(job *arbapi.JobInfo) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

if job.SchedSpec == nil && len(job.Tasks) == 0 {
delete(sc.Jobs, job.UID)
return nil
}

return fmt.Errorf("Job <%v/%v> is not ready to clean up", job.Namespace, job.Name)
}

func (sc *SchedulerCache) processCleanupJob() error {
_, err := sc.deletedJobs.Pop(func(obj interface{}) error {
job, ok := obj.(*arbapi.JobInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}

if err := sc.cleanupJob(job); err != nil {
// Requeue Job to wait for all tasks deleted.
sc.deleteJob(job)
return err
}
func() {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

if arbapi.JobTerminated(job) {
delete(sc.Jobs, job.UID)
glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
} else {
// Retry
sc.deleteJob(job)
}
}()

return nil
})

Expand Down
11 changes: 10 additions & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,16 @@ func (sc *SchedulerCache) deleteTask(pi *arbapi.TaskInfo) error {
// Assumes that lock is already acquired.
func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
pi := arbapi.NewTaskInfo(pod)
return sc.deleteTask(pi)
if err := sc.deleteTask(pi); err != nil {
return err
}

// If job was terminated, delete it.
if job, found := sc.Jobs[pi.Job]; found && arbapi.JobTerminated(job) {
sc.deleteJob(job)
}

return nil
}

func (sc *SchedulerCache) AddPod(obj interface{}) {
Expand Down

0 comments on commit 9b7ae56

Please sign in to comment.