Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): Add task middleware's for checks and notifications #14809

Merged
merged 1 commit into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v2.0.0-alpha.17 [unreleased]

### Features
1. [14809](https://github.com/influxdata/influxdb/pull/14809): Add task middleware's for checks and notifications
1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response.
1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types.
1. [14604](https://github.com/influxdata/influxdb/pull/14604): When getting task runs from the API, runs will be returned in order of most recently scheduled first.
Expand Down
14 changes: 12 additions & 2 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
labelSvc platform.LabelService = m.kvService
secretSvc platform.SecretService = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationRuleSvc platform.NotificationRuleStore = m.kvService
notificationEndpointSvc platform.NotificationEndpointService = m.kvService
checkSvc platform.CheckService = m.kvService
)

switch m.secretStore {
Expand Down Expand Up @@ -565,6 +563,18 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.taskControlService = combinedTaskService
}

var checkSvc platform.CheckService
{
coordinator := coordinator.New(m.logger, m.scheduler)
checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.New(m.logger, m.scheduler)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
}

// NATS streaming server
m.natsServer = nats.NewServer()
if err := m.natsServer.Open(); err != nil {
Expand Down
120 changes: 120 additions & 0 deletions task/backend/middleware/check_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package middleware

import (
"context"
"fmt"

"github.com/influxdata/influxdb"
)

// CoordinatingCheckService acts as a CheckService decorator that handles coordinating the api request
// with the required task control actions asynchronously via a message dispatcher
type CoordinatingCheckService struct {
influxdb.CheckService
coordinator Coordinator
taskService influxdb.TaskService
}

// NewCheckService constructs a new coordinating check service
func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordinator Coordinator) *CoordinatingCheckService {
c := &CoordinatingCheckService{
CheckService: cs,
taskService: ts,
coordinator: coordinator,
}

return c
}

// CreateCheck Creates a check and Publishes the change it can be scheduled.
func (cs *CoordinatingCheckService) CreateCheck(ctx context.Context, c influxdb.Check, userID influxdb.ID) error {

if err := cs.CheckService.CreateCheck(ctx, c, userID); err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, c.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskCreated(ctx, t); err != nil {
if derr := cs.CheckService.DeleteCheck(ctx, c.GetID()); derr != nil {
return fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr)
}

return err
}

return nil
}

// UpdateCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.UpdateCheck(ctx, id, c)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)
}

// PatchCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.PatchCheck(ctx, id, upd)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)

}

// DeleteCheck delete the check and publishes the change, to allow the task owner to find out about this change faster.
func (cs *CoordinatingCheckService) DeleteCheck(ctx context.Context, id influxdb.ID) error {
check, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, check.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskDeleted(ctx, t.ID); err != nil {
return err
}

return cs.CheckService.DeleteCheck(ctx, id)
}
234 changes: 234 additions & 0 deletions task/backend/middleware/check_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package middleware_test

import (
"context"
"fmt"
"testing"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/check"
"github.com/influxdata/influxdb/notification/rule"
"github.com/influxdata/influxdb/task/backend/middleware"
)

type pipingCoordinator struct {
err error
taskCreatedPipe chan *influxdb.Task
taskUpdatedPipe chan *influxdb.Task
taskDeletedPipe chan influxdb.ID
}

func (p *pipingCoordinator) taskCreatedChan() <-chan *influxdb.Task {
if p.taskCreatedPipe == nil {
p.taskCreatedPipe = make(chan *influxdb.Task, 1)
}
return p.taskCreatedPipe
}
func (p *pipingCoordinator) taskUpdatedChan() <-chan *influxdb.Task {
if p.taskUpdatedPipe == nil {
p.taskUpdatedPipe = make(chan *influxdb.Task, 1)
}
return p.taskUpdatedPipe
}
func (p *pipingCoordinator) taskDeletedChan() <-chan influxdb.ID {
if p.taskDeletedPipe == nil {
p.taskDeletedPipe = make(chan influxdb.ID, 1)
}
return p.taskDeletedPipe
}

func (p *pipingCoordinator) TaskCreated(_ context.Context, t *influxdb.Task) error {
if p.taskCreatedPipe != nil {
p.taskCreatedPipe <- t
}
return p.err
}
func (p *pipingCoordinator) TaskUpdated(_ context.Context, from, to *influxdb.Task) error {
if p.taskUpdatedPipe != nil {
p.taskUpdatedPipe <- to
}
return p.err
}
func (p *pipingCoordinator) TaskDeleted(_ context.Context, id influxdb.ID) error {
if p.taskDeletedPipe != nil {
p.taskDeletedPipe <- id
}
return p.err
}
func (p *pipingCoordinator) RunCancelled(ctx context.Context, taskID, runID influxdb.ID) error {
return p.err
}
func (p *pipingCoordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return p.err
}
func (p *pipingCoordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return p.err
}

type mockedSvc struct {
taskSvc *mock.TaskService
checkSvc *mock.CheckService
notificationSvc *mock.NotificationRuleStore
pipingCoordinator *pipingCoordinator
}

func newMockServices() mockedSvc {
return mockedSvc{
taskSvc: &mock.TaskService{
FindTaskByIDFn: func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { return &influxdb.Task{ID: id}, nil },
CreateTaskFn: func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) { return &influxdb.Task{ID: 1}, nil },
UpdateTaskFn: func(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) {
return &influxdb.Task{ID: id}, nil
},
DeleteTaskFn: func(context.Context, influxdb.ID) error { return nil },
},
checkSvc: &mock.CheckService{
FindCheckByIDFn: func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
return c, nil
},
CreateCheckFn: func(context.Context, influxdb.Check, influxdb.ID) error { return nil },
UpdateCheckFn: func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) { return c, nil },
PatchCheckFn: func(_ context.Context, id influxdb.ID, _ influxdb.CheckUpdate) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
return c, nil
},
DeleteCheckFn: func(context.Context, influxdb.ID) error { return nil },
},
notificationSvc: &mock.NotificationRuleStore{
FindNotificationRuleByIDF: func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) {
c := &rule.HTTP{}
c.SetID(id)
return c, nil
},
CreateNotificationRuleF: func(context.Context, influxdb.NotificationRule, influxdb.ID) error { return nil },
UpdateNotificationRuleF: func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) {
return c, nil
},
PatchNotificationRuleF: func(_ context.Context, id influxdb.ID, _ influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
c := &rule.HTTP{}
c.SetID(id)
return c, nil
},
DeleteNotificationRuleF: func(context.Context, influxdb.ID) error { return nil },
},
pipingCoordinator: &pipingCoordinator{},
}
}

func newCheckSvcStack() (mockedSvc, *middleware.CoordinatingCheckService) {
msvcs := newMockServices()
return msvcs, middleware.NewCheckService(msvcs.checkSvc, msvcs.taskSvc, msvcs.pipingCoordinator)
}

func TestCheckCreate(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskCreatedChan()

check := &check.Deadman{}
check.SetTaskID(4)

err := checkService.CreateCheck(context.Background(), check, 1)
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}

mocks.pipingCoordinator.err = fmt.Errorf("bad")
mocks.checkSvc.DeleteCheckFn = func(context.Context, influxdb.ID) error { return fmt.Errorf("AARGH") }

err = checkService.CreateCheck(context.Background(), check, 1)
if err.Error() != "schedule task failed: bad\n\tcleanup also failed: AARGH" {
t.Fatal(err)
}
}

func TestCheckUpdate(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskUpdatedChan()

mocks.checkSvc.UpdateCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
c.SetTaskID(10)
return c, nil
}

deadman := &check.Deadman{}
deadman.SetTaskID(4)

check, err := checkService.UpdateCheck(context.Background(), 1, deadman)
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}

func TestCheckPatch(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskUpdatedChan()

deadman := &check.Deadman{}
deadman.SetTaskID(4)

mocks.checkSvc.PatchCheckFn = func(context.Context, influxdb.ID, influxdb.CheckUpdate) (influxdb.Check, error) {
return deadman, nil
}

check, err := checkService.PatchCheck(context.Background(), 1, influxdb.CheckUpdate{})
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}

func TestCheckDelete(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskDeletedChan()

mocks.checkSvc.FindCheckByIDFn = func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
c.SetTaskID(21)
return c, nil
}

err := checkService.DeleteCheck(context.Background(), 1)
if err != nil {
t.Fatal(err)
}

select {
case id := <-ch:
if id != influxdb.ID(21) {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}
Loading