From a7c1fa88ccf174a1d39be77e02880ac1a0c97c35 Mon Sep 17 00:00:00 2001
From: Vyacheslav Spiridonov
Date: Thu, 17 Jun 2021 12:42:09 +0300
Subject: [PATCH] feat(alerta-service): Add custom severities to Alerta handler

Fix #2056

This commit allows using all or some of Alerta's severities. It provides
two keywords to fine-tune kapacitor's built-in severities.

1. First, you can rename the kapacitor severity levels crit, warn, info,
and ok to any other severities configured in your Alerta:

    |alert()
        // ...
        .alerta()
            // ...
            .renameSeverity('crit', 'major')
            .renameSeverity('info', 'notice')

This should cover most cases. But if you do want a lot of severity
levels:

2. You can add custom severity levels, which are evaluated at the Alerta
handler level after a built-in alert has triggered:

    |alert()
        // ...
        .warn(lambda: "cpu" > 50)
        .alerta()
            // ...
            .addSeverity('minor', 3, lambda: "cpu" > 60)
            .addSeverity('major', 2, lambda: "cpu" > 70)
            .addSeverity('critical', 1, lambda: "cpu" > 80)
            .addSeverity('fatal', 0, lambda: "cpu" > 90)

Note: the addSeverity conditions are only evaluated after a built-in
alert has triggered, so you need an entry point (like .warn() in the
example above) whose condition covers the whole range of values you are
interested in.

Note: these severities use Alerta's code order - a higher severity has a
lower code (0 for fatal, 9 for ok).

Note: .addSeverity() is of little use in combination with
.stateChangesOnly(), but Alerta has a decent deduplication mechanism, so
this shouldn't be a problem.
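For illustration, a complete alert node combining both keywords might
look like this (the measurement, field, and thresholds here are
hypothetical):

    // Hypothetical data: measurement, field and thresholds are for illustration only.
    stream
        |from()
            .measurement('cpu')
        |alert()
            .warn(lambda: "usage_idle" < 50)
            .alerta()
                .renameSeverity('warn', 'minor')
                .addSeverity('major', 2, lambda: "usage_idle" < 30)
                .addSeverity('critical', 1, lambda: "usage_idle" < 10)

With these conditions, a point with "usage_idle" = 25 triggers the
built-in warn level (renamed to 'minor'); the handler then evaluates the
addSeverity conditions in ascending code order ('critical' first, then
'major') and the first match wins, so the alert is sent with severity
'major'.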
---
 CHANGELOG.md                             |   1 +
 alert.go                                 |  17 +++
 integrations/streamer_test.go            |  25 +++++
 pipeline/alert.go                        |  32 ++++++
 pipeline/tick/alert.go                   |  16 ++-
 pipeline/tick/alert_test.go              |  10 ++
 server/server_test.go                    |   1 +
 services/alerta/alertatest/alertatest.go |   1 +
 services/alerta/service.go               | 125 ++++++++++++++++++++---
 9 files changed, 209 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4f3bcff28..a3aa00d17 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 - [#2559](https://github.com/influxdata/kapacitor/pull/2559): kapacitor cli supports flux tasks
 - [#2560](https://github.com/influxdata/kapacitor/pull/2560): enable new-style slack apps
 - [#2576](https://github.com/influxdata/kapacitor/pull/2576): shared secret auth to influxdb in OSS
+- [#2584](https://github.com/influxdata/kapacitor/pull/2584): Add custom severities to Alerta handler
 
 ### Bugfixes
 - [#2564](https://github.com/influxdata/kapacitor/pull/2564): Fix a panic in the scraper handler when debug mode is enabled
diff --git a/alert.go b/alert.go
index 82513381f..895ba5789 100644
--- a/alert.go
+++ b/alert.go
@@ -366,6 +366,23 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
 	if a.Environment != "" {
 		c.Environment = a.Environment
 	}
+	if len(a.RenameSeverities) != 0 {
+		c.RenameSeverities = a.RenameSeverities
+	}
+	if len(a.ExtraSeverities) != 0 {
+		c.ExtraSeverityExpressions = make([]stateful.Expression, len(a.ExtraSeverities))
+		c.ExtraSeverityNames = make([]string, len(a.ExtraSeverities))
+		c.ExtraSeverityScopePools = make([]stateful.ScopePool, len(a.ExtraSeverities))
+		for i, severity := range a.ExtraSeverities {
+			statefulExpression, expressionCompileError := stateful.NewExpression(severity.Condition.Expression)
+			if expressionCompileError != nil {
+				return nil, fmt.Errorf("failed to compile stateful expression for Alerta extra severity %s: %s", severity.Name, expressionCompileError)
+			}
+			c.ExtraSeverityExpressions[i] = statefulExpression
+			c.ExtraSeverityNames[i] = severity.Name
+			c.ExtraSeverityScopePools[i] = stateful.NewScopePool(ast.FindReferenceVariables(severity.Condition.Expression))
+		}
+	}
 	if a.Group != "" {
 		c.Group = a.Group
 	}
diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go
index e8a74d951..056db568f 100644
--- a/integrations/streamer_test.go
+++ b/integrations/streamer_test.go
@@ -9197,11 +9197,18 @@ stream
 		.token('testtoken1234567')
 		.environment('production')
 		.timeout(1h)
+	.alerta()
+		.token('testtoken7654321')
+		.environment('production')
+		.timeout(1h)
+		.addSeverity('fatal', 1, lambda: "count" > 9.0)
+		.addSeverity('disaster', 0, lambda: "count" > 15.0)
 	.alerta()
 		.token('anothertesttoken')
 		.resource('resource: {{ index .Tags "host" }}')
 		.event('event: {{ .TaskName }}')
 		.environment('{{ index .Tags "host" }}')
+		.renameSeverity('crit', 'major')
 		.origin('override')
 		.group('{{ .ID }}')
 		.value('{{ index .Fields "count" }}')
@@ -9227,6 +9234,23 @@ stream
 				Event:       "serverA",
 				Group:       "host=serverA",
 				Environment: "production",
+				Severity:    "critical",
+				Text:        "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
+				Origin:      "Kapacitor",
+				Service:     []string{"cpu"},
+				Correlate:   []string{"cpu"},
+				Timeout:     3600,
+			},
+		},
+		alertatest.Request{
+			URL:           "/alert",
+			Authorization: "Bearer testtoken7654321",
+			PostData: alertatest.PostData{
+				Resource:    "cpu",
+				Event:       "serverA",
+				Group:       "host=serverA",
+				Environment: "production",
+				Severity:    "fatal",
 				Text:        "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
 				Origin:      "Kapacitor",
 				Service:     []string{"cpu"},
@@ -9242,6 +9266,7 @@ stream
 				Event:       "event: TestStream_Alert",
 				Group:       "serverA",
 				Environment: "serverA",
+				Severity:    "major",
 				Text:        "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
 				Origin:      "override",
 				Service:     []string{"serviceA", "serviceB", "cpu"},
diff --git a/pipeline/alert.go b/pipeline/alert.go
index ca21ea504..6025f0b33 100644
--- a/pipeline/alert.go
+++ b/pipeline/alert.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"reflect"
+	"sort"
 	"strings"
 	"time"
 
@@ -1219,6 +1220,12 @@ func (n *AlertNodeData) Alerta() *AlertaHandler {
 	return alerta
 }
 
+type AlertaCustomSeverity struct {
+	Name      string          `json:"name"`
+	Code      int64           `json:"code"`
+	Condition *ast.LambdaNode `json:"condition"`
+}
+
 // tick:embedded:AlertNode.Alerta
 type AlertaHandler struct {
 	*AlertNodeData `json:"-"`
@@ -1242,6 +1249,13 @@ type AlertaHandler struct {
 	// Defaut is set from the configuration.
 	Environment string `json:"environment"`
 
	// Alerta supports many different severity levels and allows you to define more.
	// There are two ways to take advantage of this model:
	// Rename kapacitor's built-in severities to match some of Alerta's severities
+	RenameSeverities map[string]string `tick:"RenameSeverity" json:"rename-severities"`
+	// Add post-processing to kapacitor alerts to fine-tune severity levels
+	ExtraSeverities []*AlertaCustomSeverity `tick:"AddSeverity" json:"add-severities"`
+
 	// Alerta group.
 	// Can be a template and has access to the same data as the AlertNode.Details property.
 	// Default: {{ .Group }}
@@ -1268,6 +1282,24 @@ type AlertaHandler struct {
 	Timeout time.Duration `json:"timeout"`
 }
 
+func (a *AlertaHandler) RenameSeverity(kapacitorName string, alertaName string) *AlertaHandler {
+	if a.RenameSeverities == nil {
+		a.RenameSeverities = make(map[string]string)
+	}
+	a.RenameSeverities[kapacitorName] = alertaName
+	return a
+}
+
+func (a *AlertaHandler) AddSeverity(name string, code int64, condition *ast.LambdaNode) *AlertaHandler {
+	a.ExtraSeverities = append(a.ExtraSeverities, &AlertaCustomSeverity{
+		Name:      name,
+		Code:      code,
+		Condition: condition,
+	})
+	// Alerta severity codes are in descending order: a higher severity has a lower code (1 for critical, 9 for ok)
+	sort.SliceStable(a.ExtraSeverities, func(i, j int) bool { return a.ExtraSeverities[i].Code < a.ExtraSeverities[j].Code })
+	return a
+}
+
 // List of effected services.
 // If not specified defaults to the Name of the stream.
 // tick:property
diff --git a/pipeline/tick/alert.go b/pipeline/tick/alert.go
index b80eb41cd..2939a992c 100644
--- a/pipeline/tick/alert.go
+++ b/pipeline/tick/alert.go
@@ -220,8 +220,20 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
 			Dot("token", h.Token).
 			Dot("resource", h.Resource).
 			Dot("event", h.Event).
-			Dot("environment", h.Environment).
-			Dot("group", h.Group).
+			Dot("environment", h.Environment)
+
+		var severitiesOrder = []string{"ok", "info", "warn", "crit"}
+		for _, k := range severitiesOrder {
+			if val, ok := h.RenameSeverities[k]; ok {
+				n.Dot("renameSeverity", k, val)
+			}
+		}
+
+		for _, k := range h.ExtraSeverities {
+			n.Dot("addSeverity", k.Name, k.Code, k.Condition)
+		}
+
+		n.Dot("group", h.Group).
 			Dot("value", h.Value).
 			Dot("origin", h.Origin).
 			Dot("services", args(h.Service)...).
diff --git a/pipeline/tick/alert_test.go b/pipeline/tick/alert_test.go
index 0d371091a..67fb6fd90 100644
--- a/pipeline/tick/alert_test.go
+++ b/pipeline/tick/alert_test.go
@@ -661,6 +661,14 @@ func TestAlertAlerta(t *testing.T) {
 	handler.Resource = "Harbinger"
 	handler.Event = "Jump through Omega-4 Relay"
 	handler.Environment = "Collector base"
+	handler.RenameSeverities = make(map[string]string)
+	handler.RenameSeverities["info"] = "notice"
+	handler.ExtraSeverities = make([]*pipeline.AlertaCustomSeverity, 1)
+	handler.ExtraSeverities[0] = &pipeline.AlertaCustomSeverity{
+		Name:      "major",
+		Code:      2,
+		Condition: newLambda(85),
+	}
 	handler.Group = "I brought Jack, Miranda and Tali"
 	handler.Value = "Save the Galaxy"
 	handler.Origin = "Omega"
@@ -680,6 +688,8 @@ func TestAlertAlerta(t *testing.T) {
 	.resource('Harbinger')
 	.event('Jump through Omega-4 Relay')
 	.environment('Collector base')
+	.renameSeverity('info', 'notice')
+	.addSeverity('major', 2, lambda: "cpu" > 85)
 	.group('I brought Jack, Miranda and Tali')
 	.value('Save the Galaxy')
 	.origin('Omega')
diff --git a/server/server_test.go b/server/server_test.go
index e7df2c17f..0e78ba0d6 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -10149,6 +10149,7 @@ func TestServer_AlertHandlers(t *testing.T) {
 				Event:       "id",
 				Group:       "test",
 				Environment: "env",
+				Severity:    "critical",
 				Text:        "message",
 				Origin:      "kapacitor",
 				Service:     []string{"alert"},
diff --git a/services/alerta/alertatest/alertatest.go b/services/alerta/alertatest/alertatest.go
index fbb22cd8c..380d8a4cc 100644
--- a/services/alerta/alertatest/alertatest.go
+++ b/services/alerta/alertatest/alertatest.go
@@ -57,6 +57,7 @@ type PostData struct {
 	Event       string   `json:"event"`
 	Group       string   `json:"group"`
 	Environment string   `json:"environment"`
+	Severity    string   `json:"severity"`
 	Text        string   `json:"text"`
 	Origin      string   `json:"origin"`
 	Service     []string `json:"service"`
diff --git a/services/alerta/service.go b/services/alerta/service.go
index c4e1e635a..81e60ea73 100644
--- a/services/alerta/service.go
+++ b/services/alerta/service.go
@@ -17,6 +17,8 @@ import (
 	khttp "github.com/influxdata/kapacitor/http"
 	"github.com/influxdata/kapacitor/keyvalue"
 	"github.com/influxdata/kapacitor/models"
+	"github.com/influxdata/kapacitor/tick/ast"
+	"github.com/influxdata/kapacitor/tick/stateful"
 	"github.com/pkg/errors"
 )
 
@@ -267,6 +269,16 @@ type HandlerConfig struct {
 	// Defaut is set from the configuration.
 	Environment string `mapstructure:"environment"`
 
+	// Renaming rules for severities.
+	// Allows rewriting kapacitor's built-in severities (info, warn, crit) to any of Alerta's severities
+	RenameSeverities map[string]string `mapstructure:"rename-severities"`
+
+	// Expressions for custom Alerta severities.
+	// Allows fine-tuning of kapacitor's severity levels
+	ExtraSeverityExpressions []stateful.Expression `mapstructure:"severity-expressions"`
+	ExtraSeverityNames       []string              `mapstructure:"severity-names"`
+	ExtraSeverityScopePools  []stateful.ScopePool  `mapstructure:"severity-scope-pool"`
+
 	// Alerta group.
 	// Can be a template and has access to the same data as the AlertNode.Details property.
 	// Default: {{ .Group }}
@@ -297,13 +309,17 @@ type handler struct {
 	c    HandlerConfig
 	diag Diagnostic
 
-	resourceTmpl    *text.Template
-	eventTmpl       *text.Template
-	environmentTmpl *text.Template
-	valueTmpl       *text.Template
-	groupTmpl       *text.Template
-	serviceTmpl     []*text.Template
-	correlateTmpl   []*text.Template
+	resourceTmpl        *text.Template
+	eventTmpl           *text.Template
+	environmentTmpl     *text.Template
+	renameSeverities    map[string]string
+	severityLevels      []string
+	severityExpressions []stateful.Expression
+	scopePools          []stateful.ScopePool
+	valueTmpl           *text.Template
+	groupTmpl           *text.Template
+	serviceTmpl         []*text.Template
+	correlateTmpl       []*text.Template
 }
@@ -357,16 +373,20 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
 	}
 
 	return &handler{
-		s:               s,
-		c:               c,
-		diag:            s.diag.WithContext(ctx...),
-		resourceTmpl:    rtmpl,
-		eventTmpl:       evtmpl,
-		environmentTmpl: etmpl,
-		groupTmpl:       gtmpl,
-		valueTmpl:       vtmpl,
-		serviceTmpl:     stmpl,
-		correlateTmpl:   ctmpl,
+		s:                   s,
+		c:                   c,
+		diag:                s.diag.WithContext(ctx...),
+		resourceTmpl:        rtmpl,
+		eventTmpl:           evtmpl,
+		environmentTmpl:     etmpl,
+		renameSeverities:    c.RenameSeverities,
+		severityLevels:      c.ExtraSeverityNames,
+		severityExpressions: c.ExtraSeverityExpressions,
+		scopePools:          c.ExtraSeverityScopePools,
+		groupTmpl:           gtmpl,
+		valueTmpl:           vtmpl,
+		serviceTmpl:         stmpl,
+		correlateTmpl:       ctmpl,
 	}, nil
 }
@@ -468,20 +488,40 @@ func (h *handler) Handle(event alert.Event) {
 	}
 
 	var severity string
+	var severityKey string
 
 	switch event.State.Level {
 	case alert.OK:
 		severity = "ok"
+		severityKey = "ok"
 	case alert.Info:
 		severity = "informational"
+		severityKey = "info"
 	case alert.Warning:
 		severity = "warning"
+		severityKey = "warn"
 	case alert.Critical:
 		severity = "critical"
+		severityKey = "crit"
 	default:
 		severity = "indeterminate"
 	}
 
+	if val, ok := h.renameSeverities[severityKey]; ok {
+		severity = val
+	}
+
+	if len(h.severityLevels) != 0 {
+		for i, expression := range h.severityExpressions {
+			if pass, err := EvalPredicate(expression, h.scopePools[i], event.State.Time, event.Data.Fields, event.Data.Tags); err != nil {
+				h.diag.Error("error evaluating expression for Alerta severity", err)
+			} else if pass {
+				severity = h.severityLevels[i]
+				break
+			}
+		}
+	}
+
 	if err := h.s.Alert(
 		h.c.Token,
 		h.c.TokenPrefix,
@@ -502,3 +542,54 @@
 		h.diag.Error("failed to send event to Alerta", err)
 	}
 }
+
+func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, now time.Time, fields models.Fields, tags models.Tags) (bool, error) {
+	vars := scopePool.Get()
+	defer scopePool.Put(vars)
+	err := fillScope(vars, scopePool.ReferenceVariables(), now, fields, tags)
+	if err != nil {
+		return false, err
+	}
+
+	// Check the expression against the scope for type errors before evaluating.
+	if _, err := se.Type(vars); err != nil {
+		return false, err
+	}
+
+	return se.EvalBool(vars)
+}
+
+// fillScope fills the scope with the reference variables, resolved from now, fields and tags.
+func fillScope(vars *stateful.Scope, referenceVariables []string, now time.Time, fields models.Fields, tags models.Tags) error {
+	for _, refVariableName := range referenceVariables {
+		if refVariableName == "time" {
+			vars.Set("time", now.Local())
+			continue
+		}
+
+		// A field and a tag with the same name is an error.
+		var fieldValue interface{}
+		var isFieldExists bool
+		var tagValue interface{}
+		var isTagExists bool
+
+		if fieldValue, isFieldExists = fields[refVariableName]; isFieldExists {
+			vars.Set(refVariableName, fieldValue)
+		}
+
+		if tagValue, isTagExists = tags[refVariableName]; isTagExists {
+			if isFieldExists {
+				return fmt.Errorf("cannot have field and tags with same name %q", refVariableName)
+			}
+			vars.Set(refVariableName, tagValue)
+		}
+		if !isFieldExists && !isTagExists {
+			if !vars.Has(refVariableName) {
+				vars.Set(refVariableName, ast.MissingValue)
+			}
+		}
+	}
+
+	return nil
+}