From bbf92be2ca71604bdf354b41403dc51b995a63c0 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Mon, 26 Aug 2019 15:13:09 -0600 Subject: [PATCH] feat(task): Add task middleware's for checks and notifications To have checks and notifications happen transactionally we need to be able to alert the task system when a new task was created using the checks and notifications systems. These two new middlewares allow us to inform the task system of a update to a task that was created through the check or notification systems. --- cmd/influxd/launcher/launcher.go | 14 +- task/backend/middleware/check_middleware.go | 120 ++++++++ .../middleware/check_middleware_test.go | 264 ++++++++++++++++++ task/backend/middleware/middleware_test.go | 2 +- .../middleware/notification_middleware.go | 120 ++++++++ .../notification_middleware_test.go | 125 +++++++++ 6 files changed, 642 insertions(+), 3 deletions(-) create mode 100644 task/backend/middleware/check_middleware.go create mode 100644 task/backend/middleware/check_middleware_test.go create mode 100644 task/backend/middleware/notification_middleware.go create mode 100644 task/backend/middleware/notification_middleware_test.go diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index bf7a3581128..bc354263f82 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -459,9 +459,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { labelSvc platform.LabelService = m.kvService secretSvc platform.SecretService = m.kvService lookupSvc platform.LookupService = m.kvService - notificationRuleSvc platform.NotificationRuleStore = m.kvService notificationEndpointSvc platform.NotificationEndpointService = m.kvService - checkSvc platform.CheckService = m.kvService ) switch m.secretStore { @@ -565,6 +563,18 @@ func (m *Launcher) run(ctx context.Context) (err error) { m.taskControlService = combinedTaskService } + var checkSvc platform.CheckService + { + coordinator := coordinator.New(m.logger, m.scheduler) + checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator) + } + + var notificationRuleSvc platform.NotificationRuleStore + { + coordinator := coordinator.New(m.logger, m.scheduler) + notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator) + } + // NATS streaming server m.natsServer = nats.NewServer() if err := m.natsServer.Open(); err != nil { diff --git a/task/backend/middleware/check_middleware.go b/task/backend/middleware/check_middleware.go new file mode 100644 index 00000000000..76a2c4b5fe7 --- /dev/null +++ b/task/backend/middleware/check_middleware.go @@ -0,0 +1,120 @@ +package middleware + +import ( + "context" + "fmt" + + "github.com/influxdata/influxdb" +) + +// CoordinatingCheckService acts as a CheckService decorator that handles coordinating the api request +// with the required task control actions asynchronously via a message dispatcher +type CoordinatingCheckService struct { + influxdb.CheckService + coordinator Coordinator + taskService influxdb.TaskService +} + +// NewCheckService constructs a new coordinating check service +func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordinator Coordinator) *CoordinatingCheckService { + c := &CoordinatingCheckService{ + CheckService: cs, + taskService: ts, + coordinator: coordinator, + } + + return c +} + +// CreateCheck Creates a check and Publishes the change it can be scheduled. +func (cs *CoordinatingCheckService) CreateCheck(ctx context.Context, c influxdb.Check, userID influxdb.ID) error { + + if err := cs.CheckService.CreateCheck(ctx, c, userID); err != nil { + return err + } + + t, err := cs.taskService.FindTaskByID(ctx, c.GetTaskID()) + if err != nil { + return err + } + + if err := cs.coordinator.TaskCreated(ctx, t); err != nil { + if derr := cs.CheckService.DeleteCheck(ctx, c.GetID()); derr != nil { + return fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr) + } + + return err + } + + return nil +} + +// UpdateCheck Updates a check and publishes the change so the task owner can act on the update +func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb.ID, c influxdb.Check) (influxdb.Check, error) { + from, err := cs.CheckService.FindCheckByID(ctx, id) + if err != nil { + return nil, err + } + + fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID()) + if err != nil { + return nil, err + } + + to, err := cs.CheckService.UpdateCheck(ctx, id, c) + if err != nil { + return to, err + } + + toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID()) + if err != nil { + return nil, err + } + + return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask) +} + +// PatchCheck Updates a check and publishes the change so the task owner can act on the update +func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) { + from, err := cs.CheckService.FindCheckByID(ctx, id) + if err != nil { + return nil, err + } + + fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID()) + if err != nil { + return nil, err + } + + to, err := cs.CheckService.PatchCheck(ctx, id, upd) + if err != nil { + return to, err + } + + toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID()) + if err != nil { + return nil, err + } + + return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask) + +} + +// DeleteCheck delete the check and publishes the change, to allow the task owner to find out about this change faster. +func (cs *CoordinatingCheckService) DeleteCheck(ctx context.Context, id influxdb.ID) error { + check, err := cs.CheckService.FindCheckByID(ctx, id) + if err != nil { + return err + } + + t, err := cs.taskService.FindTaskByID(ctx, check.GetTaskID()) + if err != nil { + return err + } + + if err := cs.coordinator.TaskDeleted(ctx, t.ID); err != nil { + return err + } + + return cs.CheckService.DeleteCheck(ctx, id) +} diff --git a/task/backend/middleware/check_middleware_test.go b/task/backend/middleware/check_middleware_test.go new file mode 100644 index 00000000000..7e87d51cef3 --- /dev/null +++ b/task/backend/middleware/check_middleware_test.go @@ -0,0 +1,264 @@ +package middleware_test + +import ( + "context" + "fmt" + "testing" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/mock" + "github.com/influxdata/influxdb/notification/check" + "github.com/influxdata/influxdb/notification/rule" + "github.com/influxdata/influxdb/task/backend/middleware" +) + +type pipingCoordinator struct { + err error + taskCreatedPipe chan *influxdb.Task + taskUpdatedPipe chan *influxdb.Task + taskDeletedPipe chan influxdb.ID + runCancelledPipe chan influxdb.ID + runRetriedPipe chan *influxdb.Run + runForcedPipe chan *influxdb.Run +} + +func (p *pipingCoordinator) taskCreatedChan() <-chan *influxdb.Task { + if p.taskCreatedPipe == nil { + p.taskCreatedPipe = make(chan *influxdb.Task, 1) + } + return p.taskCreatedPipe +} +func (p *pipingCoordinator) taskUpdatedChan() <-chan *influxdb.Task { + if p.taskUpdatedPipe == nil { + p.taskUpdatedPipe = make(chan *influxdb.Task, 1) + } + return p.taskUpdatedPipe +} +func (p *pipingCoordinator) taskDeletedChan() <-chan influxdb.ID { + if p.taskDeletedPipe == nil { + p.taskDeletedPipe = make(chan influxdb.ID, 1) + } + return p.taskDeletedPipe +} +func (p *pipingCoordinator) runCancelledChan() <-chan influxdb.ID { + if p.runCancelledPipe == nil { + p.runCancelledPipe = make(chan influxdb.ID, 1) + } + return p.runCancelledPipe +} +func (p *pipingCoordinator) runRetriedChan() <-chan *influxdb.Run { + if p.runRetriedPipe == nil { + p.runRetriedPipe = make(chan *influxdb.Run, 1) + } + return p.runRetriedPipe +} +func (p *pipingCoordinator) runForcedChan() <-chan *influxdb.Run { + if p.runForcedPipe == nil { + p.runForcedPipe = make(chan *influxdb.Run, 1) + } + return p.runForcedPipe +} + +func (p *pipingCoordinator) TaskCreated(_ context.Context, t *influxdb.Task) error { + if p.taskCreatedPipe != nil { + p.taskCreatedPipe <- t + } + return p.err +} +func (p *pipingCoordinator) TaskUpdated(_ context.Context, from, to *influxdb.Task) error { + if p.taskUpdatedPipe != nil { + p.taskUpdatedPipe <- to + } + return p.err +} +func (p *pipingCoordinator) TaskDeleted(_ context.Context, id influxdb.ID) error { + if p.taskDeletedPipe != nil { + p.taskDeletedPipe <- id + } + return p.err +} +func (p *pipingCoordinator) RunCancelled(ctx context.Context, taskID, runID influxdb.ID) error { + if p.runCancelledPipe != nil { + p.runCancelledPipe <- runID + } + return p.err +} +func (p *pipingCoordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error { + if p.runRetriedPipe != nil { + p.runRetriedPipe <- run + } + return p.err +} +func (p *pipingCoordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error { + if p.runForcedPipe != nil { + p.runForcedPipe <- run + } + return p.err +} + +type mockedSvc struct { + taskSvc *mock.TaskService + checkSvc *mock.CheckService + notificationSvc *mock.NotificationRuleStore + pipingCoordinator *pipingCoordinator +} + +func newMockServices() mockedSvc { + return mockedSvc{ + taskSvc: &mock.TaskService{ + FindTaskByIDFn: func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { return &influxdb.Task{ID: id}, nil }, + CreateTaskFn: func(context.Context, influxdb.TaskCreate) (*influxdb.Task, error) { return &influxdb.Task{ID: 1}, nil }, + UpdateTaskFn: func(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) { + return &influxdb.Task{ID: id}, nil + }, + DeleteTaskFn: func(context.Context, influxdb.ID) error { return nil }, + }, + checkSvc: &mock.CheckService{ + FindCheckByIDFn: func(_ context.Context, id influxdb.ID) (influxdb.Check, error) { + c := &check.Deadman{} + c.SetID(id) + return c, nil + }, + CreateCheckFn: func(context.Context, influxdb.Check, influxdb.ID) error { return nil }, + UpdateCheckFn: func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) { return c, nil }, + PatchCheckFn: func(_ context.Context, id influxdb.ID, _ influxdb.CheckUpdate) (influxdb.Check, error) { + c := &check.Deadman{} + c.SetID(id) + return c, nil + }, + DeleteCheckFn: func(context.Context, influxdb.ID) error { return nil }, + }, + notificationSvc: &mock.NotificationRuleStore{ + FindNotificationRuleByIDF: func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { + c := &rule.HTTP{} + c.SetID(id) + return c, nil + }, + CreateNotificationRuleF: func(context.Context, influxdb.NotificationRule, influxdb.ID) error { return nil }, + UpdateNotificationRuleF: func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) { + return c, nil + }, + PatchNotificationRuleF: func(_ context.Context, id influxdb.ID, _ influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { + c := &rule.HTTP{} + c.SetID(id) + return c, nil + }, + DeleteNotificationRuleF: func(context.Context, influxdb.ID) error { return nil }, + }, + pipingCoordinator: &pipingCoordinator{}, + } +} + +func newCheckSvcStack() (mockedSvc, *middleware.CoordinatingCheckService) { + msvcs := newMockServices() + return msvcs, middleware.NewCheckService(msvcs.checkSvc, msvcs.taskSvc, msvcs.pipingCoordinator) +} + +func TestCheckCreate(t *testing.T) { + mocks, checkService := newCheckSvcStack() + ch := mocks.pipingCoordinator.taskCreatedChan() + + check := &check.Deadman{} + check.SetTaskID(4) + + err := checkService.CreateCheck(context.Background(), check, 1) + if err != nil { + t.Fatal(err) + } + + select { + case task := <-ch: + if task.ID != check.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } + + mocks.pipingCoordinator.err = fmt.Errorf("bad") + mocks.checkSvc.DeleteCheckFn = func(context.Context, influxdb.ID) error { return fmt.Errorf("AARGH") } + + err = checkService.CreateCheck(context.Background(), check, 1) + if err.Error() != "schedule task failed: bad\n\tcleanup also failed: AARGH" { + t.Fatal(err) + } +} + +func TestCheckUpdate(t *testing.T) { + mocks, checkService := newCheckSvcStack() + ch := mocks.pipingCoordinator.taskUpdatedChan() + + mocks.checkSvc.UpdateCheckFn = func(_ context.Context, _ influxdb.ID, c influxdb.Check) (influxdb.Check, error) { + c.SetTaskID(10) + return c, nil + } + + deadman := &check.Deadman{} + deadman.SetTaskID(4) + + check, err := checkService.UpdateCheck(context.Background(), 1, deadman) + if err != nil { + t.Fatal(err) + } + + select { + case task := <-ch: + if task.ID != check.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } +} + +func TestCheckPatch(t *testing.T) { + mocks, checkService := newCheckSvcStack() + ch := mocks.pipingCoordinator.taskUpdatedChan() + + deadman := &check.Deadman{} + deadman.SetTaskID(4) + + mocks.checkSvc.PatchCheckFn = func(context.Context, influxdb.ID, influxdb.CheckUpdate) (influxdb.Check, error) { + return deadman, nil + } + + check, err := checkService.PatchCheck(context.Background(), 1, influxdb.CheckUpdate{}) + if err != nil { + t.Fatal(err) + } + + select { + case task := <-ch: + if task.ID != check.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } +} + +func TestCheckDelete(t *testing.T) { + mocks, checkService := newCheckSvcStack() + ch := mocks.pipingCoordinator.taskDeletedChan() + + mocks.checkSvc.FindCheckByIDFn = func(_ context.Context, id influxdb.ID) (influxdb.Check, error) { + c := &check.Deadman{} + c.SetID(id) + c.SetTaskID(21) + return c, nil + } + + err := checkService.DeleteCheck(context.Background(), 1) + if err != nil { + t.Fatal(err) + } + + select { + case id := <-ch: + if id != influxdb.ID(21) { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } +} diff --git a/task/backend/middleware/middleware_test.go b/task/backend/middleware/middleware_test.go index bf710c5a30c..dd0ea6495df 100644 --- a/task/backend/middleware/middleware_test.go +++ b/task/backend/middleware/middleware_test.go @@ -222,7 +222,7 @@ func TestCoordinatingTaskService_ClaimTaskUpdatesLatestCompleted(t *testing.T) { ts = inmemTaskService() sched = mock.NewScheduler() coord = coordinator.New(zaptest.NewLogger(t), sched) - latest = time.Now().Add(time.Second) + latest = time.Now().UTC().Add(time.Second) middleware = middleware.New(ts, coord, middleware.WithNowFunc(func() time.Time { return latest })) diff --git a/task/backend/middleware/notification_middleware.go b/task/backend/middleware/notification_middleware.go new file mode 100644 index 00000000000..2d784eb1137 --- /dev/null +++ b/task/backend/middleware/notification_middleware.go @@ -0,0 +1,120 @@ +package middleware + +import ( + "context" + "fmt" + + "github.com/influxdata/influxdb" +) + +// CoordinatingNotificationRuleStore acts as a NotificationRuleStore decorator that handles coordinating the api request +// with the required task control actions asynchronously via a message dispatcher +type CoordinatingNotificationRuleStore struct { + influxdb.NotificationRuleStore + coordinator Coordinator + taskService influxdb.TaskService +} + +// NewNotificationRuleStore constructs a new coordinating notification service +func NewNotificationRuleStore(ns influxdb.NotificationRuleStore, ts influxdb.TaskService, coordinator Coordinator) *CoordinatingNotificationRuleStore { + c := &CoordinatingNotificationRuleStore{ + NotificationRuleStore: ns, + taskService: ts, + coordinator: coordinator, + } + + return c +} + +// CreateNotificationRule Creates a notification and Publishes the change it can be scheduled. +func (ns *CoordinatingNotificationRuleStore) CreateNotificationRule(ctx context.Context, nr influxdb.NotificationRule, userID influxdb.ID) error { + + if err := ns.NotificationRuleStore.CreateNotificationRule(ctx, nr, userID); err != nil { + return err + } + + t, err := ns.taskService.FindTaskByID(ctx, nr.GetTaskID()) + if err != nil { + return err + } + + if err := ns.coordinator.TaskCreated(ctx, t); err != nil { + if derr := ns.NotificationRuleStore.DeleteNotificationRule(ctx, nr.GetID()); derr != nil { + return fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr) + } + + return err + } + + return nil +} + +// UpdateNotificationRule Updates a notification and publishes the change so the task owner can act on the update +func (ns *CoordinatingNotificationRuleStore) UpdateNotificationRule(ctx context.Context, id influxdb.ID, nr influxdb.NotificationRule, uid influxdb.ID) (influxdb.NotificationRule, error) { + from, err := ns.NotificationRuleStore.FindNotificationRuleByID(ctx, id) + if err != nil { + return nil, err + } + + fromTask, err := ns.taskService.FindTaskByID(ctx, from.GetTaskID()) + if err != nil { + return nil, err + } + + to, err := ns.NotificationRuleStore.UpdateNotificationRule(ctx, id, nr, uid) + if err != nil { + return to, err + } + + toTask, err := ns.taskService.FindTaskByID(ctx, to.GetTaskID()) + if err != nil { + return nil, err + } + + return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask) +} + +// PatchNotificationRule Updates a notification and publishes the change so the task owner can act on the update +func (ns *CoordinatingNotificationRuleStore) PatchNotificationRule(ctx context.Context, id influxdb.ID, upd influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { + from, err := ns.NotificationRuleStore.FindNotificationRuleByID(ctx, id) + if err != nil { + return nil, err + } + + fromTask, err := ns.taskService.FindTaskByID(ctx, from.GetTaskID()) + if err != nil { + return nil, err + } + + to, err := ns.NotificationRuleStore.PatchNotificationRule(ctx, id, upd) + if err != nil { + return to, err + } + + toTask, err := ns.taskService.FindTaskByID(ctx, to.GetTaskID()) + if err != nil { + return nil, err + } + + return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask) + +} + +// DeleteNotificationRule delete the notification and publishes the change, to allow the task owner to find out about this change faster. +func (ns *CoordinatingNotificationRuleStore) DeleteNotificationRule(ctx context.Context, id influxdb.ID) error { + notification, err := ns.NotificationRuleStore.FindNotificationRuleByID(ctx, id) + if err != nil { + return err + } + + t, err := ns.taskService.FindTaskByID(ctx, notification.GetTaskID()) + if err != nil { + return err + } + + if err := ns.coordinator.TaskDeleted(ctx, t.ID); err != nil { + return err + } + + return ns.NotificationRuleStore.DeleteNotificationRule(ctx, id) +} diff --git a/task/backend/middleware/notification_middleware_test.go b/task/backend/middleware/notification_middleware_test.go new file mode 100644 index 00000000000..de2092f6aa3 --- /dev/null +++ b/task/backend/middleware/notification_middleware_test.go @@ -0,0 +1,125 @@ +package middleware_test + +import ( + "context" + "fmt" + "testing" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/notification/rule" + "github.com/influxdata/influxdb/task/backend/middleware" +) + +func newNotificationRuleSvcStack() (mockedSvc, *middleware.CoordinatingNotificationRuleStore) { + msvcs := newMockServices() + return msvcs, middleware.NewNotificationRuleStore(msvcs.notificationSvc, msvcs.taskSvc, msvcs.pipingCoordinator) +} + +func TestNotificationRuleCreate(t *testing.T) { + mocks, nrService := newNotificationRuleSvcStack() + ch := mocks.pipingCoordinator.taskCreatedChan() + + nr := &rule.HTTP{} + nr.SetTaskID(4) + + err := nrService.CreateNotificationRule(context.Background(), nr, 1) + if err != nil { + t.Fatal(err) + } + + select { + case task := <-ch: + if task.ID != nr.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } + + mocks.pipingCoordinator.err = fmt.Errorf("bad") + mocks.notificationSvc.DeleteNotificationRuleF = func(context.Context, influxdb.ID) error { return fmt.Errorf("AARGH") } + + err = nrService.CreateNotificationRule(context.Background(), nr, 1) + if err.Error() != "schedule task failed: bad\n\tcleanup also failed: AARGH" { + t.Fatal(err) + } +} + +func TestNotificationRuleUpdate(t *testing.T) { + mocks, nrService := newNotificationRuleSvcStack() + ch := mocks.pipingCoordinator.taskUpdatedChan() + + mocks.notificationSvc.UpdateNotificationRuleF = func(_ context.Context, _ influxdb.ID, c influxdb.NotificationRule, _ influxdb.ID) (influxdb.NotificationRule, error) { + c.SetTaskID(10) + return c, nil + } + + deadman := &rule.HTTP{} + deadman.SetTaskID(4) + + nr, err := nrService.UpdateNotificationRule(context.Background(), 1, deadman, 2) + if err != nil { + t.Fatal(err) + } + + select { + case task := <-ch: + if task.ID != nr.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } +} + +func TestNotificationRulePatch(t *testing.T) { + mocks, nrService := newNotificationRuleSvcStack() + ch := mocks.pipingCoordinator.taskUpdatedChan() + + deadman := &rule.HTTP{} + deadman.SetTaskID(4) + + mocks.notificationSvc.PatchNotificationRuleF = func(context.Context, influxdb.ID, influxdb.NotificationRuleUpdate) (influxdb.NotificationRule, error) { + return deadman, nil + } + + nr, err := nrService.PatchNotificationRule(context.Background(), 1, influxdb.NotificationRuleUpdate{}) + if err != nil { + t.Fatal(err) + } + + select { + case task := <-ch: + if task.ID != nr.GetTaskID() { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } +} + +func TestNotificationRuleDelete(t *testing.T) { + mocks, nrService := newNotificationRuleSvcStack() + ch := mocks.pipingCoordinator.taskDeletedChan() + + mocks.notificationSvc.FindNotificationRuleByIDF = func(_ context.Context, id influxdb.ID) (influxdb.NotificationRule, error) { + c := &rule.HTTP{} + c.SetID(id) + c.SetTaskID(21) + return c, nil + } + + err := nrService.DeleteNotificationRule(context.Background(), 1) + if err != nil { + t.Fatal(err) + } + + select { + case id := <-ch: + if id != influxdb.ID(21) { + t.Fatalf("task sent to coordinator doesn't match expected") + } + default: + t.Fatal("didn't receive task") + } +}