Skip to content

Commit

Permalink
feat(task): Add task middleware's for checks and notifications
Browse files Browse the repository at this point in the history
To have checks and notifications happen transactionally we need to be
able to alert the task system when a new task was created using the checks and notifications systems.
These two new middlewares allow us to inform the task system of a update
to a task that was created through the check or notification systems.
  • Loading branch information
lyondhill committed Aug 26, 2019
1 parent 669b5f9 commit bbf92be
Show file tree
Hide file tree
Showing 6 changed files with 642 additions and 3 deletions.
14 changes: 12 additions & 2 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
labelSvc platform.LabelService = m.kvService
secretSvc platform.SecretService = m.kvService
lookupSvc platform.LookupService = m.kvService
notificationRuleSvc platform.NotificationRuleStore = m.kvService
notificationEndpointSvc platform.NotificationEndpointService = m.kvService
checkSvc platform.CheckService = m.kvService
)

switch m.secretStore {
Expand Down Expand Up @@ -565,6 +563,18 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.taskControlService = combinedTaskService
}

var checkSvc platform.CheckService
{
coordinator := coordinator.New(m.logger, m.scheduler)
checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.New(m.logger, m.scheduler)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
}

// NATS streaming server
m.natsServer = nats.NewServer()
if err := m.natsServer.Open(); err != nil {
Expand Down
120 changes: 120 additions & 0 deletions task/backend/middleware/check_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package middleware

import (
"context"
"fmt"

"github.com/influxdata/influxdb"
)

// CoordinatingCheckService acts as a CheckService decorator that handles coordinating the api request
// with the required task control actions asynchronously via a message dispatcher
type CoordinatingCheckService struct {
influxdb.CheckService
coordinator Coordinator
taskService influxdb.TaskService
}

// NewCheckService constructs a new coordinating check service
func NewCheckService(cs influxdb.CheckService, ts influxdb.TaskService, coordinator Coordinator) *CoordinatingCheckService {
c := &CoordinatingCheckService{
CheckService: cs,
taskService: ts,
coordinator: coordinator,
}

return c
}

// CreateCheck Creates a check and Publishes the change it can be scheduled.
func (cs *CoordinatingCheckService) CreateCheck(ctx context.Context, c influxdb.Check, userID influxdb.ID) error {

if err := cs.CheckService.CreateCheck(ctx, c, userID); err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, c.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskCreated(ctx, t); err != nil {
if derr := cs.CheckService.DeleteCheck(ctx, c.GetID()); derr != nil {
return fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, derr)
}

return err
}

return nil
}

// UpdateCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb.ID, c influxdb.Check) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.UpdateCheck(ctx, id, c)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)
}

// PatchCheck Updates a check and publishes the change so the task owner can act on the update
func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.ID, upd influxdb.CheckUpdate) (influxdb.Check, error) {
from, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return nil, err
}

fromTask, err := cs.taskService.FindTaskByID(ctx, from.GetTaskID())
if err != nil {
return nil, err
}

to, err := cs.CheckService.PatchCheck(ctx, id, upd)
if err != nil {
return to, err
}

toTask, err := cs.taskService.FindTaskByID(ctx, to.GetTaskID())
if err != nil {
return nil, err
}

return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask)

}

// DeleteCheck delete the check and publishes the change, to allow the task owner to find out about this change faster.
func (cs *CoordinatingCheckService) DeleteCheck(ctx context.Context, id influxdb.ID) error {
check, err := cs.CheckService.FindCheckByID(ctx, id)
if err != nil {
return err
}

t, err := cs.taskService.FindTaskByID(ctx, check.GetTaskID())
if err != nil {
return err
}

if err := cs.coordinator.TaskDeleted(ctx, t.ID); err != nil {
return err
}

return cs.CheckService.DeleteCheck(ctx, id)
}
Loading

0 comments on commit bbf92be

Please sign in to comment.