Skip to content

Commit

Permalink
feat(task): Add task middleware's for checks and notifications (#14809)
Browse files Browse the repository at this point in the history
To have checks and notifications happen transactionally we need to be
able to alert the task system when a new task was created using the checks and notifications systems.
These two new middlewares allow us to inform the task system of a update
to a task that was created through the check or notification systems.
  • Loading branch information
lyondhill authored Aug 26, 2019
1 parent 3e16be7 commit ee9e622
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v2.0.0-alpha.17 [unreleased]

### Features
1. [14809](https://github.com/influxdata/influxdb/pull/14809): Add task middleware's for checks and notifications
1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response.
1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types.
1. [14604](https://github.com/influxdata/influxdb/pull/14604): When getting task runs from the API, runs will be returned in order of most recently scheduled first.
Expand Down
14 changes: 12 additions & 2 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
labelSvc platform.LabelService = m.kvService
secretSvc platform.SecretService = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationRuleSvc platform.NotificationRuleStore = m.kvService
notificationEndpointSvc platform.NotificationEndpointService = m.kvService
checkSvc platform.CheckService = m.kvService
)

switch m.secretStore {
Expand Down Expand Up @@ -566,6 +564,18 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.taskControlService = combinedTaskService
}

var checkSvc platform.CheckService
{
coordinator := coordinator.New(m.logger, m.scheduler)
checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.New(m.logger, m.scheduler)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
}

// NATS streaming server
m.natsServer = nats.NewServer()
if err := m.natsServer.Open(); err != nil {
Expand Down
120 changes: 120 additions & 0 deletions task/backend/middleware/check_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package middleware

import (
"context"
"fmt"

"github.com/influxdata/influxdb"
)

// CoordinatingCheckService acts as a CheckService decorator that handles coordinating the api request
// with the required task control actions asynchronously via a message dispatcher
type CoordinatingCheckService struct {
influxdb.CheckService
coordinator Coordinator
taskService influxdb.TaskService
}

// NewCheckService constructs a new coordinating check service
func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordinator Coordinator) *CoordinatingCheckService {
c := &CoordinatingCheckService{
CheckService: cs,
taskService: ts,
coordinator: coordinator,
}

return c
}

// CreateCheck Creates a check and Publishes the change it can be scheduled.
func (cs *CoordinatingCheckService) CreateCheck(ctx context.Context, c influxdb.Check, userID influxdb.ID) error {

if err := cs.CheckService.CreateCheck(ctx, c, userID); err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, c.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskCreated(ctx, t); err != nil {
if derr := cs.CheckService.DeleteCheck(ctx, c.GetID()); derr != nil {
return fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr)
}

return err
}

return nil
}

// UpdateCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.UpdateCheck(ctx, id, c)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)
}

// PatchCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.PatchCheck(ctx, id, upd)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)

}

// DeleteCheck delete the check and publishes the change, to allow the task owner to find out about this change faster.
func (cs *CoordinatingCheckService) DeleteCheck(ctx context.Context, id influxdb.ID) error {
check, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, check.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskDeleted(ctx, t.ID); err != nil {
return err
}

return cs.CheckService.DeleteCheck(ctx, id)
}
234 changes: 234 additions & 0 deletions task/backend/middleware/check_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package middleware_test

import (
"context"
"fmt"
"testing"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/check"
"github.com/influxdata/influxdb/notification/rule"
"github.com/influxdata/influxdb/task/backend/middleware"
)

type pipingCoordinator struct {
err error
taskCreatedPipe chan *influxdb.Task
taskUpdatedPipe chan *influxdb.Task
taskDeletedPipe chan influxdb.ID
}

func (p *pipingCoordinator) taskCreatedChan() <-chan *influxdb.Task {
if p.taskCreatedPipe == nil {
p.taskCreatedPipe = make(chan *influxdb.Task, 1)
}
return p.taskCreatedPipe
}
func (p *pipingCoordinator) taskUpdatedChan() <-chan *influxdb.Task {
if p.taskUpdatedPipe == nil {
p.taskUpdatedPipe = make(chan *influxdb.Task, 1)
}
return p.taskUpdatedPipe
}
func (p *pipingCoordinator) taskDeletedChan() <-chan influxdb.ID {
if p.taskDeletedPipe == nil {
p.taskDeletedPipe = make(chan influxdb.ID, 1)
}
return p.taskDeletedPipe
}

func (p *pipingCoordinator) TaskCreated(_ context.Context, t *influxdb.Task) error {
if p.taskCreatedPipe != nil {
p.taskCreatedPipe <- t
}
return p.err
}
func (p *pipingCoordinator) TaskUpdated(_ context.Context, from, to *influxdb.Task) error {
if p.taskUpdatedPipe != nil {
p.taskUpdatedPipe <- to
}
return p.err
}
func (p *pipingCoordinator) TaskDeleted(_ context.Context, id influxdb.ID) error {
if p.taskDeletedPipe != nil {
p.taskDeletedPipe <- id
}
return p.err
}
func (p *pipingCoordinator) RunCancelled(ctx context.Context, taskID, runID influxdb.ID) error {
return p.err
}
func (p *pipingCoordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return p.err
}
func (p *pipingCoordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
return p.err
}

type mockedSvc struct {
taskSvc *mock.TaskService
checkSvc *mock.CheckService
notificationSvc *mock.NotificationRuleStore
pipingCoordinator *pipingCoordinator
}

func newMockServices() mockedSvc {
return mockedSvc{
taskSvc: &mock.TaskService{
FindTaskByIDFn: func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { return &influxdb.Task{ID: id}, nil },
CreateTaskFn: func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) { return &influxdb.Task{ID: 1}, nil },
UpdateTaskFn: func(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) {
return &influxdb.Task{ID: id}, nil
},
DeleteTaskFn: func(context.Context, influxdb.ID) error { return nil },
},
checkSvc: &mock.CheckService{
FindCheckByIDFn: func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
return c, nil
},
CreateCheckFn: func(context.Context, influxdb.Check, influxdb.ID) error { return nil },
UpdateCheckFn: func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) { return c, nil },
PatchCheckFn: func(_ context.Context, id influxdb.ID, _ influxdb.CheckUpdate) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
return c, nil
},
DeleteCheckFn: func(context.Context, influxdb.ID) error { return nil },
},
notificationSvc: &mock.NotificationRuleStore{
FindNotificationRuleByIDF: func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) {
c := &rule.HTTP{}
c.SetID(id)
return c, nil
},
CreateNotificationRuleF: func(context.Context, influxdb.NotificationRule, influxdb.ID) error { return nil },
UpdateNotificationRuleF: func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) {
return c, nil
},
PatchNotificationRuleF: func(_ context.Context, id influxdb.ID, _ influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) {
c := &rule.HTTP{}
c.SetID(id)
return c, nil
},
DeleteNotificationRuleF: func(context.Context, influxdb.ID) error { return nil },
},
pipingCoordinator: &pipingCoordinator{},
}
}

func newCheckSvcStack() (mockedSvc, *middleware.CoordinatingCheckService) {
msvcs := newMockServices()
return msvcs, middleware.NewCheckService(msvcs.checkSvc, msvcs.taskSvc, msvcs.pipingCoordinator)
}

func TestCheckCreate(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskCreatedChan()

check := &check.Deadman{}
check.SetTaskID(4)

err := checkService.CreateCheck(context.Background(), check, 1)
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}

mocks.pipingCoordinator.err = fmt.Errorf("bad")
mocks.checkSvc.DeleteCheckFn = func(context.Context, influxdb.ID) error { return fmt.Errorf("AARGH") }

err = checkService.CreateCheck(context.Background(), check, 1)
if err.Error() != "schedule task failed: bad\n\tcleanup also failed: AARGH" {
t.Fatal(err)
}
}

func TestCheckUpdate(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskUpdatedChan()

mocks.checkSvc.UpdateCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
c.SetTaskID(10)
return c, nil
}

deadman := &check.Deadman{}
deadman.SetTaskID(4)

check, err := checkService.UpdateCheck(context.Background(), 1, deadman)
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}

func TestCheckPatch(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskUpdatedChan()

deadman := &check.Deadman{}
deadman.SetTaskID(4)

mocks.checkSvc.PatchCheckFn = func(context.Context, influxdb.ID, influxdb.CheckUpdate) (influxdb.Check, error) {
return deadman, nil
}

check, err := checkService.PatchCheck(context.Background(), 1, influxdb.CheckUpdate{})
if err != nil {
t.Fatal(err)
}

select {
case task := <-ch:
if task.ID != check.GetTaskID() {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}

func TestCheckDelete(t *testing.T) {
mocks, checkService := newCheckSvcStack()
ch := mocks.pipingCoordinator.taskDeletedChan()

mocks.checkSvc.FindCheckByIDFn = func(_ context.Context, id influxdb.ID) (influxdb.Check, error) {
c := &check.Deadman{}
c.SetID(id)
c.SetTaskID(21)
return c, nil
}

err := checkService.DeleteCheck(context.Background(), 1)
if err != nil {
t.Fatal(err)
}

select {
case id := <-ch:
if id != influxdb.ID(21) {
t.Fatalf("task sent to coordinator doesn't match expected")
}
default:
t.Fatal("didn't receive task")
}
}
Loading

0 comments on commit ee9e622

Please sign in to comment.