Skip to content

Commit

Permalink
relax validation on replicated jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed May 25, 2024
1 parent 0b8c19c commit 6988760
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 100 deletions.
8 changes: 7 additions & 1 deletion pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,13 @@ func (j *JobTemplateWrapper) CompletionMode(mode batchv1.CompletionMode) *JobTem
return j
}

// PodSpec Containers sets the pod template spec containers.
// PodTemplateSpec sets the pod template spec in a Job spec.
func (j *JobTemplateWrapper) PodTemplateSpec(podTemplateSpec corev1.PodTemplateSpec) *JobTemplateWrapper {
j.Spec.Template = podTemplateSpec
return j
}

// PodSpec sets the pod spec in a Job template.
func (j *JobTemplateWrapper) PodSpec(podSpec corev1.PodSpec) *JobTemplateWrapper {
j.Spec.Template.Spec = podSpec
return j
Expand Down
121 changes: 64 additions & 57 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,62 +151,6 @@ func (j *jobSetWebhook) Default(ctx context.Context, obj runtime.Object) error {

//+kubebuilder:webhook:path=/validate-jobset-x-k8s-io-v1alpha2-jobset,mutating=false,failurePolicy=fail,sideEffects=None,groups=jobset.x-k8s.io,resources=jobsets,verbs=create;update,versions=v1alpha2,name=vjobset.kb.io,admissionReviewVersions=v1

const minRuleNameLength = 1
const maxRuleNameLength = 128
const ruleNameFmt = "^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$"

var ruleNameRegexp = regexp.MustCompile(ruleNameFmt)

// validateFailurePolicy performs validation for jobset failure policies and returns all errors detected.
func validateFailurePolicy(failurePolicy *jobset.FailurePolicy, validReplicatedJobs []string) []error {
var allErrs []error
if failurePolicy == nil {
return allErrs
}

// ruleNameToRulesWithName is used to verify that rule names are unique
ruleNameToRulesWithName := make(map[string][]int)
for index, rule := range failurePolicy.Rules {
// Check that the rule name meets the minimum length
nameLen := len(rule.Name)
if !(minRuleNameLength <= nameLen && nameLen <= maxRuleNameLength) {
err := fmt.Errorf("invalid failure policy rule name of length %v, the rule name must be at least %v characters long and at most %v characters long", nameLen, minRuleNameLength, maxRuleNameLength)
allErrs = append(allErrs, err)
}

ruleNameToRulesWithName[rule.Name] = append(ruleNameToRulesWithName[rule.Name], index)

if !ruleNameRegexp.MatchString(rule.Name) {
err := fmt.Errorf("invalid failure policy rule name '%v', a failure policy rule name must start with an alphabetic character, optionally followed by a string of alphanumeric characters or '_,:', and must end with an alphanumeric character or '_'", rule.Name)
allErrs = append(allErrs, err)
}

// Validate the rules target replicated jobs are valid
for _, rjobName := range rule.TargetReplicatedJobs {
if !collections.Contains(validReplicatedJobs, rjobName) {
allErrs = append(allErrs, fmt.Errorf("invalid replicatedJob name '%s' in failure policy does not appear in .spec.ReplicatedJobs", rjobName))
}
}

// Validate the rules on job failure reasons are valid
for _, failureReason := range rule.OnJobFailureReasons {
if !collections.Contains(validOnJobFailureReasons, failureReason) {
allErrs = append(allErrs, fmt.Errorf("invalid job failure reason '%s' in failure policy is not a recognized job failure reason", failureReason))
}
}
}

// Checking that rule names are unique
for ruleName, rulesWithName := range ruleNameToRulesWithName {
if len(rulesWithName) > 1 {
err := fmt.Errorf("rule names are not unique, rules with indices %v all have the same name '%v'", rulesWithName, ruleName)
allErrs = append(allErrs, err)
}
}

return allErrs
}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (j *jobSetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
js, ok := obj.(*jobset.JobSet)
Expand Down Expand Up @@ -308,11 +252,15 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
return nil, fmt.Errorf("expected a JobSet from old object but got a %T", old)
}
mungedSpec := js.Spec.DeepCopy()

// Allow pod template to be mutated for suspended JobSets.
// This is needed for integration with Kueue/DWS.
if ptr.Deref(oldJS.Spec.Suspend, false) {
for index := range js.Spec.ReplicatedJobs {
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector
mungedSpec.ReplicatedJobs[index].Template.Spec.Template = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template
}
}

// Note that SucccessPolicy and failurePolicy are made immutable via CEL.
errs := apivalidation.ValidateImmutableField(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs, field.NewPath("spec").Child("replicatedJobs"))
errs = append(errs, apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))...)
Expand All @@ -324,6 +272,65 @@ func (j *jobSetWebhook) ValidateDelete(ctx context.Context, obj runtime.Object)
return nil, nil
}

// Constants for validating failure policies.
const (
minRuleNameLength = 1
maxRuleNameLength = 128
ruleNameFmt = "^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$"
)

var ruleNameRegexp = regexp.MustCompile(ruleNameFmt)

// validateFailurePolicy performs validation for jobset failure policies and returns all errors detected.
func validateFailurePolicy(failurePolicy *jobset.FailurePolicy, validReplicatedJobs []string) []error {
var allErrs []error
if failurePolicy == nil {
return allErrs
}

// ruleNameToRulesWithName is used to verify that rule names are unique
ruleNameToRulesWithName := make(map[string][]int)
for index, rule := range failurePolicy.Rules {
// Check that the rule name meets the minimum length
nameLen := len(rule.Name)
if !(minRuleNameLength <= nameLen && nameLen <= maxRuleNameLength) {
err := fmt.Errorf("invalid failure policy rule name of length %v, the rule name must be at least %v characters long and at most %v characters long", nameLen, minRuleNameLength, maxRuleNameLength)
allErrs = append(allErrs, err)
}

ruleNameToRulesWithName[rule.Name] = append(ruleNameToRulesWithName[rule.Name], index)

if !ruleNameRegexp.MatchString(rule.Name) {
err := fmt.Errorf("invalid failure policy rule name '%v', a failure policy rule name must start with an alphabetic character, optionally followed by a string of alphanumeric characters or '_,:', and must end with an alphanumeric character or '_'", rule.Name)
allErrs = append(allErrs, err)
}

// Validate the rules target replicated jobs are valid
for _, rjobName := range rule.TargetReplicatedJobs {
if !collections.Contains(validReplicatedJobs, rjobName) {
allErrs = append(allErrs, fmt.Errorf("invalid replicatedJob name '%s' in failure policy does not appear in .spec.ReplicatedJobs", rjobName))
}
}

// Validate the rules on job failure reasons are valid
for _, failureReason := range rule.OnJobFailureReasons {
if !collections.Contains(validOnJobFailureReasons, failureReason) {
allErrs = append(allErrs, fmt.Errorf("invalid job failure reason '%s' in failure policy is not a recognized job failure reason", failureReason))
}
}
}

// Checking that rule names are unique
for ruleName, rulesWithName := range ruleNameToRulesWithName {
if len(rulesWithName) > 1 {
err := fmt.Errorf("rule names are not unique, rules with indices %v all have the same name '%v'", rulesWithName, ruleName)
allErrs = append(allErrs, err)
}
}

return allErrs
}

func completionModePtr(mode batchv1.CompletionMode) *batchv1.CompletionMode {
return &mode
}
Expand Down
112 changes: 91 additions & 21 deletions pkg/webhooks/jobset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -1475,7 +1476,7 @@ func TestValidateUpdate(t *testing.T) {
}.ToAggregate(),
},
{
name: "replicated jobs are immutable",
name: "replicated job pod template can be updated for suspended jobset",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Expand All @@ -1484,17 +1485,56 @@ func TestValidateUpdate(t *testing.T) {
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
// Adding an annotation.
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"key": "value"},
},
},
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
},
{
name: "replicated job pod template cannot be updated for running jobset",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
// Adding an annotation.
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"key": "value"},
},
},
},
},
},
Expand All @@ -1504,31 +1544,60 @@ func TestValidateUpdate(t *testing.T) {
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: validReplicatedJobs,
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
want: field.ErrorList{
field.Invalid(field.NewPath("spec").Child("replicatedJobs"), []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
field.Invalid(field.NewPath("spec").Child("replicatedJobs"), "", "field is immutable"),
}.ToAggregate(),
},
{
name: "replicated job name cannot be updated",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "new-replicated-job-name",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
}, "field is immutable"),
},
},
want: field.ErrorList{
field.Invalid(field.NewPath("spec").Child("replicatedJobs"), "", "field is immutable"),
}.ToAggregate(),
},
}
Expand All @@ -1541,7 +1610,8 @@ func TestValidateUpdate(t *testing.T) {
newObj := tc.js.DeepCopyObject()
oldObj := tc.oldJs.DeepCopyObject()
_, err = webhook.ValidateUpdate(context.TODO(), oldObj, newObj)
if diff := cmp.Diff(tc.want, err); diff != "" {
// Ignore bad value to keep test cases short and readable.
if diff := cmp.Diff(tc.want, err, cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" {
t.Errorf("ValidateResources() mismatch (-want +got):\n%s", diff)
}
})
Expand Down
Loading

0 comments on commit 6988760

Please sign in to comment.