diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index e0cfe3d2a4d..9e257797b85 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -230,7 +230,7 @@ spec: pkg, err := pkger.Parse(pkger.EncodingYAML, pkger.FromString(pkgStr)) require.NoError(t, err) - sum, _, err := svc.DryRun(context.Background(), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{ + sum, _, err := svc.DryRun(timedCtx(2*time.Second), l.Org.ID, l.User.ID, pkg, pkger.ApplyWithEnvRefs(map[string]string{ "bkt-1-name-ref": "new-bkt-name", "label-1-name-ref": "new-label-name", })) @@ -1167,8 +1167,9 @@ spec: apiVersion: %[1]s kind: Task metadata: - name: task_1 + name: task_UUID spec: + name: task_1 description: desc_1 cron: 15 * * * * query: > diff --git a/pkger/clone_resource.go b/pkger/clone_resource.go index e3c13c4e69d..0a982aa4519 100644 --- a/pkger/clone_resource.go +++ b/pkger/clone_resource.go @@ -886,21 +886,15 @@ func taskToObject(t influxdb.Task, name string) Object { query := strings.TrimSpace(taskFluxRegex.ReplaceAllString(t.Flux, "")) - k := Object{ - APIVersion: APIVersion, - Kind: KindTask, - Metadata: convertToMetadataResource(name), - Spec: Resource{ - fieldQuery: strings.TrimSpace(query), - }, - } - assignNonZeroStrings(k.Spec, map[string]string{ + o := newObject(KindTask, name) + assignNonZeroStrings(o.Spec, map[string]string{ fieldTaskCron: t.Cron, fieldDescription: t.Description, fieldEvery: t.Every, fieldOffset: durToStr(t.Offset), + fieldQuery: strings.TrimSpace(query), }) - return k + return o } func telegrafToObject(t influxdb.TelegrafConfig, name string) Object { @@ -954,12 +948,6 @@ func VariableToObject(v influxdb.Variable, name string) Object { return o } -func convertToMetadataResource(name string) Resource { - return Resource{ - fieldName: name, - } -} - func newObject(kind Kind, name string) Object { return Object{ APIVersion: APIVersion, diff --git a/pkger/models.go b/pkger/models.go index 913bd949ab0..2ad1f051f52 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -1874,6 +1874,7 @@ type task struct { id influxdb.ID orgID influxdb.ID name *references + displayName *references cron string description string every time.Duration @@ -1897,6 +1898,13 @@ func (t *task) Labels() []*label { } func (t *task) Name() string { + if displayName := t.displayName.String(); displayName != "" { + return displayName + } + return t.name.String() +} + +func (t *task) PkgName() string { return t.name.String() } @@ -1914,7 +1922,7 @@ func (t *task) Status() influxdb.Status { var fluxRegex = regexp.MustCompile(`import\s+\".*\"`) func (t *task) flux() string { - taskOpts := []string{fmt.Sprintf("name: %q", t.name)} + taskOpts := []string{fmt.Sprintf("name: %q", t.Name())} if t.cron != "" { taskOpts = append(taskOpts, fmt.Sprintf("cron: %q", t.cron)) } @@ -1987,7 +1995,14 @@ func (t *task) valid() []validationErr { Msg: "must be 1 of [active, inactive]", }) } - return vErrs + + if len(vErrs) > 0 { + return []validationErr{ + objectValidationErr(fieldSpec, vErrs...), + } + } + + return nil } type mapperTasks []*task diff --git a/pkger/parser.go b/pkger/parser.go index 8d91276ad32..5e0d68a0d14 100644 --- a/pkger/parser.go +++ b/pkger/parser.go @@ -241,7 +241,7 @@ type Pkg struct { mDashboards map[string]*dashboard mNotificationEndpoints map[string]*notificationEndpoint mNotificationRules map[string]*notificationRule - mTasks []*task + mTasks map[string]*task mTelegrafs map[string]*telegraf mVariables map[string]*variable @@ -528,7 +528,10 @@ func (p *Pkg) missingSecrets() []string { } func (p *Pkg) tasks() []*task { - tasks := p.mTasks[:] + tasks := make([]*task, 0, len(p.mTasks)) + for _, t := range p.mTasks { + tasks = append(tasks, t) + } sort.Slice(tasks, func(i, j int) bool { return tasks[i].Name() < tasks[j].Name() }) @@ -541,7 +544,9 @@ func (p *Pkg) telegrafs() []*telegraf { t.config.Name = t.Name() teles = append(teles, t) } + sort.Slice(teles, func(i, j int) bool { return teles[i].Name() < teles[j].Name() }) + return teles } @@ -848,6 +853,7 @@ func (p *Pkg) graphDashboards() *parseErr { }), } } + dash := &dashboard{ name: nameRef, displayName: p.getRefWithKnownEnvs(o.Spec, fieldName), @@ -1031,10 +1037,21 @@ func (p *Pkg) graphNotificationRules() *parseErr { } func (p *Pkg) graphTasks() *parseErr { - p.mTasks = make([]*task, 0) + p.mTasks = make(map[string]*task) return p.eachResource(KindTask, 1, func(o Object) []validationErr { + nameRef := p.getRefWithKnownEnvs(o.Metadata, fieldName) + if _, ok := p.mTasks[nameRef.String()]; ok { + return []validationErr{ + objectValidationErr(fieldMetadata, validationErr{ + Field: fieldName, + Msg: "duplicate name: " + nameRef.String(), + }), + } + } + t := &task{ name: p.getRefWithKnownEnvs(o.Metadata, fieldName), + displayName: p.getRefWithKnownEnvs(o.Spec, fieldName), cron: o.Spec.stringShort(fieldTaskCron), description: o.Spec.stringShort(fieldDescription), every: o.Spec.durationShort(fieldEvery), @@ -1050,7 +1067,7 @@ func (p *Pkg) graphTasks() *parseErr { }) sort.Sort(t.labels) - p.mTasks = append(p.mTasks, t) + p.mTasks[t.PkgName()] = t p.setRefs(t.name) return append(failures, t.valid()...) }) diff --git a/pkger/parser_test.go b/pkger/parser_test.go index b0578850099..bc818cf84f4 100644 --- a/pkger/parser_test.go +++ b/pkger/parser_test.go @@ -3343,14 +3343,14 @@ spec: require.Len(t, sum.Labels, 1) - task1 := tasks[0] - baseEqual(t, 0, influxdb.Inactive, task1) - assert.Equal(t, (10 * time.Minute).String(), task1.Every) - assert.Equal(t, (15 * time.Second).String(), task1.Offset) - - task2 := tasks[1] - baseEqual(t, 1, influxdb.Active, task2) - assert.Equal(t, "15 * * * *", task2.Cron) + task0 := tasks[0] + baseEqual(t, 0, influxdb.Inactive, task0) + assert.Equal(t, (10 * time.Minute).String(), task0.Every) + assert.Equal(t, (15 * time.Second).String(), task0.Offset) + + task1 := tasks[1] + baseEqual(t, 1, influxdb.Active, task1) + assert.Equal(t, "15 * * * *", task1.Cron) }) }) @@ -3381,7 +3381,7 @@ spec: resErr: testPkgResourceError{ name: "invalid status", validationErrs: 1, - valFields: []string{fieldStatus}, + valFields: []string{fieldSpec, fieldStatus}, pkgStr: `apiVersion: influxdata.com/v2alpha1 kind: Task metadata: @@ -3399,7 +3399,7 @@ spec: resErr: testPkgResourceError{ name: "missing query", validationErrs: 1, - valFields: []string{fieldQuery}, + valFields: []string{fieldSpec, fieldQuery}, pkgStr: `apiVersion: influxdata.com/v2alpha1 kind: Task metadata: @@ -3416,7 +3416,7 @@ spec: resErr: testPkgResourceError{ name: "missing every and cron fields", validationErrs: 1, - valFields: []string{fieldEvery, fieldTaskCron}, + valFields: []string{fieldSpec, fieldEvery, fieldTaskCron}, pkgStr: `apiVersion: influxdata.com/v2alpha1 kind: Task metadata: @@ -3432,7 +3432,7 @@ spec: resErr: testPkgResourceError{ name: "invalid association", validationErrs: 1, - valFields: []string{fieldAssociations}, + valFields: []string{fieldSpec, fieldAssociations}, pkgStr: `apiVersion: influxdata.com/v2alpha1 kind: Task metadata: @@ -3452,7 +3452,7 @@ spec: resErr: testPkgResourceError{ name: "duplicate association", validationErrs: 1, - valFields: []string{fieldAssociations}, + valFields: []string{fieldSpec, fieldAssociations}, pkgStr: `--- apiVersion: influxdata.com/v2alpha1 kind: Label @@ -3474,6 +3474,33 @@ spec: name: label_1 - kind: Label name: label_1 +`, + }, + }, + { + kind: KindTask, + resErr: testPkgResourceError{ + name: "duplicate meta names", + validationErrs: 1, + valFields: []string{fieldMetadata, fieldName}, + pkgStr: ` +apiVersion: influxdata.com/v2alpha1 +kind: Task +metadata: + name: task_0 +spec: + every: 10m + query: > + from(bucket: "rucket_1") |> yield(name: "mean") +--- +apiVersion: influxdata.com/v2alpha1 +kind: Task +metadata: + name: task_0 +spec: + every: 10m + query: > + from(bucket: "rucket_1") |> yield(name: "mean") `, }, }, diff --git a/pkger/service_test.go b/pkger/service_test.go index f747792a7f3..5a17ae1ee73 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -2427,78 +2427,125 @@ func TestService(t *testing.T) { }) t.Run("tasks", func(t *testing.T) { - tests := []struct { - name string - newName string - task influxdb.Task - }{ - { - name: "every offset is set", - newName: "new name", - task: influxdb.Task{ - ID: 1, - Name: "name_9000", - Every: time.Minute.String(), - Offset: 10 * time.Second, - Type: influxdb.TaskSystemType, - Flux: `option task = { name: "larry" } from(bucket: "rucket") |> yield()`, + t.Run("single task exports", func(t *testing.T) { + tests := []struct { + name string + newName string + task influxdb.Task + }{ + { + name: "every offset is set", + newName: "new name", + task: influxdb.Task{ + ID: 1, + Name: "name_9000", + Every: time.Minute.String(), + Offset: 10 * time.Second, + Type: influxdb.TaskSystemType, + Flux: `option task = { name: "larry" } from(bucket: "rucket") |> yield()`, + }, }, - }, - { - name: "cron is set", - task: influxdb.Task{ - ID: 1, - Name: "name_0", - Cron: "2 * * * *", - Type: influxdb.TaskSystemType, - Flux: `option task = { name: "larry" } from(bucket: "rucket") |> yield()`, + { + name: "cron is set", + task: influxdb.Task{ + ID: 1, + Name: "name_0", + Cron: "2 * * * *", + Type: influxdb.TaskSystemType, + Flux: `option task = { name: "larry" } from(bucket: "rucket") |> yield()`, + }, }, - }, - } + } - for _, tt := range tests { - fn := func(t *testing.T) { - taskSVC := mock.NewTaskService() - taskSVC.FindTaskByIDFn = func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { - if id != tt.task.ID { - return nil, errors.New("wrong id provided: " + id.String()) + for _, tt := range tests { + fn := func(t *testing.T) { + taskSVC := mock.NewTaskService() + taskSVC.FindTaskByIDFn = func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { + if id != tt.task.ID { + return nil, errors.New("wrong id provided: " + id.String()) + } + return &tt.task, nil } - return &tt.task, nil - } - svc := newTestService(WithTaskSVC(taskSVC)) + svc := newTestService(WithTaskSVC(taskSVC)) - resToClone := ResourceToClone{ - Kind: KindTask, - ID: tt.task.ID, - Name: tt.newName, - } - pkg, err := svc.CreatePkg(context.TODO(), CreateWithExistingResources(resToClone)) - require.NoError(t, err) + resToClone := ResourceToClone{ + Kind: KindTask, + ID: tt.task.ID, + Name: tt.newName, + } + pkg, err := svc.CreatePkg(context.TODO(), CreateWithExistingResources(resToClone)) + require.NoError(t, err) - newPkg := encodeAndDecode(t, pkg) + newPkg := encodeAndDecode(t, pkg) - sum := newPkg.Summary() + sum := newPkg.Summary() - tasks := sum.Tasks - require.Len(t, tasks, 1) + tasks := sum.Tasks + require.Len(t, tasks, 1) - expectedName := tt.task.Name - if tt.newName != "" { - expectedName = tt.newName + expectedName := tt.task.Name + if tt.newName != "" { + expectedName = tt.newName + } + actual := tasks[0] + assert.Equal(t, expectedName, actual.Name) + assert.Equal(t, tt.task.Cron, actual.Cron) + assert.Equal(t, tt.task.Description, actual.Description) + assert.Equal(t, tt.task.Every, actual.Every) + assert.Equal(t, durToStr(tt.task.Offset), actual.Offset) + + expectedQuery := `from(bucket: "rucket") |> yield()` + assert.Equal(t, expectedQuery, actual.Query) } - actual := tasks[0] - assert.Equal(t, expectedName, actual.Name) - assert.Equal(t, tt.task.Cron, actual.Cron) - assert.Equal(t, tt.task.Description, actual.Description) - assert.Equal(t, tt.task.Every, actual.Every) - assert.Equal(t, durToStr(tt.task.Offset), actual.Offset) + t.Run(tt.name, fn) + } + }) - expectedQuery := `from(bucket: "rucket") |> yield()` - assert.Equal(t, expectedQuery, actual.Query) + t.Run("handles multiple tasks of same name", func(t *testing.T) { + taskSVC := mock.NewTaskService() + taskSVC.FindTaskByIDFn = func(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { + return &influxdb.Task{ + ID: id, + Type: influxdb.TaskSystemType, + Name: "same name", + Description: "desc", + Status: influxdb.TaskStatusActive, + Flux: `from(bucket: "foo")`, + Every: "5m0s", + }, nil } - t.Run(tt.name, fn) - } + + svc := newTestService(WithTaskSVC(taskSVC)) + + resourcesToClone := []ResourceToClone{ + { + Kind: KindTask, + ID: 1, + }, + { + Kind: KindTask, + ID: 2, + }, + } + pkg, err := svc.CreatePkg(context.TODO(), CreateWithExistingResources(resourcesToClone...)) + require.NoError(t, err) + + newPkg := encodeAndDecode(t, pkg) + + sum := newPkg.Summary() + + tasks := sum.Tasks + require.Len(t, tasks, len(resourcesToClone)) + + for _, actual := range sum.Tasks { + assert.Equal(t, "same name", actual.Name) + assert.Equal(t, "desc", actual.Description) + assert.Equal(t, influxdb.Active, actual.Status) + assert.Equal(t, `from(bucket: "foo")`, actual.Query) + assert.Equal(t, "5m0s", actual.Every) + } + }) }) t.Run("telegraf configs", func(t *testing.T) { diff --git a/pkger/testdata/tasks.json b/pkger/testdata/tasks.json index 0bf60abcd2d..daf84fa13a6 100644 --- a/pkger/testdata/tasks.json +++ b/pkger/testdata/tasks.json @@ -10,9 +10,10 @@ "apiVersion": "influxdata.com/v2alpha1", "kind": "Task", "metadata": { - "name": "task_0" + "name": "task_UUID" }, "spec": { + "name": "task_0", "description": "desc_0", "every": "10m", "offset": "15s", diff --git a/pkger/testdata/tasks.yml b/pkger/testdata/tasks.yml index 4f548d4ad61..5a655129708 100644 --- a/pkger/testdata/tasks.yml +++ b/pkger/testdata/tasks.yml @@ -6,8 +6,9 @@ metadata: apiVersion: influxdata.com/v2alpha1 kind: Task metadata: - name: task_0 + name: task_UUID spec: + name: task_0 description: desc_0 every: 10m offset: 15s