Skip to content

Commit

Permalink
Add restart strategy (#686)
Browse files Browse the repository at this point in the history
* Add restart strategy to API

* Remove webhook, replace with openapi defaulting

* Update creation logic to account for BlockingRecreate

* Update integration test for existing Recreate strategy

* Fix integration tests

* Document restartStrategy by reusing existing max-restarts example

* Address comment

* Add enum validation

* make generate

* Fix issues with make generate

* Fix example
  • Loading branch information
nstogner authored Oct 23, 2024
1 parent 440f53e commit f76f2a7
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 16 deletions.
18 changes: 18 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,31 @@ type FailurePolicy struct {
// A restart is achieved by recreating all active child jobs.
MaxRestarts int32 `json:"maxRestarts,omitempty"`

// RestartStrategy defines the strategy to use when restarting the JobSet.
// Defaults to Recreate.
// +optional
// +kubebuilder:default=Recreate
RestartStrategy JobSetRestartStrategy `json:"restartStrategy,omitempty"`

// List of failure policy rules for this JobSet.
// For a given Job failure, the rules will be evaluated in order,
// and only the first matching rule will be executed.
// If no matching rule is found, the RestartJobSet action is applied.
Rules []FailurePolicyRule `json:"rules,omitempty"`
}

// +kubebuilder:validation:Enum=Recreate;BlockingRecreate
type JobSetRestartStrategy string

const (
// Recreate Jobs on a Job-by-Job basis.
Recreate JobSetRestartStrategy = "Recreate"

// BlockingRecreate ensures that all Jobs (and Pods) from a previous iteration are deleted before
// creating new Jobs.
BlockingRecreate JobSetRestartStrategy = "BlockingRecreate"
)

type SuccessPolicy struct {
// Operator determines either All or Any of the selected jobs should succeed to consider the JobSet successful
// +kubebuilder:validation:Enum=All;Any
Expand Down
7 changes: 7 additions & 0 deletions api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions client-go/applyconfiguration/jobset/v1alpha2/failurepolicy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ spec:
A restart is achieved by recreating all active child jobs.
format: int32
type: integer
restartStrategy:
default: Recreate
description: |-
RestartStrategy defines the strategy to use when restarting the JobSet.
Defaults to Recreate.
enum:
- Recreate
- BlockingRecreate
type: string
rules:
description: |-
List of failure policy rules for this JobSet.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module sigs.k8s.io/jobset

go 1.23
go 1.23.0

require (
github.com/google/go-cmp v0.6.0
Expand Down
4 changes: 4 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"type": "integer",
"format": "int32"
},
"restartStrategy": {
"description": "RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate.",
"type": "string"
},
"rules": {
"description": "List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied.",
"type": "array",
Expand Down
22 changes: 15 additions & 7 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ type childJobs struct {
successful []*batchv1.Job
failed []*batchv1.Job

// Jobs marked for deletion are mutually exclusive with the set of jobs in active, successful, and failed.
delete []*batchv1.Job
// Jobs from a previous restart (marked for deletion) are mutually exclusive
// with the set of jobs in active, successful, and failed.
previous []*batchv1.Job
}

// statusUpdateOpts tracks if a JobSet status update should be performed at the end of the reconciliation
Expand Down Expand Up @@ -169,8 +170,8 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd
return ctrl.Result{}, nil
}

// Delete any jobs marked for deletion.
if err := r.deleteJobs(ctx, ownedJobs.delete); err != nil {
// Delete all jobs from a previous restart that are marked for deletion.
if err := r.deleteJobs(ctx, ownedJobs.previous); err != nil {
log.Error(err, "deleting jobs")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -281,11 +282,11 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet)
jobRestarts, err := strconv.Atoi(job.Labels[constants.RestartsKey])
if err != nil {
log.Error(err, fmt.Sprintf("invalid value for label %s, must be integer", constants.RestartsKey))
ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
ownedJobs.previous = append(ownedJobs.previous, &childJobList.Items[i])
return nil, err
}
if int32(jobRestarts) < js.Status.Restarts {
ownedJobs.delete = append(ownedJobs.delete, &childJobList.Items[i])
ownedJobs.previous = append(ownedJobs.previous, &childJobList.Items[i])
continue
}

Expand Down Expand Up @@ -637,6 +638,13 @@ func executeSuccessPolicy(js *jobset.JobSet, ownedJobs *childJobs, updateStatusO

func constructJobsFromTemplate(js *jobset.JobSet, rjob *jobset.ReplicatedJob, ownedJobs *childJobs) []*batchv1.Job {
var jobs []*batchv1.Job
// If the JobSet is using the BlockingRecreate failure policy, we should not create any new jobs until
// all the jobs slated for deletion (i.e. from the last restart index) have been deleted.
useBlockingRecreate := js.Spec.FailurePolicy != nil && js.Spec.FailurePolicy.RestartStrategy == jobset.BlockingRecreate
if len(ownedJobs.previous) > 0 && useBlockingRecreate {
return jobs
}

for jobIdx := 0; jobIdx < int(rjob.Replicas); jobIdx++ {
jobName := placement.GenJobName(js.Name, rjob.Name, jobIdx)
if create := shouldCreateJob(jobName, ownedJobs); !create {
Expand Down Expand Up @@ -700,7 +708,7 @@ func shouldCreateJob(jobName string, ownedJobs *childJobs) bool {
// TODO: maybe we can use a job map here so we can do O(1) lookups
// to check if the job already exists, rather than a linear scan
// through all the jobs owned by the jobset.
for _, job := range collections.Concat(ownedJobs.active, ownedJobs.successful, ownedJobs.failed, ownedJobs.delete) {
for _, job := range collections.Concat(ownedJobs.active, ownedJobs.successful, ownedJobs.failed, ownedJobs.previous) {
if jobName == job.Name {
return false
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestConstructJobsFromTemplate(t *testing.T) {
Replicas(2).
Obj()).Obj(),
ownedJobs: &childJobs{
delete: []*batchv1.Job{
previous: []*batchv1.Job{
testutils.MakeJob("test-jobset-replicated-job-0", ns).Obj(),
},
},
Expand All @@ -275,6 +275,23 @@ func TestConstructJobsFromTemplate(t *testing.T) {
Suspend(false).Obj(),
},
},
{
name: "job creation blocked until all previous jobs no longer exist",
js: testutils.MakeJobSet(jobSetName, ns).
FailurePolicy(&jobset.FailurePolicy{
RestartStrategy: jobset.BlockingRecreate,
}).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Replicas(2).
Obj()).Obj(),
ownedJobs: &childJobs{
previous: []*batchv1.Job{
testutils.MakeJob("test-jobset-replicated-job-0", ns).Obj(),
},
},
want: nil,
},
{
name: "multiple replicated jobs",
js: testutils.MakeJobSet(jobSetName, ns).
Expand Down
1 change: 1 addition & 0 deletions sdk/python/docs/JobsetV1alpha2FailurePolicy.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**max_restarts** | **int** | MaxRestarts defines the limit on the number of JobSet restarts. A restart is achieved by recreating all active child jobs. | [optional]
**restart_strategy** | **str** | RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate. | [optional]
**rules** | [**list[JobsetV1alpha2FailurePolicyRule]**](JobsetV1alpha2FailurePolicyRule.md) | List of failure policy rules for this JobSet. For a given Job failure, the rules will be evaluated in order, and only the first matching rule will be executed. If no matching rule is found, the RestartJobSet action is applied. | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
Expand Down
30 changes: 29 additions & 1 deletion sdk/python/jobset/models/jobset_v1alpha2_failure_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,31 @@ class JobsetV1alpha2FailurePolicy(object):
"""
openapi_types = {
'max_restarts': 'int',
'restart_strategy': 'str',
'rules': 'list[JobsetV1alpha2FailurePolicyRule]'
}

attribute_map = {
'max_restarts': 'maxRestarts',
'restart_strategy': 'restartStrategy',
'rules': 'rules'
}

def __init__(self, max_restarts=None, rules=None, local_vars_configuration=None): # noqa: E501
def __init__(self, max_restarts=None, restart_strategy=None, rules=None, local_vars_configuration=None): # noqa: E501
"""JobsetV1alpha2FailurePolicy - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
self.local_vars_configuration = local_vars_configuration

self._max_restarts = None
self._restart_strategy = None
self._rules = None
self.discriminator = None

if max_restarts is not None:
self.max_restarts = max_restarts
if restart_strategy is not None:
self.restart_strategy = restart_strategy
if rules is not None:
self.rules = rules

Expand All @@ -80,6 +85,29 @@ def max_restarts(self, max_restarts):

self._max_restarts = max_restarts

@property
def restart_strategy(self):
"""Gets the restart_strategy of this JobsetV1alpha2FailurePolicy. # noqa: E501
RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate. # noqa: E501
:return: The restart_strategy of this JobsetV1alpha2FailurePolicy. # noqa: E501
:rtype: str
"""
return self._restart_strategy

@restart_strategy.setter
def restart_strategy(self, restart_strategy):
"""Sets the restart_strategy of this JobsetV1alpha2FailurePolicy.
RestartStrategy defines the strategy to use when restarting the JobSet. Defaults to Recreate. # noqa: E501
:param restart_strategy: The restart_strategy of this JobsetV1alpha2FailurePolicy. # noqa: E501
:type: str
"""

self._restart_strategy = restart_strategy

@property
def rules(self):
"""Gets the rules of this JobsetV1alpha2FailurePolicy. # noqa: E501
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_failure_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def make_instance(self, include_optional):
if include_optional :
return JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down Expand Up @@ -113,6 +114,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
1 change: 1 addition & 0 deletions sdk/python/test/test_jobset_v1alpha2_job_set_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def make_instance(self, include_optional):
replicated_job = '0', ),
failure_policy = jobset.models.jobset_v1alpha2_failure_policy.JobsetV1alpha2FailurePolicy(
max_restarts = 56,
restart_strategy = '0',
rules = [
jobset.models.jobset_v1alpha2_failure_policy_rule.JobsetV1alpha2FailurePolicyRule(
action = '0',
Expand Down
2 changes: 1 addition & 1 deletion site/content/en/docs/tasks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Here we have some simple examples demonstrating core JobSet features.
Success Policy allows one to specify when to mark a JobSet as success.
This example showcases an example of using the success policy to mark the JobSet as successful if the worker replicated job completes.

- [Failure Policy with Max Restarts](https://github.com/kubernetes-sigs/jobset/blob/release-0.5/examples/simple/max-restarts.yaml) demonstrates an example of utilizing `failurePolicy`. Failure Policy allows one to control how many restarts a JobSet can do before declaring the JobSet as failed.
- [Failure Policy](https://github.com/kubernetes-sigs/jobset/blob/release-0.5/examples/simple/failure-policy.yaml) demonstrates an example of utilizing `failurePolicy`. Failure Policy allows one to control how many restarts a JobSet can do before declaring the JobSet as failed. The strategy used when restarting can also be specified (i.e. whether to first delete all Jobs, or recreate on a one-by-one basis).

- [Exclusive Job Placement](https://github.com/kubernetes-sigs/jobset/blob/release-0.5/examples/simple/exclusive-placement.yaml) demonstrates how you can configure a JobSet to have a 1:1 mapping between each child Job and a particular topology domain, such as a datacenter rack or zone. This means that all the pods belonging to a child job will be colocated in the same topology domain, while pods from other jobs will not be allowed to run within this domain. This gives the child job exclusive access to computer resources in this domain.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
name: max-restarts
name: failure-policy
spec:
# On failure, restart all jobs up to 3 times.
failurePolicy:
# Wait for all Jobs to be fully deleted before recreating any.
# Defaults to "Recreate" which restarts Jobs individually.
restartStrategy: BlockingRecreate
# On failure, restart all jobs up to 3 times.
maxRestarts: 3
replicatedJobs:
- name: leader
Expand Down
Loading

0 comments on commit f76f2a7

Please sign in to comment.