Skip to content

Commit

Permalink
refactor ttl after finished logic and fix ttl integration test by rem…
Browse files Browse the repository at this point in the history
…oving assertion which can flake
  • Loading branch information
dejanzele committed Apr 5, 2024
1 parent 683504b commit 6b60eed
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 279 deletions.
113 changes: 7 additions & 106 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strconv"
"sync"
"time"

"k8s.io/utils/clock"

Expand Down Expand Up @@ -116,23 +115,13 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

// If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set.
if jobSetFinished(&js) {
if js.Spec.TTLSecondsAfterFinished != nil {
expired, err := r.checkIfTTLExpired(ctx, &js)
if err != nil {
log.Error(err, "checking if TTL expired")
return ctrl.Result{}, err
}
// if expired is true, that means the TTL has expired, and we should delete the JobSet
// otherwise, we requeue it for the remaining TTL duration.
if expired {
log.V(5).Info("JobSet TTL expired")
if err := r.deleteJobSet(ctx, &js); err != nil {
log.Error(err, "deleting jobset")
return ctrl.Result{}, err
}
} else {
return ctrl.Result{RequeueAfter: requeueJobSetAfter(&js)}, nil
}
requeueAfter, err := r.executeTTLAfterFinishedPolicy(ctx, &js)
if err != nil {
log.Error(err, "executing ttl after finished policy")
return ctrl.Result{}, err
}
if requeueAfter > 0 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
if err := r.deleteJobs(ctx, ownedJobs.active); err != nil {
log.Error(err, "deleting jobs")
Expand Down Expand Up @@ -968,91 +957,3 @@ func managedByExternalController(js jobset.JobSet) *string {
}
return nil
}

func (r *JobSetReconciler) deleteJobSet(ctx context.Context, js *jobset.JobSet) error {
log := ctrl.LoggerFrom(ctx)

policy := metav1.DeletePropagationForeground
options := []client.DeleteOption{client.PropagationPolicy(policy)}
log.V(2).Info("Cleaning up JobSet", "jobset", klog.KObj(js))

return r.Delete(ctx, js, options...)
}

// checkIfTTLExpired checks whether a given JobSet's TTL has expired.
func (r *JobSetReconciler) checkIfTTLExpired(ctx context.Context, jobSet *jobset.JobSet) (bool, error) {
// We don't care about the JobSets that are going to be deleted
if jobSet.DeletionTimestamp != nil {
return false, nil
}

now := r.clock.Now()
remaining, err := timeLeft(ctx, jobSet, &now)
if err != nil {
return false, err
}

// TTL has expired
ttlExpired := remaining != nil && *remaining <= 0
return ttlExpired, nil
}

// timeLeft returns the time left until the JobSet's TTL expires and the time when it will expire.
func timeLeft(ctx context.Context, js *jobset.JobSet, now *time.Time) (*time.Duration, error) {
log := ctrl.LoggerFrom(ctx)

finishAt, expireAt, err := getJobSetFinishAndExpireTime(js)
if err != nil {
return nil, err
}
// The following 2 checks do sanity checking for nil pointers in case of changes to the above function.
// This logic should never be executed.
if now == nil || finishAt == nil || expireAt == nil {
log.V(2).Info("Warning: Calculated invalid expiration time. JobSet cleanup will be deferred.")
return nil, nil
}

if finishAt.After(*now) {
log.V(2).Info("Warning: Found JobSet finished in the future. This is likely due to time skew in the cluster. JobSet cleanup will be deferred.")
}
remaining := expireAt.Sub(*now)
log.V(2).Info("Found JobSet finished", "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", now.UTC(), "deadlineTTL", expireAt.UTC())
return &remaining, nil
}

func getJobSetFinishAndExpireTime(js *jobset.JobSet) (finishAt, expireAt *time.Time, err error) {
finishTime, err := jobSetFinishTime(js)
if err != nil {
return nil, nil, err
}

finishAt = &finishTime.Time
expiration := finishAt.Add(time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second)
expireAt = ptr.To(expiration)
return finishAt, expireAt, nil
}

// jobSetFinishTime takes an already finished JobSet and returns the time it finishes.
func jobSetFinishTime(finishedJobSet *jobset.JobSet) (metav1.Time, error) {
for _, c := range finishedJobSet.Status.Conditions {
if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue {
finishAt := c.LastTransitionTime
if finishAt.IsZero() {
return metav1.Time{}, fmt.Errorf("unable to find the time when the JobSet %s/%s finished", finishedJobSet.Namespace, finishedJobSet.Name)
}
return finishAt, nil
}
}

// This should never happen if the JobSets have finished
return metav1.Time{}, fmt.Errorf("unable to find the status of the finished JobSet %s/%s", finishedJobSet.Namespace, finishedJobSet.Name)
}

// requeueJobSetAfter returns the duration after which the JobSet should be requeued if TTLSecondsAfterFinished is set, otherwise returns 0.
func requeueJobSetAfter(js *jobset.JobSet) time.Duration {
var requeueAfter time.Duration = 0
if js.Spec.TTLSecondsAfterFinished != nil {
requeueAfter = time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second
}
return requeueAfter
}
141 changes: 0 additions & 141 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package controllers
import (
"context"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -26,8 +25,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
Expand Down Expand Up @@ -1177,92 +1174,6 @@ func jobWithFailedCondition(name string, failureTime time.Time) *batchv1.Job {
}
}

func TestTimeLeft(t *testing.T) {
now := metav1.Now()

tests := []struct {
name string
completionTime metav1.Time
failedTime metav1.Time
ttl *int32
since *time.Time
expectErr bool
expectErrStr string
expectedTimeLeft *time.Duration
}{
{
name: "jobset completed now, nil since",
completionTime: now,
ttl: ptr.To[int32](0),
since: nil,
},
{
name: "jobset completed now, 0s TTL",
completionTime: now,
ttl: ptr.To[int32](0),
since: &now.Time,
expectedTimeLeft: ptr.To(0 * time.Second),
},
{
name: "jobset completed now, 10s TTL",
completionTime: now,
ttl: ptr.To[int32](10),
since: &now.Time,
expectedTimeLeft: ptr.To(10 * time.Second),
},
{
name: "jobset completed 10s ago, 15s TTL",
completionTime: metav1.NewTime(now.Add(-10 * time.Second)),
ttl: ptr.To[int32](15),
since: &now.Time,
expectedTimeLeft: ptr.To(5 * time.Second),
},
{
name: "jobset failed now, 0s TTL",
failedTime: now,
ttl: ptr.To[int32](0),
since: &now.Time,
expectedTimeLeft: ptr.To(0 * time.Second),
},
{
name: "jobset failed now, 10s TTL",
failedTime: now,
ttl: ptr.To[int32](10),
since: &now.Time,
expectedTimeLeft: ptr.To(10 * time.Second),
},
{
name: "jobset failed 10s ago, 15s TTL",
failedTime: metav1.NewTime(now.Add(-10 * time.Second)),
ttl: ptr.To[int32](15),
since: &now.Time,
expectedTimeLeft: ptr.To(5 * time.Second),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
jobSet := newJobSet(tc.completionTime, tc.failedTime, tc.ttl)
_, ctx := ktesting.NewTestContext(t)
gotTimeLeft, gotErr := timeLeft(ctx, jobSet, tc.since)
if tc.expectErr != (gotErr != nil) {
t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
}
if tc.expectErr && len(tc.expectErrStr) == 0 {
t.Errorf("%s: invalid test setup; error message must not be empty for error cases", tc.name)
}
if tc.expectErr && !strings.Contains(gotErr.Error(), tc.expectErrStr) {
t.Errorf("%s: expected error message contains %q, got %v", tc.name, tc.expectErrStr, gotErr)
}
if !tc.expectErr {
if gotTimeLeft != nil && *gotTimeLeft != *tc.expectedTimeLeft {
t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft)
}
}
})
}
}

type makeJobArgs struct {
jobSetName string
replicatedJobName string
Expand Down Expand Up @@ -1308,55 +1219,3 @@ func makeJob(args *makeJobArgs) *testutils.JobWrapper {
PodAnnotations(annotations)
return jobWrapper
}

func newJobSet(completionTime, failedTime metav1.Time, ttl *int32) *jobset.JobSet {
js := &jobset.JobSet{
TypeMeta: metav1.TypeMeta{Kind: "JobSet"},
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
Namespace: metav1.NamespaceDefault,
},
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "foobar-job",
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Image: "foo/bar"},
},
},
},
},
},
},
},
},
}

if !completionTime.IsZero() {
c := metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue, LastTransitionTime: completionTime}
js.Status.Conditions = append(js.Status.Conditions, c)
}

if !failedTime.IsZero() {
c := metav1.Condition{Type: string(jobset.JobSetFailed), Status: metav1.ConditionTrue, LastTransitionTime: failedTime}
js.Status.Conditions = append(js.Status.Conditions, c)
}

if ttl != nil {
js.Spec.TTLSecondsAfterFinished = ttl
}

return js
}
Loading

0 comments on commit 6b60eed

Please sign in to comment.