Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for ttl cleanup for finished jobsets #443

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ type JobSetSpec struct {

// ManagedBy is used to indicate the controller or entity that manages a JobSet
ManagedBy *string `json:"managedBy,omitempty"`

// TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished
// execution (either Complete or Failed). If this field is set,
// TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be
// automatically deleted. When the JobSet is being deleted, its lifecycle
// guarantees (e.g. finalizers) will be honored. If this field is unset,
// the JobSet won't be automatically deleted. If this field is set to zero,
// the JobSet becomes eligible to be deleted immediately after it finishes.
// +kubebuilder:validation:Minimum=0
// +optional
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
}

// JobSetStatus defines the observed state of JobSet
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/jobset/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 16 additions & 7 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8335,6 +8335,18 @@ spec:
suspend:
description: Suspend suspends all running child Jobs when set to true.
type: boolean
ttlSecondsAfterFinished:
description: |-
TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished
execution (either Complete or Failed). If this field is set,
TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be
automatically deleted. When the JobSet is being deleted, its lifecycle
guarantees (e.g. finalizers) will be honored. If this field is unset,
the JobSet won't be automatically deleted. If this field is set to zero,
the JobSet becomes eligible to be deleted immediately after it finishes.
format: int32
minimum: 0
type: integer
type: object
status:
description: JobSetStatus defines the observed state of JobSet
Expand Down
22 changes: 22 additions & 0 deletions examples/simple/ttl-after-finished.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
name: jobset-with-ttl
spec:
ttlSecondsAfterFinished: 60
replicatedJobs:
- name: workers
template:
spec:
parallelism: 4
completions: 4
backoffLimit: 0
template:
spec:
containers:
- name: sleep
image: busybox
command:
- sleep
args:
- 100s
5 changes: 5 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@
"suspend": {
"description": "Suspend suspends all running child Jobs when set to true.",
"type": "boolean"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.",
"type": "integer",
"format": "int32"
}
}
},
Expand Down
15 changes: 13 additions & 2 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strconv"
"sync"

"k8s.io/utils/clock"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -48,6 +50,7 @@ type JobSetReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
clock clock.Clock
}

type childJobs struct {
Expand All @@ -62,7 +65,7 @@ type childJobs struct {
}

func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *JobSetReconciler {
return &JobSetReconciler{Client: client, Scheme: scheme, Record: record}
return &JobSetReconciler{Client: client, Scheme: scheme, Record: record, clock: clock.RealClock{}}
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
Expand Down Expand Up @@ -110,8 +113,16 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, err
}

// If JobSet is already completed or failed, clean up active child jobs.
// If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set.
if jobSetFinished(&js) {
requeueAfter, err := executeTTLAfterFinishedPolicy(ctx, r.Client, r.clock, &js)
if err != nil {
log.Error(err, "executing ttl after finished policy")
return ctrl.Result{}, err
}
if requeueAfter > 0 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
if err := r.deleteJobs(ctx, ownedJobs.active); err != nil {
log.Error(err, "deleting jobs")
return ctrl.Result{}, err
Expand Down
134 changes: 134 additions & 0 deletions pkg/controllers/ttl_after_finished.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package controllers

import (
"context"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// executeTTLAfterFinishedPolicy checks if the JobSet has a TTLSecondsAfterFinished set.
// If the JobSet has expired, it deletes the JobSet.
// If the JobSet has not expired, it returns the time after which the JobSet should be requeued.
// If the JobSet does not have a TTLSecondsAfterFinished set, it returns 0.
func executeTTLAfterFinishedPolicy(ctx context.Context, client client.Client, clock clock.Clock, js *jobset.JobSet) (time.Duration, error) {
log := ctrl.LoggerFrom(ctx)

if js.Spec.TTLSecondsAfterFinished != nil {
expired, err := checkIfTTLExpired(ctx, clock, js)
if err != nil {
return 0, fmt.Errorf("error checking if ttl expired: %w", err)
}
// if expired is true, that means the TTL has expired, and we should delete the JobSet
// otherwise, we requeue it for the remaining TTL duration.
if expired {
log.V(2).Info("JobSet TTL expired, deleting")
if err := deleteJobSet(ctx, client, js); err != nil {
return 0, err
}
} else {
return requeueJobSetAfter(js, clock.Now())
}
}
return 0, nil
}

// checkIfTTLExpired checks whether a given JobSet's TTL has expired.
func checkIfTTLExpired(ctx context.Context, clock clock.Clock, js *jobset.JobSet) (bool, error) {
// We don't care about the JobSets that don't have a TTL configured or are going to be deleted
if js.Spec.TTLSecondsAfterFinished == nil || js.DeletionTimestamp != nil {
return false, nil
}

now := clock.Now()
remaining, err := timeLeft(ctx, js, &now)
if err != nil {
return false, err
}

// TTL has expired
ttlExpired := remaining != nil && *remaining <= 0
return ttlExpired, nil
}

// timeLeft returns the time left until the JobSet's TTL expires and the time when it will expire.
func timeLeft(ctx context.Context, js *jobset.JobSet, now *time.Time) (*time.Duration, error) {
log := ctrl.LoggerFrom(ctx)

finishAt, expireAt, err := getJobSetFinishAndExpireTime(js)
if err != nil {
return nil, err
}
// The following check does sanity checking for nil pointers in case of changes to the above function.
// This logic should never be executed.
if now == nil || finishAt == nil || expireAt == nil {
return nil, fmt.Errorf("calculated invalid expiration time, jobset cleanup will be deferred")
}

if finishAt.After(*now) {
log.V(5).Info("Found JobSet finished in the future. This is likely due to time skew in the cluster.")
}
remaining := expireAt.Sub(*now)
log.V(5).Info("Found JobSet finished", "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", now.UTC(), "deadlineTTL", expireAt.UTC())
return &remaining, nil
}

func getJobSetFinishAndExpireTime(js *jobset.JobSet) (finishAt, expireAt *time.Time, err error) {
finishTime, err := jobSetFinishTime(js)
if err != nil {
return nil, nil, err
}

finishAt = &finishTime.Time
expiration := finishAt.Add(time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second)
expireAt = ptr.To(expiration)
return finishAt, expireAt, nil
}

// jobSetFinishTime takes an already finished JobSet and returns the time it finishes.
func jobSetFinishTime(finishedJobSet *jobset.JobSet) (metav1.Time, error) {
for _, c := range finishedJobSet.Status.Conditions {
if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue {
finishAt := c.LastTransitionTime
if finishAt.IsZero() {
return metav1.Time{}, fmt.Errorf("unable to find the time when the JobSet %s/%s finished", finishedJobSet.Namespace, finishedJobSet.Name)
}
return finishAt, nil
}
}

// This should never happen if the JobSets have finished
return metav1.Time{}, fmt.Errorf("unable to find the status of the finished JobSet %s/%s", finishedJobSet.Namespace, finishedJobSet.Name)
}

// requeueJobSetAfter returns the duration after which the JobSet should be requeued if TTLSecondsAfterFinished is set, otherwise returns 0.
func requeueJobSetAfter(js *jobset.JobSet, now time.Time) (time.Duration, error) {
var requeueAfter time.Duration = 0
if js.Spec.TTLSecondsAfterFinished != nil {
finishedAt, err := jobSetFinishTime(js)
if err != nil {
return 0, err
}
ttl := time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second
requeueAfter = finishedAt.Add(ttl).Sub(now)
}
return requeueAfter, nil
}

func deleteJobSet(ctx context.Context, c client.Client, js *jobset.JobSet) error {
log := ctrl.LoggerFrom(ctx)

policy := metav1.DeletePropagationForeground
options := []client.DeleteOption{client.PropagationPolicy(policy)}
log.V(2).Info("Cleaning up JobSet", "jobset", klog.KObj(js))

return c.Delete(ctx, js, options...)
}
Loading