From 586fa419c35879c9f1f6c576cd3b7dce29fcb6ae Mon Sep 17 00:00:00 2001 From: Dejan Pejchev Date: Sun, 10 Mar 2024 20:12:49 +0100 Subject: [PATCH] add support for ttl cleanup for finished jobsets --- api/jobset/v1alpha2/jobset_types.go | 11 ++ api/jobset/v1alpha2/openapi_generated.go | 7 + api/jobset/v1alpha2/zz_generated.deepcopy.go | 5 + .../jobset/v1alpha2/jobsetspec.go | 21 ++- .../crd/bases/jobset.x-k8s.io_jobsets.yaml | 12 ++ hack/python-sdk/swagger.json | 5 + pkg/controllers/jobset_controller.go | 114 +++++++++++++- pkg/controllers/jobset_controller_test.go | 141 ++++++++++++++++++ pkg/util/testing/wrappers.go | 6 + sdk/python/docs/JobsetV1alpha2JobSetSpec.md | 1 + .../models/jobset_v1alpha2_job_set_spec.py | 34 ++++- .../test/test_jobset_v1alpha2_job_set.py | 3 +- .../test/test_jobset_v1alpha2_job_set_list.py | 6 +- .../test/test_jobset_v1alpha2_job_set_spec.py | 3 +- .../controller/jobset_controller_test.go | 96 ++++++++++++ 15 files changed, 450 insertions(+), 15 deletions(-) diff --git a/api/jobset/v1alpha2/jobset_types.go b/api/jobset/v1alpha2/jobset_types.go index d0a417b4..f2fe4558 100644 --- a/api/jobset/v1alpha2/jobset_types.go +++ b/api/jobset/v1alpha2/jobset_types.go @@ -94,6 +94,17 @@ type JobSetSpec struct { // Suspend suspends all running child Jobs when set to true. Suspend *bool `json:"suspend,omitempty"` + + // TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished + // execution (either Complete or Failed). If this field is set, + // TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be + // automatically deleted. When the JobSet is being deleted, its lifecycle + // guarantees (e.g. finalizers) will be honored. If this field is unset, + // the JobSet won't be automatically deleted. If this field is set to zero, + // the JobSet becomes eligible to be deleted immediately after it finishes. + // +kubebuilder:validation:Minimum=0 + // +optional + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` } // JobSetStatus defines the observed state of JobSet diff --git a/api/jobset/v1alpha2/openapi_generated.go b/api/jobset/v1alpha2/openapi_generated.go index 3c2c11bc..38800b0f 100644 --- a/api/jobset/v1alpha2/openapi_generated.go +++ b/api/jobset/v1alpha2/openapi_generated.go @@ -214,6 +214,13 @@ func schema_jobset_api_jobset_v1alpha2_JobSetSpec(ref common.ReferenceCallback) Format: "", }, }, + "ttlSecondsAfterFinished": { + SchemaProps: spec.SchemaProps{ + Description: "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.", + Type: []string{"integer"}, + Format: "int32", + }, + }, }, }, }, diff --git a/api/jobset/v1alpha2/zz_generated.deepcopy.go b/api/jobset/v1alpha2/zz_generated.deepcopy.go index 112890b3..94eb729d 100644 --- a/api/jobset/v1alpha2/zz_generated.deepcopy.go +++ b/api/jobset/v1alpha2/zz_generated.deepcopy.go @@ -131,6 +131,11 @@ func (in *JobSetSpec) DeepCopyInto(out *JobSetSpec) { *out = new(bool) **out = **in } + if in.TTLSecondsAfterFinished != nil { + in, out := &in.TTLSecondsAfterFinished, &out.TTLSecondsAfterFinished + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSetSpec. diff --git a/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go b/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go index 67079652..86f7a2e5 100644 --- a/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go +++ b/client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go @@ -17,12 +17,13 @@ package v1alpha2 // JobSetSpecApplyConfiguration represents an declarative configuration of the JobSetSpec type for use // with apply. type JobSetSpecApplyConfiguration struct { - ReplicatedJobs []ReplicatedJobApplyConfiguration `json:"replicatedJobs,omitempty"` - Network *NetworkApplyConfiguration `json:"network,omitempty"` - SuccessPolicy *SuccessPolicyApplyConfiguration `json:"successPolicy,omitempty"` - FailurePolicy *FailurePolicyApplyConfiguration `json:"failurePolicy,omitempty"` - StartupPolicy *StartupPolicyApplyConfiguration `json:"startupPolicy,omitempty"` - Suspend *bool `json:"suspend,omitempty"` + ReplicatedJobs []ReplicatedJobApplyConfiguration `json:"replicatedJobs,omitempty"` + Network *NetworkApplyConfiguration `json:"network,omitempty"` + SuccessPolicy *SuccessPolicyApplyConfiguration `json:"successPolicy,omitempty"` + FailurePolicy *FailurePolicyApplyConfiguration `json:"failurePolicy,omitempty"` + StartupPolicy *StartupPolicyApplyConfiguration `json:"startupPolicy,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` } // JobSetSpecApplyConfiguration constructs an declarative configuration of the JobSetSpec type for use with @@ -83,3 +84,11 @@ func (b *JobSetSpecApplyConfiguration) WithSuspend(value bool) *JobSetSpecApplyC b.Suspend = &value return b } + +// WithTTLSecondsAfterFinished sets the TTLSecondsAfterFinished field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the TTLSecondsAfterFinished field is set to the value of the last call. +func (b *JobSetSpecApplyConfiguration) WithTTLSecondsAfterFinished(value int32) *JobSetSpecApplyConfiguration { + b.TTLSecondsAfterFinished = &value + return b +} diff --git a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml index 33f532f6..be7dae21 100644 --- a/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml +++ b/config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml @@ -8331,6 +8331,18 @@ spec: suspend: description: Suspend suspends all running child Jobs when set to true. type: boolean + ttlSecondsAfterFinished: + description: |- + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished + execution (either Complete or Failed). If this field is set, + TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be + automatically deleted. When the JobSet is being deleted, its lifecycle + guarantees (e.g. finalizers) will be honored. If this field is unset, + the JobSet won't be automatically deleted. If this field is set to zero, + the JobSet becomes eligible to be deleted immediately after it finishes. + format: int32 + minimum: 0 + type: integer type: object status: description: JobSetStatus defines the observed state of JobSet diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index a2481695..b8740be6 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -106,6 +106,11 @@ "suspend": { "description": "Suspend suspends all running child Jobs when set to true.", "type": "boolean" + }, + "ttlSecondsAfterFinished": { + "description": "TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes.", + "type": "integer", + "format": "int32" } } }, diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 3d0a9d41..38e1cc41 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -21,6 +21,9 @@ import ( "fmt" "strconv" "sync" + "time" + + "k8s.io/utils/clock" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -48,6 +51,7 @@ type JobSetReconciler struct { client.Client Scheme *runtime.Scheme Record record.EventRecorder + clock clock.Clock } type childJobs struct { @@ -62,7 +66,7 @@ type childJobs struct { } func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *JobSetReconciler { - return &JobSetReconciler{Client: client, Scheme: scheme, Record: record} + return &JobSetReconciler{Client: client, Scheme: scheme, Record: record, clock: clock.RealClock{}} } //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch @@ -107,8 +111,26 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } - // If JobSet is already completed or failed, clean up active child jobs. + // If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set. if jobSetFinished(&js) { + if js.Spec.TTLSecondsAfterFinished != nil { + expired, err := r.checkIfTTLExpired(ctx, &js) + if err != nil { + log.Error(err, "checking if TTL expired") + return ctrl.Result{}, err + } + // if expired is true, that means the TTL has expired, and we should delete the JobSet + // otherwise, we requeue it for the remaining TTL duration. + if expired { + log.V(5).Info("JobSet TTL expired") + if err := r.deleteJobSet(ctx, &js); err != nil { + log.Error(err, "deleting jobset") + return ctrl.Result{}, err + } + } else { + return ctrl.Result{RequeueAfter: requeueJobSetAfter(&js)}, nil + } + } if err := r.deleteJobs(ctx, ownedJobs.active); err != nil { log.Error(err, "deleting jobs") return ctrl.Result{}, err @@ -936,3 +958,91 @@ func findJobFailureTime(job *batchv1.Job) *metav1.Time { } return nil } + +func (r *JobSetReconciler) deleteJobSet(ctx context.Context, js *jobset.JobSet) error { + log := ctrl.LoggerFrom(ctx) + + policy := metav1.DeletePropagationForeground + options := []client.DeleteOption{client.PropagationPolicy(policy)} + log.V(2).Info("Cleaning up JobSet", "jobset", klog.KObj(js)) + + return r.Delete(ctx, js, options...) +} + +// checkIfTTLExpired checks whether a given JobSet's TTL has expired. +func (r *JobSetReconciler) checkIfTTLExpired(ctx context.Context, jobSet *jobset.JobSet) (bool, error) { + // We don't care about the JobSets that are going to be deleted + if jobSet.DeletionTimestamp != nil { + return false, nil + } + + now := r.clock.Now() + remaining, err := timeLeft(ctx, jobSet, &now) + if err != nil { + return false, err + } + + // TTL has expired + ttlExpired := remaining != nil && *remaining <= 0 + return ttlExpired, nil +} + +// timeLeft returns the time left until the JobSet's TTL expires and the time when it will expire. +func timeLeft(ctx context.Context, js *jobset.JobSet, now *time.Time) (*time.Duration, error) { + log := ctrl.LoggerFrom(ctx) + + finishAt, expireAt, err := getJobSetFinishAndExpireTime(js) + if err != nil { + return nil, err + } + // The following 2 checks do sanity checking for nil pointers in case of changes to the above function. + // This logic should never be executed. + if now == nil || finishAt == nil || expireAt == nil { + log.V(2).Info("Warning: Calculated invalid expiration time. JobSet cleanup will be deferred.") + return nil, nil + } + + if finishAt.After(*now) { + log.V(2).Info("Warning: Found JobSet finished in the future. This is likely due to time skew in the cluster. JobSet cleanup will be deferred.") + } + remaining := expireAt.Sub(*now) + log.V(2).Info("Found JobSet finished", "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", now.UTC(), "deadlineTTL", expireAt.UTC()) + return &remaining, nil +} + +func getJobSetFinishAndExpireTime(js *jobset.JobSet) (finishAt, expireAt *time.Time, err error) { + finishTime, err := jobSetFinishTime(js) + if err != nil { + return nil, nil, err + } + + finishAt = &finishTime.Time + expiration := finishAt.Add(time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second) + expireAt = ptr.To(expiration) + return finishAt, expireAt, nil +} + +// jobSetFinishTime takes an already finished JobSet and returns the time it finishes. +func jobSetFinishTime(finishedJobSet *jobset.JobSet) (metav1.Time, error) { + for _, c := range finishedJobSet.Status.Conditions { + if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue { + finishAt := c.LastTransitionTime + if finishAt.IsZero() { + return metav1.Time{}, fmt.Errorf("unable to find the time when the JobSet %s/%s finished", finishedJobSet.Namespace, finishedJobSet.Name) + } + return finishAt, nil + } + } + + // This should never happen if the JobSets have finished + return metav1.Time{}, fmt.Errorf("unable to find the status of the finished JobSet %s/%s", finishedJobSet.Namespace, finishedJobSet.Name) +} + +// requeueJobSetAfter returns the duration after which the JobSet should be requeued if TTLSecondsAfterFinished is set, otherwise returns 0. +func requeueJobSetAfter(js *jobset.JobSet) time.Duration { + var requeueAfter time.Duration = 0 + if js.Spec.TTLSecondsAfterFinished != nil { + requeueAfter = time.Duration(*js.Spec.TTLSecondsAfterFinished) * time.Second + } + return requeueAfter +} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index 0a237f79..4afd677e 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -16,6 +16,7 @@ package controllers import ( "context" "strconv" + "strings" "testing" "time" @@ -25,6 +26,8 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2/ktesting" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/fake" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" @@ -1174,6 +1177,92 @@ func jobWithFailedCondition(name string, failureTime time.Time) *batchv1.Job { } } +func TestTimeLeft(t *testing.T) { + now := metav1.Now() + + tests := []struct { + name string + completionTime metav1.Time + failedTime metav1.Time + ttl *int32 + since *time.Time + expectErr bool + expectErrStr string + expectedTimeLeft *time.Duration + }{ + { + name: "jobset completed now, nil since", + completionTime: now, + ttl: ptr.To[int32](0), + since: nil, + }, + { + name: "jobset completed now, 0s TTL", + completionTime: now, + ttl: ptr.To[int32](0), + since: &now.Time, + expectedTimeLeft: ptr.To(0 * time.Second), + }, + { + name: "jobset completed now, 10s TTL", + completionTime: now, + ttl: ptr.To[int32](10), + since: &now.Time, + expectedTimeLeft: ptr.To(10 * time.Second), + }, + { + name: "jobset completed 10s ago, 15s TTL", + completionTime: metav1.NewTime(now.Add(-10 * time.Second)), + ttl: ptr.To[int32](15), + since: &now.Time, + expectedTimeLeft: ptr.To(5 * time.Second), + }, + { + name: "jobset failed now, 0s TTL", + failedTime: now, + ttl: ptr.To[int32](0), + since: &now.Time, + expectedTimeLeft: ptr.To(0 * time.Second), + }, + { + name: "jobset failed now, 10s TTL", + failedTime: now, + ttl: ptr.To[int32](10), + since: &now.Time, + expectedTimeLeft: ptr.To(10 * time.Second), + }, + { + name: "jobset failed 10s ago, 15s TTL", + failedTime: metav1.NewTime(now.Add(-10 * time.Second)), + ttl: ptr.To[int32](15), + since: &now.Time, + expectedTimeLeft: ptr.To(5 * time.Second), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + jobSet := newJobSet(tc.completionTime, tc.failedTime, tc.ttl) + _, ctx := ktesting.NewTestContext(t) + gotTimeLeft, gotErr := timeLeft(ctx, jobSet, tc.since) + if tc.expectErr != (gotErr != nil) { + t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr) + } + if tc.expectErr && len(tc.expectErrStr) == 0 { + t.Errorf("%s: invalid test setup; error message must not be empty for error cases", tc.name) + } + if tc.expectErr && !strings.Contains(gotErr.Error(), tc.expectErrStr) { + t.Errorf("%s: expected error message contains %q, got %v", tc.name, tc.expectErrStr, gotErr) + } + if !tc.expectErr { + if gotTimeLeft != nil && *gotTimeLeft != *tc.expectedTimeLeft { + t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft) + } + } + }) + } +} + type makeJobArgs struct { jobSetName string replicatedJobName string @@ -1219,3 +1308,55 @@ func makeJob(args *makeJobArgs) *testutils.JobWrapper { PodAnnotations(annotations) return jobWrapper } + +func newJobSet(completionTime, failedTime metav1.Time, ttl *int32) *jobset.JobSet { + js := &jobset.JobSet{ + TypeMeta: metav1.TypeMeta{Kind: "JobSet"}, + ObjectMeta: metav1.ObjectMeta{ + Name: "foobar", + Namespace: metav1.NamespaceDefault, + }, + Spec: jobset.JobSetSpec{ + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "foobar-job", + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Image: "foo/bar"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if !completionTime.IsZero() { + c := metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue, LastTransitionTime: completionTime} + js.Status.Conditions = append(js.Status.Conditions, c) + } + + if !failedTime.IsZero() { + c := metav1.Condition{Type: string(jobset.JobSetFailed), Status: metav1.ConditionTrue, LastTransitionTime: failedTime} + js.Status.Conditions = append(js.Status.Conditions, c) + } + + if ttl != nil { + js.Spec.TTLSecondsAfterFinished = ttl + } + + return js +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 0946ef90..9b2fd450 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -121,6 +121,12 @@ func (j *JobSetWrapper) EnableDNSHostnames(val bool) *JobSetWrapper { return j } +// TTLSecondsAfterFinished sets the value of JobSet.Spec.TTLSecondsAfterFinished +func (j *JobSetWrapper) TTLSecondsAfterFinished(seconds int32) *JobSetWrapper { + j.Spec.TTLSecondsAfterFinished = &seconds + return j +} + // ReplicatedJobWrapper wraps a ReplicatedJob. type ReplicatedJobWrapper struct { jobset.ReplicatedJob diff --git a/sdk/python/docs/JobsetV1alpha2JobSetSpec.md b/sdk/python/docs/JobsetV1alpha2JobSetSpec.md index 01102c61..5337b0c1 100644 --- a/sdk/python/docs/JobsetV1alpha2JobSetSpec.md +++ b/sdk/python/docs/JobsetV1alpha2JobSetSpec.md @@ -10,6 +10,7 @@ Name | Type | Description | Notes **startup_policy** | [**JobsetV1alpha2StartupPolicy**](JobsetV1alpha2StartupPolicy.md) | | [optional] **success_policy** | [**JobsetV1alpha2SuccessPolicy**](JobsetV1alpha2SuccessPolicy.md) | | [optional] **suspend** | **bool** | Suspend suspends all running child Jobs when set to true. | [optional] +**ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py b/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py index 7f4ca82e..b1fed064 100644 --- a/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/jobset/models/jobset_v1alpha2_job_set_spec.py @@ -38,7 +38,8 @@ class JobsetV1alpha2JobSetSpec(object): 'replicated_jobs': 'list[JobsetV1alpha2ReplicatedJob]', 'startup_policy': 'JobsetV1alpha2StartupPolicy', 'success_policy': 'JobsetV1alpha2SuccessPolicy', - 'suspend': 'bool' + 'suspend': 'bool', + 'ttl_seconds_after_finished': 'int' } attribute_map = { @@ -47,10 +48,11 @@ class JobsetV1alpha2JobSetSpec(object): 'replicated_jobs': 'replicatedJobs', 'startup_policy': 'startupPolicy', 'success_policy': 'successPolicy', - 'suspend': 'suspend' + 'suspend': 'suspend', + 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, failure_policy=None, network=None, replicated_jobs=None, startup_policy=None, success_policy=None, suspend=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, failure_policy=None, network=None, replicated_jobs=None, startup_policy=None, success_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """JobsetV1alpha2JobSetSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -62,6 +64,7 @@ def __init__(self, failure_policy=None, network=None, replicated_jobs=None, star self._startup_policy = None self._success_policy = None self._suspend = None + self._ttl_seconds_after_finished = None self.discriminator = None if failure_policy is not None: @@ -76,6 +79,8 @@ def __init__(self, failure_policy=None, network=None, replicated_jobs=None, star self.success_policy = success_policy if suspend is not None: self.suspend = suspend + if ttl_seconds_after_finished is not None: + self.ttl_seconds_after_finished = ttl_seconds_after_finished @property def failure_policy(self): @@ -207,6 +212,29 @@ def suspend(self, suspend): self._suspend = suspend + @property + def ttl_seconds_after_finished(self): + """Gets the ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. # noqa: E501 + + :return: The ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + :rtype: int + """ + return self._ttl_seconds_after_finished + + @ttl_seconds_after_finished.setter + def ttl_seconds_after_finished(self, ttl_seconds_after_finished): + """Sets the ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. + + TTLSecondsAfterFinished limits the lifetime of a JobSet that has finished execution (either Complete or Failed). If this field is set, TTLSecondsAfterFinished after the JobSet finishes, it is eligible to be automatically deleted. When the JobSet is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the JobSet won't be automatically deleted. If this field is set to zero, the JobSet becomes eligible to be deleted immediately after it finishes. # noqa: E501 + + :param ttl_seconds_after_finished: The ttl_seconds_after_finished of this JobsetV1alpha2JobSetSpec. # noqa: E501 + :type: int + """ + + self._ttl_seconds_after_finished = ttl_seconds_after_finished + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set.py b/sdk/python/test/test_jobset_v1alpha2_job_set.py index 0719e3a6..bf0cf585 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set.py @@ -60,7 +60,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py index 5d468158..740c27ad 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_list.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_list.py @@ -63,7 +63,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None @@ -108,7 +109,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True, ), + suspend = True, + ttl_seconds_after_finished = 56, ), status = jobset.models.jobset_v1alpha2_job_set_status.JobsetV1alpha2JobSetStatus( conditions = [ None diff --git a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py index 7fc98878..71be9b5b 100644 --- a/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py +++ b/sdk/python/test/test_jobset_v1alpha2_job_set_spec.py @@ -56,7 +56,8 @@ def make_instance(self, include_optional): target_replicated_jobs = [ '0' ], ), - suspend = True + suspend = True, + ttl_seconds_after_finished = 56 ) else : return JobsetV1alpha2JobSetSpec( diff --git a/test/integration/controller/jobset_controller_test.go b/test/integration/controller/jobset_controller_test.go index 0849a22c..bbd443b4 100644 --- a/test/integration/controller/jobset_controller_test.go +++ b/test/integration/controller/jobset_controller_test.go @@ -1224,6 +1224,82 @@ var _ = ginkgo.Describe("JobSet controller", func() { }, timeout, interval).Should(gomega.Succeed()) }) }) + + ginkgo.When("A JobSet is created with TTLSecondsAfterFinished configured and reaches terminal state", func() { + ginkgo.It("JobSet controller should delete it after configured ttl duration passes", func() { + // Create test namespace for each entry. + ns1 := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "jobset-ns-", + }, + } + ns2 := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "jobset-ns-", + }, + } + + gomega.Expect(k8sClient.Create(ctx, ns1)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, ns2)).To(gomega.Succeed()) + + defer func() { + gomega.Expect(testutil.DeleteNamespace(ctx, k8sClient, ns1)).To(gomega.Succeed()) + gomega.Expect(testutil.DeleteNamespace(ctx, k8sClient, ns2)).To(gomega.Succeed()) + }() + // Create JobSet. + js1 := testJobSet(ns1).TTLSecondsAfterFinished(2).Obj() + js2 := testJobSet(ns2).TTLSecondsAfterFinished(2).Obj() + + // Verify jobsets created successfully. + ginkgo.By(fmt.Sprintf("creating jobSet %s/%s", js1.Name, js1.Namespace)) + gomega.Eventually(k8sClient.Create(ctx, js1), timeout, interval).Should(gomega.Succeed()) + ginkgo.By(fmt.Sprintf("creating jobSet %s/%s", js2.Name, js2.Namespace)) + gomega.Eventually(k8sClient.Create(ctx, js2), timeout, interval).Should(gomega.Succeed()) + + ginkgo.By("checking all jobs were created successfully") + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js1).Should(gomega.Equal(testutil.NumExpectedJobs(js1))) + gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js2).Should(gomega.Equal(testutil.NumExpectedJobs(js2))) + + // Fetch updated job objects, so we always have the latest resource versions to perform mutations on. + var jobList batchv1.JobList + gomega.Expect(k8sClient.List(ctx, &jobList, client.InNamespace(js1.Namespace))).Should(gomega.Succeed()) + gomega.Expect(len(jobList.Items)).To(gomega.Equal(testutil.NumExpectedJobs(js1))) + completeAllJobs(&jobList) + gomega.Expect(k8sClient.List(ctx, &jobList, client.InNamespace(js2.Namespace))).Should(gomega.Succeed()) + gomega.Expect(len(jobList.Items)).To(gomega.Equal(testutil.NumExpectedJobs(js2))) + failJob(&jobList.Items[0]) + + // Verify jobset is marked as completed. + testutil.JobSetCompleted(ctx, k8sClient, js1, timeout) + testutil.JobSetFailed(ctx, k8sClient, js2, timeout) + + // Verify active jobs have not been deleted if ttl has not passed. + checkJobsDeletionTimestamp(js2, false, testutil.NumExpectedJobs(js2)) + + // Verify jobset has not been deleted if ttl has not passed. + ginkgo.By("checking that jobset has not been deleted before configured seconds pass") + var fresh1, fresh2 jobset.JobSet + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(js1), &fresh1)).To(gomega.Succeed()) + gomega.Expect(fresh1.DeletionTimestamp).To(gomega.BeNil()) + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(js2), &fresh2)).To(gomega.Succeed()) + gomega.Expect(fresh2.DeletionTimestamp).To(gomega.BeNil()) + + // Verify active jobs have been deleted after ttl has passed. + checkJobsDeletionTimestamp(js2, true, testutil.NumExpectedJobs(js2)-1) + + // Verify jobset has been deleted after ttl has passed. + ginkgo.By("checking that ttl after finished controller deletes jobset after configured seconds pass") + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(js1), &fresh1); err != nil { + return false + } + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(js2), &fresh2); err != nil { + return false + } + return !fresh1.DeletionTimestamp.IsZero() && !fresh2.DeletionTimestamp.IsZero() + }, timeout, interval).Should(gomega.BeTrue()) + }) + }) }) // end of Describe func makeAllJobsReady(jl *batchv1.JobList) { @@ -1498,6 +1574,26 @@ func checkNoActiveJobs(js *jobset.JobSet, numFinishedJobs int) { }, timeout, interval).Should(gomega.Equal(true)) } +// Check that the jobs' deletion timestamp is set or not set for the provided number of jobs. +func checkJobsDeletionTimestamp(js *jobset.JobSet, set bool, numJobs int) { + ginkgo.By(fmt.Sprintf("checking that jobset jobs deletion timestamp status is %t", set)) + gomega.Eventually(func() (bool, error) { + var jobList batchv1.JobList + if err := k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace)); err != nil { + return false, err + } + numJobs := numJobs + for _, job := range jobList.Items { + deletionTimestampExpected := set && job.DeletionTimestamp != nil + deletionTimestampNotExpected := !set && job.DeletionTimestamp == nil + if deletionTimestampExpected || deletionTimestampNotExpected { + numJobs-- + } + } + return numJobs == 0, nil + }, timeout, interval).Should(gomega.Equal(true)) +} + func jobActive(job *batchv1.Job) bool { // Jobs marked for deletion using foreground cascading deletion will have deletion timestamp set, // but will still exist until dependent objects with ownerReference.blockOwnerDeletion=true set are deleted.