Skip to content

Commit

Permalink
refactor: job completion uses event instead polling
Browse files Browse the repository at this point in the history
  • Loading branch information
Ricardo Lüders committed Dec 18, 2023
1 parent 8ebecf8 commit 20be7ec
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions pkg/controller/periodic/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package periodic
import (
"context"
"fmt"
"time"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -108,19 +106,33 @@ func (j *JobController) CreateGathererJob(ctx context.Context, dataGatherName, i
return j.kubeClient.BatchV1().Jobs(insightsNamespace).Create(ctx, gj, metav1.CreateOptions{})
}

// WaitForJobCompletion polls the Kubernetes API every 20 seconds and checks if the job finished.
// WaitForJobCompletion listen the Kubernetes events to check if job finished.
func (j *JobController) WaitForJobCompletion(ctx context.Context, job *batchv1.Job) error {
return wait.PollUntilContextCancel(ctx, 20*time.Second, true, func(ctx context.Context) (done bool, err error) {
j, err := j.kubeClient.BatchV1().Jobs(insightsNamespace).Get(ctx, job.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, err
}
if j.Status.Succeeded > 0 {
return true, nil
}
if j.Status.Failed > 0 {
return true, fmt.Errorf("job %s failed", job.Name)
watcher, err := j.kubeClient.BatchV1().Jobs(insightsNamespace).
Watch(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", job.Name)})
if err != nil {
return err
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watcher.ResultChan():
if !ok {
return fmt.Errorf("watcher channel was closed unexpectedly")
}
if event.Type == watch.Error {
return fmt.Errorf("watcher received error event: %v", event.Object)
}
job := event.Object.(*batchv1.Job)
if job.Status.Succeeded > 0 {
return nil
}
if job.Status.Failed > 0 {
return fmt.Errorf("job %s failed", job.Name)
}
}
return false, nil
})
}
}

0 comments on commit 20be7ec

Please sign in to comment.