Skip to content

Commit

Permalink
Job failure policy support in JobController
Browse files Browse the repository at this point in the history
Fix: kubernetes/community#583

Job failure policy integration in JobController. From the
JobSpec.BackoffLimit and JobSpec.BackoffSeconds the JobController will
define the backoff duration between Job retry.

Currently the number of retry for each job is store in the
DynamicBackoff struct that is local to the JobController. It means that
if the JobController restarts the number of retries will be lost and the
backoff duration will be reset to 0.
  • Loading branch information
clamoriniere1A committed Aug 30, 2017
1 parent 4da5a52 commit df4bf0d
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 133 deletions.
1 change: 1 addition & 0 deletions pkg/controller/job/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
Expand Down
145 changes: 104 additions & 41 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
Expand All @@ -49,6 +50,13 @@ import (
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

const (
// DefaultJobBackOff is the max backoff period, exported for the e2e test
DefaultJobBackOff = 10 * time.Second
// MaxJobBackOff is the max backoff period, exported for the e2e test
MaxJobBackOff = 3600 * time.Second
)

type JobController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
Expand All @@ -75,6 +83,9 @@ type JobController struct {
// Jobs that need to be updated
queue workqueue.RateLimitingInterface

// backoff
backoff *flowcontrol.Backoff

recorder record.EventRecorder
}

Expand All @@ -100,9 +111,17 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
}

jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
AddFunc: func(obj interface{}) {
if job := obj.(*batch.Job); job != nil {
jm.enqueueJob(job)
}
},
UpdateFunc: jm.updateJob,
DeleteFunc: jm.enqueueController,
DeleteFunc: func(obj interface{}) {
if job := obj.(*batch.Job); job != nil {
jm.enqueueJob(job)
}
},
})
jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced
Expand All @@ -117,6 +136,9 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin

jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob

jm.backoff = flowcontrol.NewBackOff(DefaultJobBackOff, MaxJobBackOff)

return jm
}

Expand All @@ -136,6 +158,8 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(jm.worker, time.Second, stopCh)
}

flowcontrol.StartBackoffGC(jm.backoff, stopCh)

<-stopCh
}

Expand Down Expand Up @@ -199,7 +223,7 @@ func (jm *JobController) addPod(obj interface{}) {
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
jm.enqueueJob(job)
return
}

Expand All @@ -208,7 +232,7 @@ func (jm *JobController) addPod(obj interface{}) {
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueController(job)
jm.enqueueJob(job)
}
}

Expand Down Expand Up @@ -240,7 +264,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
jm.enqueueController(job)
jm.enqueueJob(job)
}
}

Expand All @@ -250,15 +274,15 @@ func (jm *JobController) updatePod(old, cur interface{}) {
if job == nil {
return
}
jm.enqueueController(job)
jm.enqueueJob(job)
return
}

// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueController(job)
jm.enqueueJob(job)
}
}
}
Expand Down Expand Up @@ -299,7 +323,7 @@ func (jm *JobController) deletePod(obj interface{}) {
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
jm.enqueueJob(job)
}

func (jm *JobController) updateJob(old, cur interface{}) {
Expand All @@ -311,7 +335,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
if err != nil {
return
}
jm.queue.Add(key)
jm.enqueueJob(curJob)
// check if need to add a new rsync for ActiveDeadlineSeconds
if curJob.Status.StartTime != nil {
curADS := curJob.Spec.ActiveDeadlineSeconds
Expand All @@ -332,20 +356,23 @@ func (jm *JobController) updateJob(old, cur interface{}) {
}

// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
func (jm *JobController) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
func (jm *JobController) enqueueJob(job *batch.Job) {
key, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
return
}

// Retrieves the backoff duration for this Job
backoff, _ := jm.backoff.GetWithRetryNumber(key)

// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
// ensure that the same controller is synced for a given pod. When we periodically relist
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them.
jm.queue.Add(key)
jm.queue.AddAfter(key, backoff)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
Expand Down Expand Up @@ -431,6 +458,15 @@ func (jm *JobController) syncJob(key string) error {
}
job := *sharedJob

// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
jm.backoff.Reset(key)
return nil
}

// retrieve the previous number of retry
_, previousRetry := jm.backoff.GetWithRetryNumber(key)

// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
Expand All @@ -456,34 +492,28 @@ func (jm *JobController) syncJob(key string) error {
jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
}
}
// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
return nil
}

var manageJobErr error
if pastActiveDeadline(&job) {
// TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution
// exists ATM. There's an open discussion in the topic in
// https://github.com/kubernetes/kubernetes/issues/14602 which might give
// some sort of solution to above problem.
// kill remaining active pods
wait := sync.WaitGroup{}
errCh := make(chan error, int(active))
wait.Add(int(active))
for i := int32(0); i < active; i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
defer utilruntime.HandleError(err)
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
errCh <- err
}
}(i)
}
wait.Wait()

jobFailed := false
var failureReason string
var failureMessage string

jobHaveNewFailure := failed > job.Status.Failed

// check if the number of failed jobs increased since the last syncJob
if jobHaveNewFailure && (previousRetry+1 > *job.Spec.BackoffLimit) {
jobFailed = true
failureReason = "BackoffLimitExceeded"
failureMessage = "Job has reach the specified backoff limit"
} else if pastActiveDeadline(&job) {
jobFailed = true
failureReason = "DeadlineExceeded"
failureMessage = "Job was active longer than specified deadline"
}

if jobFailed {
errCh := make(chan error, active)
jm.deleteJobPods(&job, activePods, errCh)
select {
case manageJobErr = <-errCh:
if manageJobErr != nil {
Expand All @@ -495,8 +525,8 @@ func (jm *JobController) syncJob(key string) error {
// update status values accordingly
failed += active
active = 0
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
} else {
if jobNeedsSync && job.DeletionTimestamp == nil {
active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
Expand Down Expand Up @@ -545,9 +575,42 @@ func (jm *JobController) syncJob(key string) error {
return err
}
}

if jobHaveNewFailure {
jm.backoff.NextWithInitDuration(key, time.Duration(*job.Spec.BackoffSeconds), time.Now())
// re-enqueue Job after the backoff period
jm.enqueueJob(&job)
} else {
// if no new Failure the job backoff period can be reset
jm.backoff.Reset(key)
}

return manageJobErr
}

func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
// TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution
// exists ATM. There's an open discussion in the topic in
// https://github.com/kubernetes/kubernetes/issues/14602 which might give
// some sort of solution to above problem.
// kill remaining active pods
wait := sync.WaitGroup{}
nbPods := len(pods)
wait.Add(nbPods)
for i := int32(0); i < int32(nbPods); i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil {
defer utilruntime.HandleError(err)
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name)
errCh <- err
}
}(i)
}
wait.Wait()
}

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline(job *batch.Job) bool {
if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
Expand Down
Loading

0 comments on commit df4bf0d

Please sign in to comment.