Skip to content

Commit

Permalink
Merge pull request #2287 from mattnotmitt/features/discord-alert-handler
Browse files Browse the repository at this point in the history
Feature: Discord Webhook Alert Handler
  • Loading branch information
docmerlin authored May 5, 2020
2 parents 85403fa + a38fb11 commit 7e29533
Show file tree
Hide file tree
Showing 17 changed files with 887 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## unreleased

### Features

- [#2287](https://github.com/influxdata/kapacitor/pull/2287): Added Discord Webhook Alert Handler
- [#2311](https://github.com/influxdata/kapacitor/pull/2311): UDF Agent Python3 fixes.
- [#2318](https://github.com/influxdata/kapacitor/pull/2322): Add support for TLS 1.3.

Expand Down
29 changes: 29 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/discord"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/kafka"
Expand Down Expand Up @@ -459,6 +460,34 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
}
an.handlers = append(an.handlers, h)
}

for _, s := range n.DiscordHandlers {
c := discord.HandlerConfig{
Workspace: s.Workspace,
Username: s.Username,
AvatarURL: s.AvatarURL,
EmbedTitle: s.EmbedTitle,
}
h, err := et.tm.DiscordService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create Discord handler")
}
an.handlers = append(an.handlers, h)
}
if len(n.DiscordHandlers) == 0 && (et.tm.DiscordService != nil && et.tm.DiscordService.Global()) {
h, err := et.tm.DiscordService.Handler(discord.HandlerConfig{}, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create Discord handler")
}
an.handlers = append(an.handlers, h)
}
// If discord has been configured with state changes only set it.
if et.tm.DiscordService != nil &&
et.tm.DiscordService.Global() &&
et.tm.DiscordService.StateChangesOnly() {
n.IsStateChangesOnly = true
}

// Parse level expressions
an.levels = make([]stateful.Expression, alert.Critical+1)
an.scopePools = make([]stateful.ScopePool, alert.Critical+1)
Expand Down
90 changes: 90 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/influxdata/kapacitor/services/discord"
"github.com/influxdata/kapacitor/services/discord/discordtest"
"html"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -8972,6 +8974,94 @@ stream
}
}

func TestStream_AlertDiscord(t *testing.T) {
ts := discordtest.NewServer()
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.discord()
.workspace('company_private')
.discord()
.username('testy')
`

tmInit := func(tm *kapacitor.TaskMaster) {
c1 := discord.NewConfig()
c1.Default = true
c1.Enabled = true
c1.URL = ts.URL + "/test/discord/url"
c2 := discord.NewConfig()
c2.Workspace = "company_private"
c2.Username = "comp testy"
c2.Enabled = true
c2.URL = ts.URL + "/test/discord/url2"
d := diagService.NewDiscordHandler().WithContext(keyvalue.KV("test", "discord"))
sl, err := discord.NewService([]discord.Config{c1, c2}, d)
if err != nil {
t.Error(err)
}
tm.DiscordService = sl
}
testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit)

exp := []interface{}{
discordtest.Request{
URL: "/test/discord/url",
PostData: discordtest.PostData{
Username: "testy",
AvatarURL: "",
Embeds: []discordtest.Embed{
{
Color: 0xF95F53,
Description: "kapacitor/cpu/serverA is CRITICAL",
Title: "",
Timestamp: "",
},
},
},
},
discordtest.Request{
URL: "/test/discord/url2",
PostData: discordtest.PostData{
Username: "comp testy",
AvatarURL: "",
Embeds: []discordtest.Embed{
{
Color: 0xF95F53,
Description: "kapacitor/cpu/serverA is CRITICAL",
Title: "",
Timestamp: "",
},
},
},
},
}

ts.Close()
var got []interface{}
for _, g := range ts.Requests() {
got = append(got, g)
}

if err := compareListIgnoreOrder(got, exp, nil); err != nil {
t.Error(err)
}
}

func TestStream_AlertPushover(t *testing.T) {
ts := pushovertest.NewServer()
defer ts.Close()
Expand Down
80 changes: 80 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type AlertNode struct{ *AlertNodeData }
// * Talk -- Post alert message to Talk client.
// * Telegram -- Post alert message to Telegram client.
// * MQTT -- Post alert message to MQTT.
// * Discord -- Post alert message to Discord webhook.
//
// See below for more details on configuring each handler.
//
Expand Down Expand Up @@ -350,6 +351,10 @@ type AlertNodeData struct {
// tick:ignore
SlackHandlers []*SlackHandler `tick:"Slack" json:"slack"`

// Send alert to Discord.
// tick:ignore
DiscordHandlers []*DiscordHandler `tick:"Discord" json:"discord"`

// Send alert to Telegram.
// tick:ignore
TelegramHandlers []*TelegramHandler `tick:"Telegram" json:"telegram"`
Expand Down Expand Up @@ -1523,6 +1528,81 @@ type SlackHandler struct {
IconEmoji string `json:"iconEmoji"`
}

// Send the alert to Discord.
// To allow Kapacitor to post to Discord,
// follow this guide https://support.discordapp.com/hc/en-us/articles/228383668
// and create a new webhook and place the generated URL
// in the 'discord' configuration section.
//
// Example:
// [[discord]]
// enabled = true
// url = "https://discordapp.com/api/webhooks/xxxxxxxxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
//
// In order to not post a message every alert interval
// use AlertNode.StateChangesOnly so that only events
// where the alert changed state are posted to the channel.
//
// Example:
// stream
// |alert()
// .discord()
//
// Send alerts to the default workspace
//
// Example:
// stream
// |alert()
// .discord()
// .workspace('opencommunity')
//
// send alerts to the opencommunity workspace
//
// If the 'discord' section in the configuration has the option: global = true
// then all alerts are sent to Discord without the need to explicitly state it
// in the TICKscript.
//
// Example:
// [[discord]]
// enabled = true
// default = true
// workspace = examplecorp
// url = "https://discordapp.com/api/webhooks/xxxxxxxxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
// global = true
// state-changes-only = true
//
// Example:
// stream
// |alert()
//
// Send alert to Discord.
// tick:property
func (n *AlertNodeData) Discord() *DiscordHandler {
discord := &DiscordHandler{
AlertNodeData: n,
}
n.DiscordHandlers = append(n.DiscordHandlers, discord)
return discord
}

// tick:embedded:AlertNode.Discord
type DiscordHandler struct {
*AlertNodeData `json:"-"`

// Discord workspace ID to use when posting to webhook
// If empty uses the default config
Workspace string `json:"workspace"`
// Username of webhook
// If empty uses the default config
Username string `json:"username"`
// URL of webhook's avatar
// If empty uses the default config
AvatarURL string `json:"avatarUrl"`
// Embed title
// If empty uses the default config
EmbedTitle string `json:"embedTitle"`
}

// Send the alert to Telegram.
// For step-by-step instructions on setting up Kapacitor with Telegram, see the [Event Handler Setup Guide](https://docs.influxdata.com//kapacitor/latest/guides/event-handler-setup/#telegram-setup).
// To allow Kapacitor to post to Telegram,
Expand Down
1 change: 1 addition & 0 deletions pipeline/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestAlertNode_MarshalJSON(t *testing.T) {
"pushover": null,
"sensu": null,
"slack": null,
"discord": null,
"telegram": null,
"hipChat": null,
"alerta": null,
Expand Down
1 change: 1 addition & 0 deletions pipeline/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func TestPipeline_MarshalJSON(t *testing.T) {
"pushover": null,
"sensu": null,
"slack": null,
"discord": null,
"telegram": null,
"hipChat": null,
"alerta": null,
Expand Down
1 change: 1 addition & 0 deletions pipeline/tick/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func TestAlertTCPJSON(t *testing.T) {
"pushover": null,
"sensu": null,
"slack": null,
"discord": null,
"telegram": null,
"hipChat": null,
"alerta": null,
Expand Down
6 changes: 6 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdata/kapacitor/services/consul"
"github.com/influxdata/kapacitor/services/deadman"
"github.com/influxdata/kapacitor/services/diagnostic"
"github.com/influxdata/kapacitor/services/discord"
"github.com/influxdata/kapacitor/services/dns"
"github.com/influxdata/kapacitor/services/ec2"
"github.com/influxdata/kapacitor/services/file_discovery"
Expand Down Expand Up @@ -86,6 +87,7 @@ type Config struct {

// Alert handlers
Alerta alerta.Config `toml:"alerta" override:"alerta"`
Discord discord.Configs `toml:"discord" override:"discord,element-key=workspace"`
HipChat hipchat.Config `toml:"hipchat" override:"hipchat"`
Kafka kafka.Configs `toml:"kafka" override:"kafka,element-key=id"`
MQTT mqtt.Configs `toml:"mqtt" override:"mqtt,element-key=name"`
Expand Down Expand Up @@ -155,6 +157,7 @@ func NewConfig() *Config {
c.OpenTSDB = opentsdb.NewConfig()

c.Alerta = alerta.NewConfig()
c.Discord = discord.Configs{discord.NewDefaultConfig()}
c.HipChat = hipchat.NewConfig()
c.Kafka = kafka.Configs{kafka.NewConfig()}
c.MQTT = mqtt.Configs{mqtt.NewConfig()}
Expand Down Expand Up @@ -272,6 +275,9 @@ func (c *Config) Validate() error {
if err := c.Alerta.Validate(); err != nil {
return errors.Wrap(err, "alerta")
}
if err := c.Discord.Validate(); err != nil {
return err
}
if err := c.HipChat.Validate(); err != nil {
return errors.Wrap(err, "hipchat")
}
Expand Down
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/influxdata/kapacitor/services/consul"
"github.com/influxdata/kapacitor/services/deadman"
"github.com/influxdata/kapacitor/services/diagnostic"
"github.com/influxdata/kapacitor/services/discord"
"github.com/influxdata/kapacitor/services/dns"
"github.com/influxdata/kapacitor/services/ec2"
"github.com/influxdata/kapacitor/services/file_discovery"
Expand Down Expand Up @@ -240,6 +241,9 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv

// Append Alert integration services
s.appendAlertaService()
if err := s.appendDiscordService(); err != nil {
return nil, errors.Wrap(err, "discord service")
}
s.appendHipChatService()
s.appendKafkaService()
if err := s.appendMQTTService(); err != nil {
Expand Down Expand Up @@ -768,6 +772,22 @@ func (s *Server) appendAlertaService() {
s.AppendService("alerta", srv)
}

func (s *Server) appendDiscordService() error {
c := s.config.Discord
d := s.DiagService.NewDiscordHandler()
srv, err := discord.NewService(c, d)
if err != nil {
return err
}

s.TaskMaster.DiscordService = srv
s.AlertService.DiscordService = srv

s.SetDynamicService("discord", srv)
s.AppendService("discord", srv)
return nil
}

func (s *Server) appendTalkService() {
c := s.config.Talk
d := s.DiagService.NewTalkHandler()
Expand Down
Loading

0 comments on commit 7e29533

Please sign in to comment.