diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index d7f080b1..f6639590 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -294,6 +294,11 @@ func (j *jobSetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) failurePolicyErrors := validateFailurePolicy(js.Spec.FailurePolicy, validReplicatedJobs) allErrs = append(allErrs, failurePolicyErrors...) } + + // Validate coordinator, if set. + if js.Spec.Coordinator != nil { + allErrs = append(allErrs, validateCoordinator(js)) + } return nil, errors.Join(allErrs...) } @@ -346,3 +351,42 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string { } return names } + +// validateCoordinator validates the following: +// 1. coordinator replicatedJob is a valid replicatedJob in the JobSet spec. +// 2. coordinator jobIndex is a valid index for the replicatedJob. +// 3. coordinator podIndex is a valid pod index for the job. +func validateCoordinator(js *jobset.JobSet) error { + // Validate replicatedJob. + replicatedJob := replicatedJobByName(js, js.Spec.Coordinator.ReplicatedJob) + if replicatedJob == nil { + return fmt.Errorf("coordinator replicatedJob %s does not exist", js.Spec.Coordinator.ReplicatedJob) + } + + // Validate Job index. + if js.Spec.Coordinator.JobIndex < 0 || js.Spec.Coordinator.JobIndex >= int(replicatedJob.Replicas) { + return fmt.Errorf("coordinator job index %d is invalid for replicatedJob %s", js.Spec.Coordinator.JobIndex, replicatedJob.Name) + } + + // Validate job is using indexed completion mode. + if replicatedJob.Template.Spec.CompletionMode == nil || *replicatedJob.Template.Spec.CompletionMode != batchv1.IndexedCompletion { + return fmt.Errorf("job for coordinator pod must be indexed completion mode") + } + + // Validate Pod index. + if js.Spec.Coordinator.PodIndex < 0 || js.Spec.Coordinator.PodIndex >= int(*replicatedJob.Template.Spec.Completions) { + return fmt.Errorf("coordinator pod index %d is invalid for replicatedJob %s job index %d", js.Spec.Coordinator.PodIndex, js.Spec.Coordinator.ReplicatedJob, js.Spec.Coordinator.JobIndex) + } + return nil +} + +// replicatedJobByName fetches the replicatedJob spec from the JobSet by name. +// Returns nil if no replicatedJob with the given name exists. +func replicatedJobByName(js *jobset.JobSet, replicatedJob string) *jobset.ReplicatedJob { + for _, rjob := range js.Spec.ReplicatedJobs { + if rjob.Name == js.Spec.Coordinator.ReplicatedJob { + return &rjob + } + } + return nil +} diff --git a/pkg/webhooks/jobset_webhook_test.go b/pkg/webhooks/jobset_webhook_test.go index 8597a16d..be38b4b5 100644 --- a/pkg/webhooks/jobset_webhook_test.go +++ b/pkg/webhooks/jobset_webhook_test.go @@ -1362,6 +1362,129 @@ func TestValidateCreate(t *testing.T) { fmt.Errorf("invalid failure policy rule name '%v', a failure policy rule name must start with an alphabetic character, optionally followed by a string of alphanumeric characters or '_,:', and must end with an alphanumeric character or '_'", "ruleToRuleThemAll,"), ), }, + { + name: "coordinator replicated job does not exist", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Coordinator: &jobset.Coordinator{ + ReplicatedJob: "fake-rjob", + JobIndex: 0, + PodIndex: 0, + }, + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "replicatedjob-a", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(2)), + Parallelism: ptr.To(int32(2)), + }, + }, + }, + { + Name: "replicatedjob-b", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(2)), + Parallelism: ptr.To(int32(2)), + }, + }, + }, + }, + SuccessPolicy: &jobset.SuccessPolicy{}, + }, + }, + want: errors.Join( + fmt.Errorf("coordinator replicatedJob fake-rjob does not exist"), + ), + }, + { + name: "coordinator job index invalid", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Coordinator: &jobset.Coordinator{ + ReplicatedJob: "replicatedjob-a", + JobIndex: 2, + PodIndex: 0, + }, + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "replicatedjob-a", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(2)), + Parallelism: ptr.To(int32(2)), + }, + }, + }, + { + Name: "replicatedjob-b", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(2)), + Parallelism: ptr.To(int32(2)), + }, + }, + }, + }, + SuccessPolicy: &jobset.SuccessPolicy{}, + }, + }, + want: errors.Join( + fmt.Errorf("coordinator job index 2 is invalid for replicatedJob replicatedjob-a"), + ), + }, + { + name: "coordinator pod index invalid", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Coordinator: &jobset.Coordinator{ + ReplicatedJob: "replicatedjob-a", + JobIndex: 0, + PodIndex: 2, + }, + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "replicatedjob-a", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(2)), + Parallelism: ptr.To(int32(2)), + }, + }, + }, + { + Name: "replicatedjob-b", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + CompletionMode: ptr.To(batchv1.IndexedCompletion), + Completions: ptr.To(int32(2)), + Parallelism: ptr.To(int32(2)), + }, + }, + }, + }, + SuccessPolicy: &jobset.SuccessPolicy{}, + }, + }, + want: errors.Join( + fmt.Errorf("coordinator pod index 2 is invalid for replicatedJob replicatedjob-a job index 0"), + ), + }, } testGroups := [][]validationTestCase{