Skip to content

Commit

Permalink
Merge pull request #627 from danielvegamyhre/coord-val
Browse files Browse the repository at this point in the history
Validation for Coordinator field
  • Loading branch information
k8s-ci-robot authored Jul 26, 2024
2 parents a1bbff3 + 9e483b2 commit a9165e1
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
44 changes: 44 additions & 0 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ func (j *jobSetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object)
failurePolicyErrors := validateFailurePolicy(js.Spec.FailurePolicy, validReplicatedJobs)
allErrs = append(allErrs, failurePolicyErrors...)
}

// Validate coordinator, if set.
if js.Spec.Coordinator != nil {
allErrs = append(allErrs, validateCoordinator(js))
}
return nil, errors.Join(allErrs...)
}

Expand Down Expand Up @@ -346,3 +351,42 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string {
}
return names
}

// validateCoordinator validates the coordinator configured in the JobSet spec.
// The checks run in order and the first failure is returned:
//  1. coordinator replicatedJob is a valid replicatedJob in the JobSet spec.
//  2. coordinator jobIndex is a valid index for the replicatedJob.
//  3. the replicatedJob's job template uses indexed completion mode.
//  4. the job template specifies completions, and coordinator podIndex is a
//     valid pod index for the job.
//
// Returns nil when the coordinator is valid, or when no coordinator is set.
func validateCoordinator(js *jobset.JobSet) error {
	coordinator := js.Spec.Coordinator
	if coordinator == nil {
		// Nothing to validate; guard so a direct call cannot panic.
		return nil
	}

	// Validate replicatedJob.
	replicatedJob := replicatedJobByName(js, coordinator.ReplicatedJob)
	if replicatedJob == nil {
		return fmt.Errorf("coordinator replicatedJob %s does not exist", coordinator.ReplicatedJob)
	}

	// Validate Job index.
	if coordinator.JobIndex < 0 || coordinator.JobIndex >= int(replicatedJob.Replicas) {
		return fmt.Errorf("coordinator job index %d is invalid for replicatedJob %s", coordinator.JobIndex, replicatedJob.Name)
	}

	// Validate job is using indexed completion mode.
	jobSpec := replicatedJob.Template.Spec
	if jobSpec.CompletionMode == nil || *jobSpec.CompletionMode != batchv1.IndexedCompletion {
		return fmt.Errorf("job for coordinator pod must be indexed completion mode")
	}

	// Completions is a pointer and may be unset in the template; report that
	// as a validation error rather than dereferencing nil below.
	if jobSpec.Completions == nil {
		return fmt.Errorf("job for coordinator pod must specify completions")
	}

	// Validate Pod index.
	if coordinator.PodIndex < 0 || coordinator.PodIndex >= int(*jobSpec.Completions) {
		return fmt.Errorf("coordinator pod index %d is invalid for replicatedJob %s job index %d", coordinator.PodIndex, coordinator.ReplicatedJob, coordinator.JobIndex)
	}
	return nil
}

// replicatedJobByName fetches the replicatedJob spec from the JobSet by name.
// The lookup uses the replicatedJob argument, so it does not depend on the
// JobSet having a coordinator configured.
//
// The returned pointer refers to the element inside js.Spec.ReplicatedJobs
// (not a copy), so writes through it modify the JobSet spec.
// Returns nil if no replicatedJob with the given name exists.
func replicatedJobByName(js *jobset.JobSet, replicatedJob string) *jobset.ReplicatedJob {
	// Iterate by index so the returned address is that of the slice element
	// rather than of a per-iteration copy.
	for i := range js.Spec.ReplicatedJobs {
		if js.Spec.ReplicatedJobs[i].Name == replicatedJob {
			return &js.Spec.ReplicatedJobs[i]
		}
	}
	return nil
}
123 changes: 123 additions & 0 deletions pkg/webhooks/jobset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,129 @@ func TestValidateCreate(t *testing.T) {
fmt.Errorf("invalid failure policy rule name '%v', a failure policy rule name must start with an alphabetic character, optionally followed by a string of alphanumeric characters or '_,:', and must end with an alphanumeric character or '_'", "ruleToRuleThemAll,"),
),
},
{
name: "coordinator replicated job does not exist",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Coordinator: &jobset.Coordinator{
ReplicatedJob: "fake-rjob",
JobIndex: 0,
PodIndex: 0,
},
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "replicatedjob-a",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Completions: ptr.To(int32(2)),
Parallelism: ptr.To(int32(2)),
},
},
},
{
Name: "replicatedjob-b",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Completions: ptr.To(int32(2)),
Parallelism: ptr.To(int32(2)),
},
},
},
},
SuccessPolicy: &jobset.SuccessPolicy{},
},
},
want: errors.Join(
fmt.Errorf("coordinator replicatedJob fake-rjob does not exist"),
),
},
{
name: "coordinator job index invalid",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Coordinator: &jobset.Coordinator{
ReplicatedJob: "replicatedjob-a",
JobIndex: 2,
PodIndex: 0,
},
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "replicatedjob-a",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Completions: ptr.To(int32(2)),
Parallelism: ptr.To(int32(2)),
},
},
},
{
Name: "replicatedjob-b",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Completions: ptr.To(int32(2)),
Parallelism: ptr.To(int32(2)),
},
},
},
},
SuccessPolicy: &jobset.SuccessPolicy{},
},
},
want: errors.Join(
fmt.Errorf("coordinator job index 2 is invalid for replicatedJob replicatedjob-a"),
),
},
{
name: "coordinator pod index invalid",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Coordinator: &jobset.Coordinator{
ReplicatedJob: "replicatedjob-a",
JobIndex: 0,
PodIndex: 2,
},
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "replicatedjob-a",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Completions: ptr.To(int32(2)),
Parallelism: ptr.To(int32(2)),
},
},
},
{
Name: "replicatedjob-b",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
CompletionMode: ptr.To(batchv1.IndexedCompletion),
Completions: ptr.To(int32(2)),
Parallelism: ptr.To(int32(2)),
},
},
},
},
SuccessPolicy: &jobset.SuccessPolicy{},
},
},
want: errors.Join(
fmt.Errorf("coordinator pod index 2 is invalid for replicatedJob replicatedjob-a job index 0"),
),
},
}

testGroups := [][]validationTestCase{
Expand Down

0 comments on commit a9165e1

Please sign in to comment.