Skip to content

Commit

Permalink
add support for ttl cleanup for finished jobsets
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanzele committed Mar 31, 2024
1 parent 7fc6624 commit 15fcc39
Show file tree
Hide file tree
Showing 15 changed files with 451 additions and 16 deletions.
11 changes: 11 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ type JobSetSpec struct {

// ManagedBy is used to indicate the controller or entity that manages a JobSet
ManagedBy *string `json:"managedBy,omitempty"`

// TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished
// execution (either Complete or Failed). If this field is set,
// TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be
// automatically deleted. When the JobSet is being deleted, its lifecycle
// guarantees (e.g. finalizers) will be honored. If this field is unset,
// the JobSet won't be automatically deleted. If this field is set to zero,
// the JobSet becomes eligible to be deleted immediately after it finishes.
// +kubebuilder:validation:Minimum=0
// +optional
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
}

// JobSetStatus defines the observed state of JobSet
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/jobset/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 16 additions & 7 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8335,6 +8335,18 @@ spec:
suspend:
description: Suspend suspends all running child Jobs when set to true.
type: boolean
ttlSecondsAfterFinished:
description: |-
TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished
execution (either Complete or Failed). If this field is set,
TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be
automatically deleted. When the JobSet is being deleted, its lifecycle
guarantees (e.g. finalizers) will be honored. If this field is unset,
the JobSet won't be automatically deleted. If this field is set to zero,
the JobSet becomes eligible to be deleted immediately after it finishes.
format: int32
minimum: 0
type: integer
type: object
status:
description: JobSetStatus defines the observed state of JobSet
Expand Down
5 changes: 5 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
"suspend": {
"description": "Suspend suspends all running child Jobs when set to true.",
"type": "boolean"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.",
"type": "integer",
"format": "int32"
}
}
},
Expand Down
114 changes: 112 additions & 2 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"
"strconv"
"sync"
"time"

"k8s.io/utils/clock"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -48,6 +51,7 @@ type JobSetReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
clock clock.Clock
}

type childJobs struct {
Expand All @@ -62,7 +66,7 @@ type childJobs struct {
}

func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *JobSetReconciler {
return &JobSetReconciler{Client: client, Scheme: scheme, Record: record}
return &JobSetReconciler{Client: client, Scheme: scheme, Record: record, clock: clock.RealClock{}}
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
Expand Down Expand Up @@ -110,8 +114,26 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, err
}

// If JobSet is already completed or failed, clean up active child jobs.
// If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set.
if jobSetFinished(&js) {
if js.Spec.TTLSecondsAfterFinished != nil {
expired, err := r.checkIfTTLExpired(ctx, &js)
if err != nil {
log.Error(err, "checking if TTL expired")
return ctrl.Result{}, err
}
// if expired is true, that means the TTL has expired, and we should delete the JobSet
// otherwise, we requeue it for the remaining TTL duration.
if expired {
log.V(5).Info("JobSet TTL expired")
if err := r.deleteJobSet(ctx, &js); err != nil {
log.Error(err, "deleting jobset")
return ctrl.Result{}, err
}
} else {
return ctrl.Result{RequeueAfter: requeueJobSetAfter(&js)}, nil
}
}
if err := r.deleteJobs(ctx, ownedJobs.active); err != nil {
log.Error(err, "deleting jobs")
return ctrl.Result{}, err
Expand Down Expand Up @@ -946,3 +968,91 @@ func managedByExternalController(js jobset.JobSet) *string {
}
return nil
}

func (r *JobSetReconciler) deleteJobSet(ctx context.Context, js *jobset.JobSet) error {
log := ctrl.LoggerFrom(ctx)

policy := metav1.DeletePropagationForeground
options := []client.DeleteOption{client.PropagationPolicy(policy)}
log.V(2).Info("Cleaning up JobSet", "jobset", klog.KObj(js))

return r.Delete(ctx, js, options...)
}

// checkIfTTLExpired checks whether a given JobSet's TTL has expired.
func (r *JobSetReconciler) checkIfTTLExpired(ctx context.Context, jobSet *jobset.JobSet) (bool, error) {
// We don't care about the JobSets that are going to be deleted
if jobSet.DeletionTimestamp != nil {
return false, nil
}

now := r.clock.Now()
remaining, err := timeLeft(ctx, jobSet, &now)
if err != nil {
return false, err
}

// TTL has expired
ttlExpired := remaining != nil && *remaining <= 0
return ttlExpired, nil
}

// timeLeft returns the time left until the JobSet's TTL expires and the time when it will expire.
func timeLeft(ctx context.Context, js *jobset.JobSet, now *time.Time) (*time.Duration, error) {
log := ctrl.LoggerFrom(ctx)

finishAt, expireAt, err := getJobSetFinishAndExpireTime(js)
if err != nil {
return nil, err
}
// The following 2 checks do sanity checking for nil pointers in case of changes to the above function.
// This logic should never be executed.
if now == nil || finishAt == nil || expireAt == nil {
log.V(2).Info("Warning: Calculated invalid expiration time. JobSet cleanup will be deferred.")
return nil, nil
}

if finishAt.After(*now) {
log.V(2).Info("Warning: Found JobSet finished in the future. This is likely due to time skew in the cluster. JobSet cleanup will be deferred.")
}
remaining := expireAt.Sub(*now)
log.V(2).Info("Found JobSet finished", "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", now.UTC(), "deadlineTTL", expireAt.UTC())
return &remaining, nil
}

func getJobSetFinishAndExpireTime(js *jobset.JobSet) (finishAt, expireAt *time.Time, err error) {
finishTime, err := jobSetFinishTime(js)
if err != nil {
return nil, nil, err
}

finishAt = &finishTime.Time
expiration := finishAt.Add(time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second)
expireAt = ptr.To(expiration)
return finishAt, expireAt, nil
}

// jobSetFinishTime takes an already finished JobSet and returns the time it finishes.
func jobSetFinishTime(finishedJobSet *jobset.JobSet) (metav1.Time, error) {
for _, c := range finishedJobSet.Status.Conditions {
if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue {
finishAt := c.LastTransitionTime
if finishAt.IsZero() {
return metav1.Time{}, fmt.Errorf("unable to find the time when the JobSet %s/%s finished", finishedJobSet.Namespace, finishedJobSet.Name)
}
return finishAt, nil
}
}

// This should never happen if the JobSets have finished
return metav1.Time{}, fmt.Errorf("unable to find the status of the finished JobSet %s/%s", finishedJobSet.Namespace, finishedJobSet.Name)
}

// requeueJobSetAfter returns the duration after which the JobSet should be requeued if TTLSecondsAfterFinished is set, otherwise returns 0.
func requeueJobSetAfter(js *jobset.JobSet) time.Duration {
var requeueAfter time.Duration = 0
if js.Spec.TTLSecondsAfterFinished != nil {
requeueAfter = time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second
}
return requeueAfter
}
Loading

0 comments on commit 15fcc39

Please sign in to comment.