Skip to content

Commit

Permalink
feat: add terminalState to jobset status (#594)
Browse files Browse the repository at this point in the history
* feat: add jobset status

feat: add jobset status

* add updateConditionAndPhase comment

* remove unnecessary ctx

* feat: add jobset TerminalState field

Signed-off-by: googs1025 <googs1025@gmail.com>

* set status terminalstate field in setJobSetCompletedCondition and setJobSetFailedCondition

* verify the terminalState during integration tests

---------

Signed-off-by: googs1025 <googs1025@gmail.com>
  • Loading branch information
googs1025 authored Jun 30, 2024
1 parent 7866d64 commit c450428
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 18 deletions.
5 changes: 5 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type JobSetStatus struct {
// RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.
RestartsCountTowardsMax int32 `json:"restartsCountTowardsMax,omitempty"`

// TerminalState is the state of the JobSet when it finishes execution.
// It can be either Completed or Failed. Otherwise, it is empty by default.
TerminalState string `json:"terminalState,omitempty"`

// ReplicatedJobsStatus track the number of JobsReady for each replicatedJob.
// +optional
// +listType=map
Expand Down Expand Up @@ -169,6 +173,7 @@ type ReplicatedJobStatus struct {
// +k8s:openapi-gen=true
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="TerminalState",JSONPath=".status.terminalState",type=string,description="Final state of JobSet"
// +kubebuilder:printcolumn:name="Restarts",JSONPath=".status.restarts",type=string,description="Number of restarts"
// +kubebuilder:printcolumn:name="Completed",type="string",priority=0,JSONPath=".status.conditions[?(@.type==\"Completed\")].status"
// +kubebuilder:printcolumn:name="Suspended",type="string",JSONPath=".spec.suspend",description="JobSet suspended"
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetstatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- description: Final state of JobSet
jsonPath: .status.terminalState
name: TerminalState
type: string
- description: Number of restarts
jsonPath: .status.restarts
name: Restarts
Expand Down Expand Up @@ -8546,6 +8550,11 @@ spec:
of restarts.
format: int32
type: integer
terminalState:
description: |-
TerminalState is the state of the JobSet when it finishes execution.
It can be either Completed or Failed. Otherwise, it is empty by default.
type: string
type: object
type: object
served: true
Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@
"description": "RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts.",
"type": "integer",
"format": "int32"
},
"terminalState": {
"description": "TerminalState is the state of the JobSet when it finishes execution. It can be either Completed or Failed. Otherwise, it is empty by default.",
"type": "string"
}
}
},
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/failure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func executeFailurePolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *chi
// possible code paths here.
firstFailedJob := findFirstFailedJob(ownedJobs.failed)
msg := messageWithFirstFailedJob(constants.FailedJobsMessage, firstFailedJob.Name)
setJobSetFailedCondition(ctx, js, constants.FailedJobsReason, msg, updateStatusOpts)
setJobSetFailedCondition(js, constants.FailedJobsReason, msg, updateStatusOpts)
return nil
}

Expand Down Expand Up @@ -183,7 +183,7 @@ var failJobSetActionApplier failurePolicyActionApplier = func(ctx context.Contex
failureMessage := messageWithFirstFailedJob(failureBaseMessage, matchingFailedJob.Name)

failureReason := constants.FailJobSetActionReason
setJobSetFailedCondition(ctx, js, failureReason, failureMessage, updateStatusOpts)
setJobSetFailedCondition(js, failureReason, failureMessage, updateStatusOpts)
return nil
}

Expand All @@ -194,7 +194,7 @@ var restartJobSetActionApplier failurePolicyActionApplier = func(ctx context.Con
failureMessage := messageWithFirstFailedJob(failureBaseMessage, matchingFailedJob.Name)

failureReason := constants.ReachedMaxRestartsReason
setJobSetFailedCondition(ctx, js, failureReason, failureMessage, updateStatusOpts)
setJobSetFailedCondition(js, failureReason, failureMessage, updateStatusOpts)
return nil
}

Expand Down Expand Up @@ -254,9 +254,10 @@ func makeFailedConditionOpts(reason, msg string) *conditionOpts {
}
}

// setJobSetFailedCondition sets a condition on the JobSet status indicating it has failed.
func setJobSetFailedCondition(ctx context.Context, js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) {
// setJobSetFailedCondition marks the JobSet as failed. It applies the failed
// condition built from the given reason and message via setCondition, and then
// records string(jobset.JobSetFailed) as the status terminal state.
func setJobSetFailedCondition(js *jobset.JobSet, reason, msg string, updateStatusOpts *statusUpdateOpts) {
	failedOpts := makeFailedConditionOpts(reason, msg)
	setCondition(js, failedOpts, updateStatusOpts)
	js.Status.TerminalState = string(jobset.JobSetFailed)
}

// findJobFailureTimeAndReason is a helper function which extracts the Job failure condition from a Job,
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd

// Calculate JobsReady and update statuses for each ReplicatedJob.
rjobStatuses := r.calculateReplicatedJobStatuses(ctx, js, ownedJobs)
updateReplicatedJobsStatuses(ctx, js, rjobStatuses, updateStatusOpts)
updateReplicatedJobsStatuses(js, rjobStatuses, updateStatusOpts)

// If JobSet is already completed or failed, clean up active child jobs and requeue if TTLSecondsAfterFinished is set.
if jobSetFinished(js) {
Expand Down Expand Up @@ -185,7 +185,7 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd

// If any jobs have succeeded, execute the JobSet success policy.
if len(ownedJobs.successful) > 0 {
if completed := executeSuccessPolicy(ctx, js, ownedJobs, updateStatusOpts); completed {
if completed := executeSuccessPolicy(js, ownedJobs, updateStatusOpts); completed {
return ctrl.Result{}, nil
}
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
}

// updateReplicatedJobsStatuses updates the replicatedJob statuses if they have changed.
func updateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
func updateReplicatedJobsStatuses(js *jobset.JobSet, statuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) {
// If replicated job statuses haven't changed, there's nothing to do here.
if replicatedJobStatusesEqual(js.Status.ReplicatedJobsStatus, statuses) {
return
Expand Down Expand Up @@ -630,7 +630,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js
// executeSuccessPolicy checks the completed jobs against the jobset success policy
// and updates the jobset status to completed if the success policy conditions are met.
// Returns a boolean value indicating if the jobset was completed or not.
func executeSuccessPolicy(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
func executeSuccessPolicy(js *jobset.JobSet, ownedJobs *childJobs, updateStatusOpts *statusUpdateOpts) bool {
if numJobsMatchingSuccessPolicy(js, ownedJobs.successful) >= numJobsExpectedToSucceed(js) {
setJobSetCompletedCondition(js, updateStatusOpts)
return true
Expand Down Expand Up @@ -944,9 +944,10 @@ func updateCondition(js *jobset.JobSet, opts *conditionOpts) bool {
return shouldUpdate
}

// setJobSetCompletedCondition sets a condition on the JobSet status indicating it has completed.
// setJobSetCompletedCondition marks the JobSet as completed. It applies the
// completed condition via setCondition, and then records
// string(jobset.JobSetCompleted) as the status terminal state.
func setJobSetCompletedCondition(js *jobset.JobSet, updateStatusOpts *statusUpdateOpts) {
	completedOpts := makeCompletedConditionsOpts()
	setCondition(js, completedOpts, updateStatusOpts)
	js.Status.TerminalState = string(jobset.JobSetCompleted)
}

// setJobSetSuspendedCondition sets a condition on the JobSet status indicating it is currently suspended.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func TestUpdateConditions(t *testing.T) {
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(1).
Obj()).
Obj()).TerminalState(jobset.JobSetCompleted).
Conditions([]metav1.Condition{
// JobSet is completed..
{
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (j *JobSetWrapper) FailedCondition(failedAt metav1.Time) *JobSetWrapper {
return j
}

// TerminalState stores the given condition type, converted to a string, in
// JobSet.Status.TerminalState and returns the wrapper so calls can be chained.
func (j *JobSetWrapper) TerminalState(terminalState jobset.JobSetConditionType) *JobSetWrapper {
	state := string(terminalState)
	j.Status.TerminalState = state
	return j
}

func (j *JobSetWrapper) DeletionTimestamp(deletionTimestamp *metav1.Time) *JobSetWrapper {
j.ObjectMeta.DeletionTimestamp = deletionTimestamp
return j
Expand Down
1 change: 1 addition & 0 deletions sdk/python/docs/JobsetV1alpha2JobSetStatus.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Name | Type | Description | Notes
**replicated_jobs_status** | [**list[JobsetV1alpha2ReplicatedJobStatus]**](JobsetV1alpha2ReplicatedJobStatus.md) | ReplicatedJobsStatus track the number of JobsReady for each replicatedJob. | [optional]
**restarts** | **int** | Restarts tracks the number of times the JobSet has restarted (i.e. recreated in case of RecreateAll policy). | [optional]
**restarts_count_towards_max** | **int** | RestartsCountTowardsMax tracks the number of times the JobSet has restarted that counts towards the maximum allowed number of restarts. | [optional]
**terminal_state** | **str** | TerminalState is the state of the JobSet when it finishes execution. It can be either Completed or Failed. Otherwise, it is empty by default. | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
34 changes: 31 additions & 3 deletions sdk/python/jobset/models/jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ class JobsetV1alpha2JobSetStatus(object):
'conditions': 'list[V1Condition]',
'replicated_jobs_status': 'list[JobsetV1alpha2ReplicatedJobStatus]',
'restarts': 'int',
'restarts_count_towards_max': 'int'
'restarts_count_towards_max': 'int',
'terminal_state': 'str'
}

attribute_map = {
'conditions': 'conditions',
'replicated_jobs_status': 'replicatedJobsStatus',
'restarts': 'restarts',
'restarts_count_towards_max': 'restartsCountTowardsMax'
'restarts_count_towards_max': 'restartsCountTowardsMax',
'terminal_state': 'terminalState'
}

def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, local_vars_configuration=None): # noqa: E501
def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None, restarts_count_towards_max=None, terminal_state=None, local_vars_configuration=None): # noqa: E501
"""JobsetV1alpha2JobSetStatus - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
Expand All @@ -56,6 +58,7 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None,
self._replicated_jobs_status = None
self._restarts = None
self._restarts_count_towards_max = None
self._terminal_state = None
self.discriminator = None

if conditions is not None:
Expand All @@ -66,6 +69,8 @@ def __init__(self, conditions=None, replicated_jobs_status=None, restarts=None,
self.restarts = restarts
if restarts_count_towards_max is not None:
self.restarts_count_towards_max = restarts_count_towards_max
if terminal_state is not None:
self.terminal_state = terminal_state

@property
def conditions(self):
Expand Down Expand Up @@ -157,6 +162,29 @@ def restarts_count_towards_max(self, restarts_count_towards_max):

self._restarts_count_towards_max = restarts_count_towards_max

@property
def terminal_state(self):
"""Gets the terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501
TerminalState is the state of the JobSet when it finishes execution. It can be either Completed or Failed. Otherwise, it is empty by default. # noqa: E501
:return: The terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501
:rtype: str
"""
return self._terminal_state

@terminal_state.setter
def terminal_state(self, terminal_state):
"""Sets the terminal_state of this JobsetV1alpha2JobSetStatus.
TerminalState is the state of the JobSet when it finishes execution. It can be either Completed or Failed. Otherwise, it is empty by default. # noqa: E501
:param terminal_state: The terminal_state of this JobsetV1alpha2JobSetStatus. # noqa: E501
:type: str
"""

self._terminal_state = terminal_state

def to_dict(self):
"""Returns the model properties as a dict"""
result = {}
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/test/test_jobset_v1alpha2_job_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56, )
restarts_count_towards_max = 56,
terminal_state = '0', )
)
else :
return JobsetV1alpha2JobSet(
Expand Down
6 changes: 4 additions & 2 deletions sdk/python/test/test_jobset_v1alpha2_job_set_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56, ), )
restarts_count_towards_max = 56,
terminal_state = '0', ), )
],
kind = '0',
metadata = None
Expand Down Expand Up @@ -146,7 +147,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56, ), )
restarts_count_towards_max = 56,
terminal_state = '0', ), )
],
)

Expand Down
3 changes: 2 additions & 1 deletion sdk/python/test/test_jobset_v1alpha2_job_set_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def make_instance(self, include_optional):
suspended = 56, )
],
restarts = 56,
restarts_count_towards_max = 56
restarts_count_towards_max = 56,
terminal_state = '0'
)
else :
return JobsetV1alpha2JobSetStatus(
Expand Down
14 changes: 14 additions & 0 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func JobSetCompleted(ctx context.Context, k8sClient client.Client, js *jobset.Jo
Status: metav1.ConditionTrue,
},
}
terminalState := string(jobset.JobSetCompleted)
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
gomega.Eventually(checkJobSetTerminalState, timeout, interval).WithArguments(ctx, k8sClient, js, terminalState).Should(gomega.Equal(true))
}

func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
Expand All @@ -71,7 +73,9 @@ func JobSetFailed(ctx context.Context, k8sClient client.Client, js *jobset.JobSe
Status: metav1.ConditionTrue,
},
}
terminalState := string(jobset.JobSetFailed)
gomega.Eventually(checkJobSetStatus, timeout, interval).WithArguments(ctx, k8sClient, js, conditions).Should(gomega.Equal(true))
gomega.Eventually(checkJobSetTerminalState, timeout, interval).WithArguments(ctx, k8sClient, js, terminalState).Should(gomega.Equal(true))
}

func JobSetSuspended(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, timeout time.Duration) {
Expand Down Expand Up @@ -142,6 +146,7 @@ func checkJobSetActive(ctx context.Context, k8sClient client.Client, js *jobset.
return true, nil
}

// checkJobSetStatus checks whether the JobSet status matches the expected conditions.
func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, conditions []metav1.Condition) (bool, error) {
var fetchedJS jobset.JobSet
if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: js.Namespace, Name: js.Name}, &fetchedJS); err != nil {
Expand All @@ -158,6 +163,15 @@ func checkJobSetStatus(ctx context.Context, k8sClient client.Client, js *jobset.
return found == len(conditions), nil
}

// checkJobSetTerminalState looks up the JobSet by namespace and name through
// k8sClient and reports whether its Status.TerminalState equals the expected
// terminalState. If the lookup fails, it returns false together with the error.
func checkJobSetTerminalState(ctx context.Context, k8sClient client.Client, js *jobset.JobSet, terminalState string) (bool, error) {
	key := types.NamespacedName{Namespace: js.Namespace, Name: js.Name}
	fetched := &jobset.JobSet{}
	if err := k8sClient.Get(ctx, key, fetched); err != nil {
		return false, err
	}
	matches := fetched.Status.TerminalState == terminalState
	return matches, nil
}

// DeleteNamespace deletes all objects the tests typically create in the namespace.
func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
if ns == nil {
Expand Down

0 comments on commit c450428

Please sign in to comment.