From 49d265bb7778571b09d12c462a7492edef261191 Mon Sep 17 00:00:00 2001 From: Juana De La Cuesta Date: Tue, 4 Apr 2023 14:10:44 +0200 Subject: [PATCH] func: add validation for kill timeout smaller than progress deadline --- nomad/structs/structs.go | 17 ++ nomad/structs/structs_test.go | 534 +++++++++++++++++++++++++++++----- 2 files changed, 485 insertions(+), 66 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 490c963584bd..55f0b1fb17f8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5136,6 +5136,12 @@ func (u *UpdateStrategy) IsEmpty() bool { return true } + // When the Job is transformed from api to struct, the Update Strategy block is + // copied into the existing task groups, the only things that are passed along + // are MaxParallel and Stagger, because they are enforced at job level. + // That is why checking if MaxParallel is zero is enough to know if the + // update block is empty. + return u.MaxParallel == 0 } @@ -6708,6 +6714,8 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, outer) } + isTypeService := j.Type == JobTypeService + // Validate the tasks for _, task := range tg.Tasks { // Validate the task does not reference undefined volume mounts @@ -6727,6 +6735,15 @@ func (tg *TaskGroup) Validate(j *Job) error { outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err) mErr.Errors = append(mErr.Errors, outer) } + + // Validate the group's Update Strategy does not conflict with the Task's kill_timeout for service type jobs + if isTypeService && tg.Update != nil { + if task.KillTimeout > tg.Update.ProgressDeadline { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %s has a kill timout (%s) longer than the group's progress deadline (%s)", + task.Name, task.KillTimeout.String(), tg.Update.ProgressDeadline.String())) + } + } + } return mErr.ErrorOrNil() diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 
772fa49ffb7c..b2230a8feb3e 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -22,81 +22,109 @@ import ( func TestJob_Validate(t *testing.T) { ci.Parallel(t) - j := &Job{} - err := j.Validate() - requireErrors(t, err, - "datacenters", - "job ID", - "job name", - "job region", - "job type", - "namespace", - "task groups", - ) - - j = &Job{ - Type: "invalid-job-type", - } - err = j.Validate() - if expected := `Invalid job type: "invalid-job-type"`; !strings.Contains(err.Error(), expected) { - t.Errorf("expected %s but found: %v", expected, err) - } - - j = &Job{ - Type: JobTypeService, - Periodic: &PeriodicConfig{ - Enabled: true, + tests := []struct { + name string + job *Job + expErr []string + }{ + { + name: "job is empty", + job: &Job{}, + expErr: []string{ + "datacenters", + "job ID", + "job name", + "job region", + "job type", + "namespace", + "task groups", + }, }, - } - err = j.Validate() - require.Error(t, err, "Periodic") - - j = &Job{ - Region: "global", - ID: uuid.Generate(), - Namespace: "test", - Name: "my-job", - Type: JobTypeService, - Priority: JobDefaultPriority, - Datacenters: []string{"*"}, - TaskGroups: []*TaskGroup{ - { - Name: "web", - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, - }, + { + name: "job type is invalid", + job: &Job{ + Type: "invalid-job-type", }, - { - Name: "web", - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, + expErr: []string{ + `Invalid job type: "invalid-job-type"`, + }, + }, + { + name: "job periodic specification type is missing", + job: &Job{ + Type: JobTypeService, + Periodic: &PeriodicConfig{ + Enabled: true, }, }, - { - RestartPolicy: &RestartPolicy{ - Interval: 5 * time.Minute, - Delay: 10 * time.Second, - Attempts: 10, + expErr: []string{ + `Unknown periodic specification type ""`, + "Must specify a spec", + }, + }, + { + name: "job datacenters is empty", + job: &Job{ + 
Datacenters: []string{""}, + }, + expErr: []string{ + "datacenter must be non-empty string", + }, + }, + { + name: "job task group is type invalid", + job: &Job{ + Region: "global", + ID: uuid.Generate(), + Namespace: "test", + Name: "my-job", + Type: JobTypeService, + Priority: JobDefaultPriority, + Datacenters: []string{"*"}, + TaskGroups: []*TaskGroup{ + { + Name: "web", + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, + { + Name: "web", + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, + { + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + }, + }, }, }, + expErr: []string{ + "2 redefines 'web' from group 1", + "group 3 missing name", + "Task group web validation failed", + "Missing tasks for task group", + "Unsupported restart mode", + "Task Group web should have a reschedule policy", + "Task Group web should have an ephemeral disk object", + }, }, } - err = j.Validate() - requireErrors(t, err, - "2 redefines 'web' from group 1", - "group 3 missing name", - "Task group web validation failed", - ) - // test for invalid datacenters - j = &Job{ - Datacenters: []string{""}, + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.job.Validate() + requireErrors(t, err, tc.expErr...) 
+ }) } - err = j.Validate() - require.Error(t, err, "datacenter must be non-empty string") + } func TestJob_ValidateScaling(t *testing.T) { @@ -1004,6 +1032,380 @@ func TestTaskGroup_UsesConnect(t *testing.T) { }) } +func TestTaskGroup_Validate2(t *testing.T) { + ci.Parallel(t) + + tests := []struct { + name string + tg *TaskGroup + expErr []string + jobType string + }{ + { + name: "task group is missing basic specs", + tg: &TaskGroup{ + Count: -1, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Interval: 5 * time.Minute, + Attempts: 5, + Delay: 5 * time.Second, + }, + }, + expErr: []string{ + "group name", + "count can't be negative", + "Missing tasks", + }, + jobType: JobTypeService, + }, + { + name: "two tasks using same port", + tg: &TaskGroup{ + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{ + Networks: []*NetworkResource{ + { + ReservedPorts: []Port{{Label: "foo", Value: 123}}, + }, + }, + }, + }, + { + Name: "task-b", + Resources: &Resources{ + Networks: []*NetworkResource{ + { + ReservedPorts: []Port{{Label: "foo", Value: 123}}, + }, + }, + }, + }, + }, + }, + expErr: []string{ + "Static port 123 already reserved by task-a:foo", + }, + jobType: JobTypeService, + }, + { + name: "one task using same port twice", + tg: &TaskGroup{ + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{ + Networks: []*NetworkResource{ + { + ReservedPorts: []Port{ + {Label: "foo", Value: 123}, + {Label: "bar", Value: 123}, + }, + }, + }, + }, + }, + }, + }, + expErr: []string{ + "Static port 123 already reserved by task-a:foo", + }, + jobType: JobTypeService, + }, + { + name: "multiple leaders defined and one empty task", + tg: &TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*Task{ + {Name: "web", Leader: true}, + {Name: "web", Leader: true}, + {}, + }, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + 
Delay: 10 * time.Second, + Attempts: 10, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Interval: 5 * time.Minute, + Attempts: 10, + Delay: 5 * time.Second, + DelayFunction: "constant", + }, + }, + expErr: []string{ + "should have an ephemeral disk object", + "2 redefines 'web' from task 1", + "Task 3 missing name", + "Only one task may be marked as leader", + "Task web validation failed", + }, + jobType: JobTypeService, + }, + { + name: "invalid update block for batch job", + tg: &TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*Task{ + {Name: "web", Leader: true}, + }, + Update: DefaultUpdateStrategy.Copy(), + }, + expErr: []string{ + "does not allow update block", + }, + jobType: JobTypeBatch, + }, + { + name: "invalid reschedule policy for system job", + tg: &TaskGroup{ + Count: -1, + RestartPolicy: &RestartPolicy{ + Interval: 5 * time.Minute, + Delay: 10 * time.Second, + Attempts: 10, + Mode: RestartPolicyModeDelay, + }, + ReschedulePolicy: &ReschedulePolicy{ + Interval: 5 * time.Minute, + Attempts: 5, + Delay: 5 * time.Second, + }, + }, + expErr: []string{ + "System jobs should not have a reschedule policy", + }, + jobType: JobTypeSystem, + }, + { + name: "duplicated por label", + tg: &TaskGroup{ + Networks: []*NetworkResource{ + { + DynamicPorts: []Port{{"http", 0, 80, ""}}, + }, + }, + Tasks: []*Task{ + { + Resources: &Resources{ + Networks: []*NetworkResource{ + { + DynamicPorts: []Port{{"http", 0, 80, ""}}, + }, + }, + }, + }, + }, + }, + expErr: []string{ + "Port label http already in use", + }, + jobType: JobTypeService, + }, + { + name: "invalid volume type", + tg: &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "nothost", + Source: "foo", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + }, + expErr: []string{ + "volume has unrecognized type nothost", + }, + jobType: JobTypeService, + }, + { + name: "invalid volume with wrong CSI and canary specs", + tg: 
&TaskGroup{ + Name: "group-a", + Update: &UpdateStrategy{ + Canary: 1, + }, + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "csi", + PerAlloc: true, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + }, + expErr: []string{ + `volume has an empty source`, + `volume cannot be per_alloc when canaries are in use`, + `CSI volumes must have an attachment mode`, + `CSI volumes must have an access mode`, + }, + jobType: JobTypeService, + }, + { + name: "invalid task referencing non existent task", + tg: &TaskGroup{ + Name: "group-a", + Services: []*Service{ + { + Name: "service-a", + Provider: "consul", + Checks: []*ServiceCheck{ + { + Name: "check-a", + Type: "tcp", + TaskName: "task-b", + PortLabel: "http", + Interval: time.Duration(1 * time.Second), + Timeout: time.Duration(1 * time.Second), + }, + }, + }, + }, + Tasks: []*Task{ + {Name: "task-a"}, + }, + }, + expErr: []string{ + "Check check-a invalid: refers to non-existent task task-b", + }, + jobType: JobTypeService, + }, + { + name: "invalid volume for tasks", + tg: &TaskGroup{ + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "host", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "", + }, + }, + }, + { + Name: "task-b", + Resources: &Resources{}, + VolumeMounts: []*VolumeMount{ + { + Volume: "foob", + }, + }, + }, + }, + }, + expErr: []string{ + `Task task-a has a volume mount (0) referencing an empty volume`, + `Task task-b has a volume mount (0) referencing undefined volume foob`, + }, + jobType: JobTypeService, + }, + { + name: "services inside group using different providers", + tg: &TaskGroup{ + Name: "group-a", + Services: []*Service{ + { + Name: "service-a", + Provider: "nomad", + }, + { + Name: "service-b", + Provider: "consul", + }, + }, + Tasks: []*Task{{Name: "task-a"}}, + }, + expErr: []string{ + "Multiple service providers used: task group services must use the same 
provider", + }, + jobType: JobTypeService, + }, + { + name: "conflicting progress deadline and kill timeout", + tg: &TaskGroup{ + Name: "web", + Count: 1, + Tasks: []*Task{ + { + Name: "web", + Leader: true, + KillTimeout: DefaultUpdateStrategy.ProgressDeadline + 25*time.Minute, + }, + }, + Update: DefaultUpdateStrategy.Copy(), + }, + expErr: []string{ + "Task web has a kill timout (35m0s) longer than the group's progress deadline (10m0s)", + }, + jobType: JobTypeService, + }, + { + name: "service and task using different provider", + tg: &TaskGroup{ + Name: "group-a", + Services: []*Service{ + { + Name: "service-a", + Provider: "nomad", + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Services: []*Service{ + { + Name: "service-b", + Provider: "consul", + }, + }, + }, + }, + }, + expErr: []string{ + "Multiple service providers used: task group services must use the same provider", + }, + jobType: JobTypeService, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + j := testJob() + j.Type = tc.jobType + + err := tc.tg.Validate(j) + fmt.Println(err.Error()) + requireErrors(t, err, tc.expErr...) + }) + } +} + func TestTaskGroup_Validate(t *testing.T) { ci.Parallel(t)