Skip to content

Commit

Permalink
feat(task): add task types (#14567)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyondhill authored Aug 6, 2019
1 parent 17de20e commit e922c8a
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response.
1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types.

### UI Improvements

Expand Down
4 changes: 4 additions & 0 deletions authorizer/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
return nil, influxdb.ErrMissingToken
}

if t.Type == influxdb.TaskTypeWildcard {
return nil, influxdb.ErrInvalidTaskType
}

p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID)
if err != nil {
return nil, err
Expand Down
21 changes: 20 additions & 1 deletion authorizer/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,25 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
auth: &influxdb.Authorization{},
},
{
name: "create bad type",
check: func(ctx context.Context, svc influxdb.TaskService) error {
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: r.Org.ID,
Token: r.Auth.Token,
Type: influxdb.TaskTypeWildcard,
Flux: `option task = {
name: "my_task",
every: 1s,
}
from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
})
if err != influxdb.ErrInvalidTaskType {
return errors.New("failed to error with invalid task type")
}
return nil
},
auth: &influxdb.Authorization{},
}, {
name: "create success",
auth: r.Auth,
check: func(ctx context.Context, svc influxdb.TaskService) error {
Expand All @@ -232,7 +251,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
},
},
{
name: "create badbucket",
name: "create bad bucket",
auth: r.Auth,
check: func(ctx context.Context, svc influxdb.TaskService) error {
var (
Expand Down
6 changes: 6 additions & 0 deletions http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6556,6 +6556,9 @@ components:
id:
readOnly: true
type: string
type:
description: The type of task, this can be used for filtering tasks on list actions.
type: string
orgID:
description: The ID of the organization that owns this Task.
type: string
Expand Down Expand Up @@ -8816,6 +8819,9 @@ components:
TaskCreateRequest:
type: object
properties:
type:
description: The type of task, this can be used for filtering tasks on list actions.
type: string
orgID:
description: The ID of the organization that owns this Task.
type: string
Expand Down
8 changes: 8 additions & 0 deletions http/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs platform.O
req.filter.Limit = platform.TaskDefaultPageSize
}

if ttype := qp.Get("type"); ttype != "" {
req.filter.Type = &ttype
}

return req, nil
}

Expand Down Expand Up @@ -1309,6 +1313,10 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter)
val.Add("limit", strconv.Itoa(filter.Limit))
}

if filter.Type != nil {
val.Add("type", *filter.Type)
}

u.RawQuery = val.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
Expand Down
7 changes: 6 additions & 1 deletion http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ func TestTaskService(t *testing.T) {
Token: auth.Token,
}

cFunc := func() (servicetest.TestCreds, error) {
cFunc := func(t *testing.T) (servicetest.TestCreds, error) {
org := &platform.Organization{Name: t.Name() + "_org"}
if err := service.CreateOrganization(ctx, org); err != nil {
t.Fatal(err)
}

return servicetest.TestCreds{
OrgID: org.ID,
Org: org.Name,
Expand Down
29 changes: 26 additions & 3 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
continue
}

if filter.Type == nil {
ft := ""
filter.Type = &ft
}
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != task.Type {
continue
}

ts = append(ts, task)

if len(ts) >= filter.Limit {
Expand Down Expand Up @@ -296,10 +304,16 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
return nil, 0, err
}

// insert the new task into the list
if t != nil {
ts = append(ts, t)

typ := ""
if filter.Type != nil {
typ = *filter.Type
}

// if the filter type matches task type or filter type is a wildcard
if typ == t.Type || typ == influxdb.TaskTypeWildcard {
ts = append(ts, t)
}
}
}
}
Expand Down Expand Up @@ -334,6 +348,14 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
break
}

if filter.Type == nil {
ft := ""
filter.Type = &ft
}
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != t.Type {
continue
}

// insert the new task into the list
ts = append(ts, t)

Expand Down Expand Up @@ -491,6 +513,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
createdAt := time.Now().UTC().Format(time.RFC3339)
task := &influxdb.Task{
ID: s.IDGenerator.ID(),
Type: tc.Type,
OrganizationID: org.ID,
Organization: org.Name,
AuthorizationID: auth.Identifier(),
Expand Down
5 changes: 5 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ const (

TaskStatusActive = "active"
TaskStatusInactive = "inactive"

TaskTypeWildcard = "*"
)

// Task is a task. 🎊
type Task struct {
ID ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID ID `json:"orgID"`
Organization string `json:"org"`
AuthorizationID ID `json:"authorizationID"`
Expand Down Expand Up @@ -134,6 +137,7 @@ type TaskService interface {

// TaskCreate is the set of values to create a task.
type TaskCreate struct {
Type string `json:"type,omitempty"`
Flux string `json:"flux"`
Description string `json:"description,omitempty"`
Status string `json:"status,omitempty"`
Expand Down Expand Up @@ -374,6 +378,7 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {

// TaskFilter represents a set of filters that restrict the returned results
type TaskFilter struct {
Type *string
After *ID
OrganizationID *ID
Organization string
Expand Down
93 changes: 91 additions & 2 deletions task/servicetest/servicetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s
t.Parallel()
testManualRun(t, sys)
})

t.Run("Task Type", func(t *testing.T) {
t.Parallel()
testTaskType(t, sys)
})

})
case "analytical":
t.Run("AnalyticalTaskService", func(t *testing.T) {
Expand Down Expand Up @@ -149,7 +155,7 @@ type System struct {
// However, if the system needs to verify credentials,
// the caller should set this value and return valid IDs and a valid token.
// It is safe if this returns the same values every time it is called.
CredsFunc func() (TestCreds, error)
CredsFunc func(*testing.T) (TestCreds, error)
}

func testTaskCRUD(t *testing.T, sys *System) {
Expand Down Expand Up @@ -1405,7 +1411,7 @@ func creds(t *testing.T, s *System) TestCreds {
}
}

c, err := s.CredsFunc()
c, err := s.CredsFunc(t)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1437,3 +1443,86 @@ option task = {
from(bucket: "b")
|> http.to(url: "http://example.com")`
)

func testTaskType(t *testing.T, sys *System) {
cr := creds(t, sys)
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())

// Create a tasks
ts := influxdb.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}

tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts)
if err != nil {
t.Fatal(err)
}
if !tsk.ID.Valid() {
t.Fatal("no task ID set")
}

tc := influxdb.TaskCreate{
Type: "cows",
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}

tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc)
if err != nil {
t.Fatal(err)
}
if !tskCow.ID.Valid() {
t.Fatal("no task ID set")
}

tp := influxdb.TaskCreate{
Type: "pigs",
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}

tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp)
if err != nil {
t.Fatal(err)
}
if !tskPig.ID.Valid() {
t.Fatal("no task ID set")
}

// get default tasks
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID})
if err != nil {
t.Fatal(err)
}

for _, task := range tasks {
if task.Type != "" {
t.Fatal("recieved a task with a type when sending no type restriction")
}
}

// get filtered tasks
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &tc.Type})
if err != nil {
t.Fatal(err)
}

if len(tasks) != 1 {
t.Fatalf("failed to return tasks by type, expected 1, got %d", len(tasks))
}

// get all tasks
wc := influxdb.TaskTypeWildcard
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &wc})
if err != nil {
t.Fatal(err)
}

if len(tasks) != 3 {
t.Fatalf("failed to return tasks with wildcard, expected 3, got %d", len(tasks))
}
}
5 changes: 5 additions & 0 deletions task_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ var (
Msg: "invalid id",
}

// ErrInvalidTaskType error object for bad id's
ErrInvalidTaskType = &Error{
Code: EInvalid,
Msg: "invalid task type",
}
// ErrTaskNotFound indicates no task could be found for given parameters.
ErrTaskNotFound = &Error{
Code: ENotFound,
Expand Down

0 comments on commit e922c8a

Please sign in to comment.