Skip to content

Commit

Permalink
feat(task): Add task middleware's for checks and notifications
Browse files Browse the repository at this point in the history
To have checks and notifications happen transactionally we need to be
able to alert the task system when a new task was created using the checks and notifications systems.
These two new middlewares allow us to inform the task system of a update
to a task that was created through the check or notification systems.
  • Loading branch information
lyondhill committed Aug 26, 2019
1 parent 669b5f9 commit fc0d236
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v2.0.0-alpha.17 [unreleased]

### Features
1. [14809](https://github.com/influxdata/influxdb/pull/14809): Add task middleware's for checks and notifications
1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response.
1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types.
1. [14604](https://github.com/influxdata/influxdb/pull/14604): When getting task runs from the API, runs will be returned in order of most recently scheduled first.
Expand Down
14 changes: 12 additions & 2 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
labelSvc platform.LabelService = m.kvService
secretSvc platform.SecretService = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationRuleSvc platform.NotificationRuleStore = m.kvService
notificationEndpointSvc platform.NotificationEndpointService = m.kvService
checkSvc platform.CheckService = m.kvService
)

switch m.secretStore {
Expand Down Expand Up @@ -565,6 +563,18 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.taskControlService = combinedTaskService
}

var checkSvc platform.CheckService
{
coordinator := coordinator.New(m.logger, m.scheduler)
checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.New(m.logger, m.scheduler)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
}

// NATS streaming server
m.natsServer = nats.NewServer()
if err := m.natsServer.Open(); err != nil {
Expand Down
120 changes: 120 additions & 0 deletions task/backend/middleware/check_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package middleware

import (
"context"
"fmt"

"github.com/influxdata/influxdb"
)

// CoordinatingCheckService acts as a CheckService decorator that handles coordinating the api request
// with the required task control actions asynchronously via a message dispatcher
type CoordinatingCheckService struct {
influxdb.CheckService
coordinator Coordinator
taskService influxdb.TaskService
}

// NewCheckService constructs a new coordinating check service
func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordinator Coordinator) *CoordinatingCheckService {
c := &CoordinatingCheckService{
CheckService: cs,
taskService: ts,
coordinator: coordinator,
}

return c
}

// CreateCheck Creates a check and Publishes the change it can be scheduled.
func (cs *CoordinatingCheckService) CreateCheck(ctx context.Context, c influxdb.Check, userID influxdb.ID) error {

if err := cs.CheckService.CreateCheck(ctx, c, userID); err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, c.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskCreated(ctx, t); err != nil {
if derr := cs.CheckService.DeleteCheck(ctx, c.GetID()); derr != nil {
return fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr)
}

return err
}

return nil
}

// UpdateCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.UpdateCheck(ctx, id, c)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)
}

// PatchCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.PatchCheck(ctx, id, upd)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)

}

// DeleteCheck delete the check and publishes the change, to allow the task owner to find out about this change faster.
func (cs *CoordinatingCheckService) DeleteCheck(ctx context.Context, id influxdb.ID) error {
check, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, check.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskDeleted(ctx, t.ID); err != nil {
return err
}

return cs.CheckService.DeleteCheck(ctx, id)
}
234 changes: 234 additions & 0 deletions task/backend/middleware/check_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package middleware_test

import (
"context"
"fmt"
"testing"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/check"
"github.com/influxdata/influxdb/notification/rule"
"github.com/influxdata/influxdb/task/backend/middleware"
)

type pipingCoordinator struct {
err error
taskCreatedPipe chan *influxdb.Task
taskUpdatedPipe chan *influxdb.Task
taskDeletedPipe chan influxdb.ID
}

func (p *pipingCoordinator) taskCreatedChan() <-chan *influxdb.Task {
if p.taskCreatedPipe == nil {
p.taskCreatedPipe = make(chan *influxdb.Task, 1)
}
return p.taskCreatedPipe
}
func (p *pipingCoordinator) taskUpdatedChan() <-chan *influxdb.Task {
if p.taskUpdatedPipe == nil {
p.taskUpdatedPipe = make(chan *influxdb.Task, 1)
}
return p.taskUpdatedPipe
}
func (p *pipingCoordinator) taskDeletedChan() <-chan influxdb.ID {
if p.taskDeletedPipe == nil {
p.taskDeletedPipe = make(chan influxdb.ID, 1)
}
return p.taskDeletedPipe
}

func (p *pipingCoordinator) TaskCreated(_ context.Context, t *influxdb.Task) error {
if p.taskCreatedPipe != nil {
p.taskCreatedPipe <- t
}
return p.err
}
func (p *pipingCoordinator) TaskUpdated(_ context.Context, from, to *influxdb.Task) error {
if p.taskUpdatedPipe != nil {
p.taskUpdatedPipe <- to
}
return p.err
}
func (p *pipingCoordinator) TaskDeleted(_ context.Context, id influxdb.ID) error {
if p.taskDeletedPipe != nil {
p.taskDeletedPipe <- id
}
return p.err
}
func (p *pipingCoordinator) RunCancelled(ctx context.Context, taskID, runID influxdb.ID) error {
return p.err
}
func (p *pipingCoordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return p.err
}
func (p *pipingCoordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return p.err
}

type mockedSvc struct {
taskSvc *mock.TaskService
checkSvc *mock.CheckService
notificationSvc *mock.NotificationRuleStore
pipingCoordinator *pipingCoordinator
}

func newMockServices() mockedSvc {
return mockedSvc{
taskSvc: &mock.TaskService{
FindTaskByIDFn: func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { return &influxdb.Task{ID: id}, nil },
CreateTaskFn: func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) { return &influxdb.Task{ID: 1}, nil },
UpdateTaskFn: func(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) {
return &influxdb.Task{ID: id}, nil
},
DeleteTaskFn: func(context.Context, influxdb.ID) error { return nil },
},
checkSvc: &mock.CheckService{
FindCheckByIDFn: func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
return c, nil
},
CreateCheckFn: func(context.Context, influxdb.Check, influxdb.ID) error { return nil },
UpdateCheckFn: func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) { return c, nil },
PatchCheckFn: func(_ context.Context, id influxdb.ID, _ influxdb.CheckUpdate) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
return c, nil
},
DeleteCheckFn: func(context.Context, influxdb.ID) error { return nil },
},
notificationSvc: &mock.NotificationRuleStore{
FindNotificationRuleByIDF: func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) {
c := &rule.HTTP{}
c.SetID(id)
return c, nil
},
CreateNotificationRuleF: func(context.Context, influxdb.NotificationRule, influxdb.ID) error { return nil },
UpdateNotificationRuleF: func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) {
return c, nil
},
PatchNotificationRuleF: func(_ context.Context, id influxdb.ID, _ influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
c := &rule.HTTP{}
c.SetID(id)
return c, nil
},
DeleteNotificationRuleF: func(context.Context, influxdb.ID) error { return nil },
},
pipingCoordinator: &pipingCoordinator{},
}
}

func newCheckSvcStack() (mockedSvc, *middleware.CoordinatingCheckService) {
msvcs := newMockServices()
return msvcs, middleware.NewCheckService(msvcs.checkSvc, msvcs.taskSvc, msvcs.pipingCoordinator)
}

func TestCheckCreate(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskCreatedChan()

check := &check.Deadman{}
check.SetTaskID(4)

err := checkService.CreateCheck(context.Background(), check, 1)
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}

mocks.pipingCoordinator.err = fmt.Errorf("bad")
mocks.checkSvc.DeleteCheckFn = func(context.Context, influxdb.ID) error { return fmt.Errorf("AARGH") }

err = checkService.CreateCheck(context.Background(), check, 1)
if err.Error() != "schedule task failed: bad\n\tcleanup also failed: AARGH" {
t.Fatal(err)
}
}

func TestCheckUpdate(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskUpdatedChan()

mocks.checkSvc.UpdateCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
c.SetTaskID(10)
return c, nil
}

deadman := &check.Deadman{}
deadman.SetTaskID(4)

check, err := checkService.UpdateCheck(context.Background(), 1, deadman)
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}

func TestCheckPatch(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskUpdatedChan()

deadman := &check.Deadman{}
deadman.SetTaskID(4)

mocks.checkSvc.PatchCheckFn = func(context.Context, influxdb.ID, influxdb.CheckUpdate) (influxdb.Check, error) {
return deadman, nil
}

check, err := checkService.PatchCheck(context.Background(), 1, influxdb.CheckUpdate{})
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}

func TestCheckDelete(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskDeletedChan()

mocks.checkSvc.FindCheckByIDFn = func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
c.SetTaskID(21)
return c, nil
}

err := checkService.DeleteCheck(context.Background(), 1)
if err != nil {
t.Fatal(err)
}

select {
case id := <-ch:
if id != influxdb.ID(21) {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}
Loading

0 comments on commit fc0d236

Please sign in to comment.