From e922c8a26f9bb1ad5a6827aac84550eec7fdac41 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Tue, 6 Aug 2019 10:27:52 -0600 Subject: [PATCH] feat(task): add task types (#14567) --- CHANGELOG.md | 1 + authorizer/task.go | 4 ++ authorizer/task_test.go | 21 +++++++- http/swagger.yml | 6 +++ http/task_service.go | 8 +++ http/task_test.go | 7 ++- kv/task.go | 29 ++++++++-- task.go | 5 ++ task/servicetest/servicetest.go | 93 ++++++++++++++++++++++++++++++++- task_errors.go | 5 ++ 10 files changed, 172 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 394f63dc16d..52f08e36893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features 1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response. +1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types. ### UI Improvements diff --git a/authorizer/task.go b/authorizer/task.go index bc022a320dd..150677828f5 100644 --- a/authorizer/task.go +++ b/authorizer/task.go @@ -122,6 +122,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC return nil, influxdb.ErrMissingToken } + if t.Type == influxdb.TaskTypeWildcard { + return nil, influxdb.ErrInvalidTaskType + } + p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID) if err != nil { return nil, err diff --git a/authorizer/task_test.go b/authorizer/task_test.go index 6222108e54e..fe7d9f9ce0a 100644 --- a/authorizer/task_test.go +++ b/authorizer/task_test.go @@ -216,6 +216,25 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`, auth: &influxdb.Authorization{}, }, { + name: "create bad type", + check: func(ctx context.Context, svc influxdb.TaskService) error { + _, err := svc.CreateTask(ctx, influxdb.TaskCreate{ + OrganizationID: r.Org.ID, + Token: r.Auth.Token, + Type: influxdb.TaskTypeWildcard, + Flux: `option task = { + name: "my_task", + every: 1s, +} +from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`, + }) + if err != influxdb.ErrInvalidTaskType { + return errors.New("failed to error with invalid task type") + } + return nil + }, + auth: &influxdb.Authorization{}, + }, { name: "create success", auth: r.Auth, check: func(ctx context.Context, svc influxdb.TaskService) error { @@ -232,7 +251,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`, }, }, { - name: "create badbucket", + name: "create bad bucket", auth: r.Auth, check: func(ctx context.Context, svc influxdb.TaskService) error { var ( diff --git a/http/swagger.yml b/http/swagger.yml index 0412de22172..6438d916eea 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -6556,6 +6556,9 @@ components: id: readOnly: true type: string + type: + description: The type of task, this can be used for filtering tasks on list actions. + type: string orgID: description: The ID of the organization that owns this Task. type: string @@ -8816,6 +8819,9 @@ components: TaskCreateRequest: type: object properties: + type: + description: The type of task, this can be used for filtering tasks on list actions. + type: string orgID: description: The ID of the organization that owns this Task. type: string diff --git a/http/task_service.go b/http/task_service.go index 295430dc80d..30833a0e22e 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -358,6 +358,10 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs platform.O req.filter.Limit = platform.TaskDefaultPageSize } + if ttype := qp.Get("type"); ttype != "" { + req.filter.Type = &ttype + } + return req, nil } @@ -1309,6 +1313,10 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter) val.Add("limit", strconv.Itoa(filter.Limit)) } + if filter.Type != nil { + val.Add("type", *filter.Type) + } + u.RawQuery = val.Encode() req, err := http.NewRequest("GET", u.String(), nil) diff --git a/http/task_test.go b/http/task_test.go index 372ce1bb89b..57d199cebb6 100644 --- a/http/task_test.go +++ b/http/task_test.go @@ -67,7 +67,12 @@ func TestTaskService(t *testing.T) { Token: auth.Token, } - cFunc := func() (servicetest.TestCreds, error) { + cFunc := func(t *testing.T) (servicetest.TestCreds, error) { + org := &platform.Organization{Name: t.Name() + "_org"} + if err := service.CreateOrganization(ctx, org); err != nil { + t.Fatal(err) + } + return servicetest.TestCreds{ OrgID: org.ID, Org: org.Name, diff --git a/kv/task.go b/kv/task.go index 4ba9308cdc3..ff6c8f87dea 100644 --- a/kv/task.go +++ b/kv/task.go @@ -227,6 +227,14 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta continue } + if filter.Type == nil { + ft := "" + filter.Type = &ft + } + if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != task.Type { + continue + } + ts = append(ts, task) if len(ts) >= filter.Limit { @@ -296,10 +304,16 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task return nil, 0, err } - // insert the new task into the list if t != nil { - ts = append(ts, t) - + typ := "" + if filter.Type != nil { + typ = *filter.Type + } + + // if the filter type matches task type or filter type is a wildcard + if typ == t.Type || typ == influxdb.TaskTypeWildcard { + ts = append(ts, t) + } } } } @@ -334,6 +348,14 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task break } + if filter.Type == nil { + ft := "" + filter.Type = &ft + } + if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != t.Type { + continue + } + // insert the new task into the list ts = append(ts, t) @@ -491,6 +513,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) createdAt := time.Now().UTC().Format(time.RFC3339) task := &influxdb.Task{ ID: s.IDGenerator.ID(), + Type: tc.Type, OrganizationID: org.ID, Organization: org.Name, AuthorizationID: auth.Identifier(), diff --git a/task.go b/task.go index 9231baeab5c..935925cd21f 100644 --- a/task.go +++ b/task.go @@ -20,11 +20,14 @@ const ( TaskStatusActive = "active" TaskStatusInactive = "inactive" + + TaskTypeWildcard = "*" ) // Task is a task. 🎊 type Task struct { ID ID `json:"id"` + Type string `json:"type,omitempty"` OrganizationID ID `json:"orgID"` Organization string `json:"org"` AuthorizationID ID `json:"authorizationID"` @@ -134,6 +137,7 @@ type TaskService interface { // TaskCreate is the set of values to create a task. type TaskCreate struct { + Type string `json:"type,omitempty"` Flux string `json:"flux"` Description string `json:"description,omitempty"` Status string `json:"status,omitempty"` @@ -374,6 +378,7 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error { // TaskFilter represents a set of filters that restrict the returned results type TaskFilter struct { + Type *string After *ID OrganizationID *ID Organization string diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index cc0b57cfa6f..11b62dcf57b 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -83,6 +83,12 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s t.Parallel() testManualRun(t, sys) }) + + t.Run("Task Type", func(t *testing.T) { + t.Parallel() + testTaskType(t, sys) + }) + }) case "analytical": t.Run("AnalyticalTaskService", func(t *testing.T) { @@ -149,7 +155,7 @@ type System struct { // However, if the system needs to verify credentials, // the caller should set this value and return valid IDs and a valid token. // It is safe if this returns the same values every time it is called. - CredsFunc func() (TestCreds, error) + CredsFunc func(*testing.T) (TestCreds, error) } func testTaskCRUD(t *testing.T, sys *System) { @@ -1405,7 +1411,7 @@ func creds(t *testing.T, s *System) TestCreds { } } - c, err := s.CredsFunc() + c, err := s.CredsFunc(t) if err != nil { t.Fatal(err) } @@ -1437,3 +1443,86 @@ option task = { from(bucket: "b") |> http.to(url: "http://example.com")` ) + +func testTaskType(t *testing.T, sys *System) { + cr := creds(t, sys) + authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) + + // Create a tasks + ts := influxdb.TaskCreate{ + OrganizationID: cr.OrgID, + Flux: fmt.Sprintf(scriptFmt, 0), + Token: cr.Token, + } + + tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts) + if err != nil { + t.Fatal(err) + } + if !tsk.ID.Valid() { + t.Fatal("no task ID set") + } + + tc := influxdb.TaskCreate{ + Type: "cows", + OrganizationID: cr.OrgID, + Flux: fmt.Sprintf(scriptFmt, 0), + Token: cr.Token, + } + + tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc) + if err != nil { + t.Fatal(err) + } + if !tskCow.ID.Valid() { + t.Fatal("no task ID set") + } + + tp := influxdb.TaskCreate{ + Type: "pigs", + OrganizationID: cr.OrgID, + Flux: fmt.Sprintf(scriptFmt, 0), + Token: cr.Token, + } + + tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp) + if err != nil { + t.Fatal(err) + } + if !tskPig.ID.Valid() { + t.Fatal("no task ID set") + } + + // get default tasks + tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID}) + if err != nil { + t.Fatal(err) + } + + for _, task := range tasks { + if task.Type != "" { + t.Fatal("recieved a task with a type when sending no type restriction") + } + } + + // get filtered tasks + tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &tc.Type}) + if err != nil { + t.Fatal(err) + } + + if len(tasks) != 1 { + t.Fatalf("failed to return tasks by type, expected 1, got %d", len(tasks)) + } + + // get all tasks + wc := influxdb.TaskTypeWildcard + tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &wc}) + if err != nil { + t.Fatal(err) + } + + if len(tasks) != 3 { + t.Fatalf("failed to return tasks with wildcard, expected 3, got %d", len(tasks)) + } +} diff --git a/task_errors.go b/task_errors.go index c5390164872..18465dd4fcd 100644 --- a/task_errors.go +++ b/task_errors.go @@ -36,6 +36,11 @@ var ( Msg: "invalid id", } + // ErrInvalidTaskType error object for bad id's + ErrInvalidTaskType = &Error{ + Code: EInvalid, + Msg: "invalid task type", + } // ErrTaskNotFound indicates no task could be found for given parameters. ErrTaskNotFound = &Error{ Code: ENotFound,