Add match conditions to alert handlers #1259

Closed
wants to merge 7 commits into from
41 changes: 41 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,42 @@

### Release Notes

With this release the technical preview alerting service has been refactored.
Alert handlers now only ever have a single action and belong to a single topic.

The handler definition has been simplified as a result.
Here are some example alert handlers using the new structure:

```yaml
id: my_handler
kind: pagerDuty
options:
serviceKey: XXX
```

```yaml
id: aggregate_by_1m
kind: aggregate
options:
interval: 1m
topic: aggregated
```

```yaml
id: publish_to_system
kind: publish
options:
topics: [ system ]
```

To define a handler, you must now specify which topic it belongs to.
For example, to define the above aggregate handler on the `system` topic, use this command:

```sh
kapacitor define-handler system aggregate_by_1m.yaml
```
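Assuming file names that match the handler ids above, the full chain could be wired up by defining each handler on its topic (a sketch; the `aggregated` topic exists once the aggregate handler publishes to it):

```sh
# Aggregate alerts from the system topic into 1m batches,
# published to the aggregated topic.
kapacitor define-handler system aggregate_by_1m.yaml
# Page on the aggregated events.
kapacitor define-handler aggregated my_handler.yaml
```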


### Features

- [#1159](https://github.com/influxdata/kapacitor/pulls/1159): Go version 1.7.4 -> 1.7.5
@@ -14,6 +50,11 @@
- [#1162](https://github.com/influxdata/kapacitor/pulls/1162): Add Pushover integration.
- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node.
- [#1211](https://github.com/influxdata/kapacitor/issues/1211): Add StateDuration node.
- [#1209](https://github.com/influxdata/kapacitor/issues/1209): BREAKING: Refactor the Alerting service.
The change is completely breaking for the technical preview alerting service, a.k.a. the new alert topic handler features.
The change boils down to simplifying how you define and interact with topics.
Alert handlers now only ever have a single action and belong to a single topic.
See the updated API docs.

### Bugfixes

27 changes: 17 additions & 10 deletions alert.go
@@ -25,6 +25,7 @@ import (
"github.com/influxdata/kapacitor/services/snmptrap"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/victorops"
+"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
"github.com/influxdata/kapacitor/vars"
"github.com/pkg/errors"
@@ -388,14 +389,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

an.levels[alert.Info] = statefulExpression
-an.scopePools[alert.Info] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Info.Expression))
+an.scopePools[alert.Info] = stateful.NewScopePool(ast.FindReferenceVariables(n.Info.Expression))
if n.InfoReset != nil {
lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.InfoReset.Expression)
if lexpressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for infoReset: %s", lexpressionCompileError)
}
an.levelResets[alert.Info] = lstatefulExpression
-an.lrScopePools[alert.Info] = stateful.NewScopePool(stateful.FindReferenceVariables(n.InfoReset.Expression))
+an.lrScopePools[alert.Info] = stateful.NewScopePool(ast.FindReferenceVariables(n.InfoReset.Expression))
}
}

@@ -405,14 +406,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for warn: %s", expressionCompileError)
}
an.levels[alert.Warning] = statefulExpression
-an.scopePools[alert.Warning] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Warn.Expression))
+an.scopePools[alert.Warning] = stateful.NewScopePool(ast.FindReferenceVariables(n.Warn.Expression))
if n.WarnReset != nil {
lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.WarnReset.Expression)
if lexpressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for warnReset: %s", lexpressionCompileError)
}
an.levelResets[alert.Warning] = lstatefulExpression
-an.lrScopePools[alert.Warning] = stateful.NewScopePool(stateful.FindReferenceVariables(n.WarnReset.Expression))
+an.lrScopePools[alert.Warning] = stateful.NewScopePool(ast.FindReferenceVariables(n.WarnReset.Expression))
}
}

@@ -422,14 +423,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, fmt.Errorf("Failed to compile stateful expression for crit: %s", expressionCompileError)
}
an.levels[alert.Critical] = statefulExpression
-an.scopePools[alert.Critical] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Crit.Expression))
+an.scopePools[alert.Critical] = stateful.NewScopePool(ast.FindReferenceVariables(n.Crit.Expression))
if n.CritReset != nil {
lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.CritReset.Expression)
if lexpressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for critReset: %s", lexpressionCompileError)
}
an.levelResets[alert.Critical] = lstatefulExpression
-an.lrScopePools[alert.Critical] = stateful.NewScopePool(stateful.FindReferenceVariables(n.CritReset.Expression))
+an.lrScopePools[alert.Critical] = stateful.NewScopePool(ast.FindReferenceVariables(n.CritReset.Expression))
}
}

@@ -464,7 +465,7 @@ func (a *AlertNode) runAlert([]byte) error {

// Register Handlers on topic
for _, h := range a.handlers {
-a.et.tm.AlertService.RegisterHandler([]string{a.anonTopic}, h)
+a.et.tm.AlertService.RegisterAnonHandler(a.anonTopic, h)
}
// Restore anonTopic
a.et.tm.AlertService.RestoreTopic(a.anonTopic)
@@ -707,7 +708,7 @@ func (a *AlertNode) runAlert([]byte) error {
a.et.tm.AlertService.CloseTopic(a.anonTopic)
// Deregister Handlers on topic
for _, h := range a.handlers {
-a.et.tm.AlertService.DeregisterHandler([]string{a.anonTopic}, h)
+a.et.tm.AlertService.DeregisterAnonHandler(a.anonTopic, h)
}
return nil
}
@@ -730,14 +731,20 @@ func (a *AlertNode) restoreEventState(id string) (alert.Level, time.Time) {
var anonFound, topicFound bool
// Check for previous state on anonTopic
if a.hasAnonTopic() {
-if state, ok := a.et.tm.AlertService.EventState(a.anonTopic, id); ok {
+if state, ok, err := a.et.tm.AlertService.EventState(a.anonTopic, id); err != nil {
+a.incrementErrorCount()
+a.logger.Printf("E! failed to get event state for anonymous topic %s, event %s: %v", a.anonTopic, id, err)
+} else if ok {
anonTopicState = state
anonFound = true
}
}
// Check for previous state on topic.
if a.hasTopic() {
-if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok {
+if state, ok, err := a.et.tm.AlertService.EventState(a.topic, id); err != nil {
+a.incrementErrorCount()
+a.logger.Printf("E! failed to get event state for topic %s, event %s: %v", a.topic, id, err)
+} else if ok {
topicState = state
topicFound = true
}
69 changes: 43 additions & 26 deletions alert/DESIGN.md
@@ -73,40 +73,57 @@ Then alert handlers can be configured to subscribe to the events.
These alert handlers will be configured via the API.
Use yaml/json to define the alert handlers.

Here are a few examples:

```yaml
id: my_handler
kind: pagerDuty
options:
serviceKey: XXX
```

```yaml
id: aggregate_by_1m
kind: aggregate
options:
interval: 1m
topic: aggregated
```

The previous multi-action handler structure, which this change removes:

```yaml
id: my_handler
topics:
  - cpu
  - mem
actions:
  - kind: aggregate
    options:
      groupBy: id
      interval: 1m
  - kind: throttle
    options:
      count: 10
      every: 5m
  - kind: publish
    options:
      topics: [ throttled_aggregated ]
  - kind: pagerDuty
    options:
      serviceKey: XXX
```
```yaml
id: publish_to_system
kind: publish
options:
topics: [ system ]
```

```json
{
  "id": "my_handler",
  "kind": "pagerDuty",
  "options": {
    "serviceKey": "XXX"
  }
}
```

```json
{
"id": "aggregate_by_1m",
"kind": "aggregate",
"options": {
"interval": "1m"
}
}
```

```json
{
"id": "publish_to_system",
"kind": "publish",
"options": {
"topics": ["system"]
}
}
```
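Since handlers are defined via the API, a client can generate these documents programmatically. A minimal Python sketch (the dict layout mirrors the examples above; the endpoint path in the comment is an assumption about the technical-preview API, not taken from this document):

```python
import json

def handler_definition(handler_id: str, kind: str, **options) -> dict:
    """Build a handler definition in the new single-action form."""
    return {"id": handler_id, "kind": kind, "options": options}

# Serialize a definition ready to POST to the handlers endpoint,
# e.g. /kapacitor/v1preview/alerts/topics/system/handlers (assumed path).
body = json.dumps(
    handler_definition("publish_to_system", "publish", topics=["system"]),
    sort_keys=True,
)
print(body)
```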
