Skip to content

Commit

Permalink
feat(pkger): add support for applying checks
Browse files Browse the repository at this point in the history
  • Loading branch information
jsteenb2 committed Dec 18, 2019
1 parent ecbc061 commit 5d5d708
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 22 deletions.
8 changes: 8 additions & 0 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ func TestLauncher_Pkger(t *testing.T) {
assert.Equal(t, "rucket_1", bkts[0].Name)
hasLabelAssociations(t, bkts[0].LabelAssociations, 1, "label_1")

checks := sum1.Checks
require.Len(t, checks, 2)
for i, ch := range checks {
assert.NotZero(t, ch.Check.GetID())
assert.Equal(t, fmt.Sprintf("check_%d", i), ch.Check.GetName())
hasLabelAssociations(t, ch.LabelAssociations, 1, "label_1")
}

dashs := sum1.Dashboards
require.Len(t, dashs, 1)
assert.NotZero(t, dashs[0].ID)
Expand Down
3 changes: 1 addition & 2 deletions notification/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func (b Base) generateTaskOption() ast.Statement {
}

func (b Base) generateFluxASTCheckDefinition(checkType string) ast.Statement {
props := []*ast.Property{}
props = append(props, flux.Property("_check_id", flux.String(b.ID.String())))
props := append([]*ast.Property{}, flux.Property("_check_id", flux.String(b.ID.String())))
props = append(props, flux.Property("_check_name", flux.String(b.Name)))
props = append(props, flux.Property("_type", flux.String(checkType)))

Expand Down
3 changes: 1 addition & 2 deletions notification/check/deadman.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (c Deadman) generateFluxASTBody() []ast.Statement {
statements = append(statements, c.generateFluxASTCheckDefinition("deadman"))
statements = append(statements, c.generateLevelFn())
statements = append(statements, c.generateFluxASTMessageFunction())
statements = append(statements, c.generateFluxASTChecksFunction())
return statements
return append(statements, c.generateFluxASTChecksFunction())
}

func (c Deadman) generateLevelFn() ast.Statement {
Expand Down
38 changes: 26 additions & 12 deletions notification/check/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,16 @@ func (t Threshold) GenerateFluxAST() (*ast.Package, error) {
return nil, fmt.Errorf("expect a single file to be returned from query parsing got %d", len(p.Files))
}

fields := getFields(p)
if len(fields) != 1 {
return nil, fmt.Errorf("expected a single field but got: %s", fields)
}

f := p.Files[0]
assignPipelineToData(f)

f.Imports = append(f.Imports, flux.Imports("influxdata/influxdb/monitor", "influxdata/influxdb/v1")...)
f.Body = append(f.Body, t.generateFluxASTBody()...)
f.Body = append(f.Body, t.generateFluxASTBody(fields[0])...)

return p, nil
}
Expand All @@ -146,7 +151,7 @@ func (t Threshold) getSelectedField() (string, error) {
if kv.Key == "_field" && len(kv.Values) != 1 {
return "", fmt.Errorf("expect there to be a single field value in builder config")
}
if kv.Key == "_field" && len(kv.Values) == 1 {
if kv.Key == "_field" {
return kv.Values[0], nil
}
}
Expand Down Expand Up @@ -222,6 +227,22 @@ func removeAggregateWindow(pkg *ast.Package) {
})
}

func getFields(pkg *ast.Package) []string {
var fields []string
ast.Visit(pkg, func(n ast.Node) {
if fn, ok := n.(*ast.BinaryExpression); ok {
if me, ok := fn.Left.(*ast.MemberExpression); ok {
if me.Property.Key() == "_field" {
if str, ok := fn.Right.(*ast.StringLiteral); ok {
fields = append(fields, str.Value)
}
}
}
}
})
return fields
}

func assignPipelineToData(f *ast.File) error {
if len(f.Body) != 1 {
return fmt.Errorf("expected there to be a single statement in the flux script body, recieved %d", len(f.Body))
Expand Down Expand Up @@ -249,11 +270,11 @@ func assignPipelineToData(f *ast.File) error {
return nil
}

func (t Threshold) generateFluxASTBody() []ast.Statement {
func (t Threshold) generateFluxASTBody(field string) []ast.Statement {
var statements []ast.Statement
statements = append(statements, t.generateTaskOption())
statements = append(statements, t.generateFluxASTCheckDefinition("threshold"))
statements = append(statements, t.generateFluxASTThresholdFunctions()...)
statements = append(statements, t.generateFluxASTThresholdFunctions(field)...)
statements = append(statements, t.generateFluxASTMessageFunction())
statements = append(statements, t.generateFluxASTChecksFunction())
return statements
Expand All @@ -280,16 +301,9 @@ func (t Threshold) generateFluxASTChecksCall() *ast.CallExpression {
return flux.Call(flux.Member("monitor", "check"), flux.Object(objectProps...))
}

func (t Threshold) generateFluxASTThresholdFunctions() []ast.Statement {
func (t Threshold) generateFluxASTThresholdFunctions(field string) []ast.Statement {
thresholdStatements := make([]ast.Statement, len(t.Thresholds))

field, err := t.getSelectedField()
if err != nil {
// the error here should never happen since it should be validated before this
// function is ever called.
panic(err)
}

// This assumes that the ThresholdConfigs we've been provided do not have duplicates.
for k, v := range t.Thresholds {
thresholdStatements[k] = v.generateFluxASTThresholdFunction(field)
Expand Down
26 changes: 20 additions & 6 deletions pkger/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,8 @@ const (
)

type check struct {
id influxdb.ID
orgID influxdb.ID
kind checkKind
name string
description string
Expand All @@ -837,6 +839,13 @@ type check struct {
existing influxdb.Check
}

func (c *check) ID() influxdb.ID {
if c.existing != nil {
return c.existing.GetID()
}
return c.id
}

func (c *check) Name() string {
return c.name
}
Expand All @@ -845,8 +854,18 @@ func (c *check) ResourceType() influxdb.ResourceType {
return KindCheck.ResourceType()
}

func (c *check) Status() influxdb.Status {
status := influxdb.Status(c.status)
if status == "" {
status = influxdb.TaskStatusActive
}
return status
}

func (c *check) summarize() SummaryCheck {
base := icheck.Base{
ID: c.ID(),
OrgID: c.orgID,
Name: c.Name(),
Description: c.description,
Every: toNotificationDuration(c.every),
Expand All @@ -858,13 +877,8 @@ func (c *check) summarize() SummaryCheck {
base.Tags = append(base.Tags, influxdb.Tag{Key: tag.k, Value: tag.v})
}

status := influxdb.Status(c.status)
if status == "" {
status = influxdb.TaskStatusActive
}

sum := SummaryCheck{
Status: status,
Status: c.Status(),
LabelAssociations: toSummaryLabels(c.labels...),
}
switch c.kind {
Expand Down
93 changes: 93 additions & 0 deletions pkger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
// primary resources
s.applyVariables(pkg.variables()),
s.applyBuckets(pkg.buckets()),
s.applyChecks(pkg.checks()),
s.applyDashboards(pkg.dashboards()),
s.applyNotificationEndpoints(pkg.notificationEndpoints()),
s.applyTelegrafs(pkg.telegrafs()),
Expand Down Expand Up @@ -1076,6 +1077,98 @@ func (s *Service) applyBucket(ctx context.Context, b bucket) (influxdb.Bucket, e
return influxBucket, nil
}

func (s *Service) applyChecks(checks []*check) applier {
const resource = "check"

mutex := new(doMutex)
rollbackChecks := make([]*check, 0, len(checks))

createFn := func(ctx context.Context, i int, orgID, userID influxdb.ID) *applyErrBody {
var c check
mutex.Do(func() {
checks[i].orgID = orgID
c = *checks[i]
})

influxBucket, err := s.applyCheck(ctx, c, userID)
if err != nil {
return &applyErrBody{
name: c.Name(),
msg: err.Error(),
}
}

mutex.Do(func() {
checks[i].id = influxBucket.GetID()
rollbackChecks = append(rollbackChecks, checks[i])
})

return nil
}

return applier{
creater: creater{
entries: len(checks),
fn: createFn,
},
rollbacker: rollbacker{
resource: resource,
fn: func() error { return s.rollbackChecks(rollbackChecks) },
},
}
}

func (s *Service) rollbackChecks(checks []*check) error {
var errs []string
for _, c := range checks {
if c.existing == nil {
err := s.checkSVC.DeleteCheck(context.Background(), c.ID())
if err != nil {
errs = append(errs, c.ID().String())
}
continue
}

_, err := s.checkSVC.UpdateCheck(context.Background(), c.ID(), influxdb.CheckCreate{
Check: c.summarize().Check,
Status: influxdb.Status(c.status),
})
if err != nil {
errs = append(errs, c.ID().String())
}
}

if len(errs) > 0 {
// TODO: fixup error
return fmt.Errorf(`check_ids=[%s] err="unable to delete"`, strings.Join(errs, ", "))
}

return nil
}

func (s *Service) applyCheck(ctx context.Context, c check, userID influxdb.ID) (influxdb.Check, error) {
if c.existing != nil {
influxCheck, err := s.checkSVC.UpdateCheck(ctx, c.ID(), influxdb.CheckCreate{
Check: c.summarize().Check,
Status: c.Status(),
})
if err != nil {
return nil, err
}
return influxCheck, nil
}

checkStub := influxdb.CheckCreate{
Check: c.summarize().Check,
Status: c.Status(),
}
err := s.checkSVC.CreateCheck(ctx, checkStub, userID)
if err != nil {
return nil, err
}
return checkStub.Check, nil
}

func (s *Service) applyDashboards(dashboards []*dashboard) applier {
const resource = "dashboard"

Expand Down
Loading

0 comments on commit 5d5d708

Please sign in to comment.