Skip to content

Commit

Permalink
various sensu fixes #240
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 2, 2016
1 parent 2ad6ea2 commit 829870d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ With #144 you can now join streams with differing group by dimensions.
- [#213](https://github.com/influxdata/kapacitor/issues/231): Add SourceStreamNode so that yuou must always first call `.from` on the `stream` object before filtering it, so as to not create confusing to understand TICKscripts.
- [#255](https://github.com/influxdata/kapacitor/issues/255): Add OPTIONS handler for task delete method so it can be preflighted.
- [#258](https://github.com/influxdata/kapacitor/issues/258): Fix UDP internal metrics, change subscriptions to use clusterID.
- [#240](https://github.com/influxdata/kapacitor/issues/240): BREAKING: Fix issues with Sensu integration. The breaking change is that the config no longer takes a `url` but rather a `host` option since the communication is raw TCP rather HTTP.

## v0.10.1 [2016-02-08]

Expand Down
23 changes: 14 additions & 9 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
imodels "github.com/influxdb/influxdb/models"
)

const (
statsAlertsTriggered = "alerts_triggered"
)

// The newest state change is weighted 'weightDiff' times more than oldest state change.
const weightDiff = 1.5

Expand Down Expand Up @@ -269,6 +273,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
a.statMap.Add(statsAlertsTriggered, 0)
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
Expand All @@ -291,9 +296,7 @@ func (a *AlertNode) runAlert([]byte) error {
if err != nil {
return err
}
for _, h := range a.handlers {
h(ad)
}
a.handleAlert(ad)
}
a.timer.Stop()
}
Expand All @@ -313,9 +316,7 @@ func (a *AlertNode) runAlert([]byte) error {
if err != nil {
return err
}
for _, h := range a.handlers {
h(ad)
}
a.handleAlert(ad)
break
}
}
Expand All @@ -330,16 +331,20 @@ func (a *AlertNode) runAlert([]byte) error {
if err != nil {
return err
}
for _, h := range a.handlers {
h(ad)
}
a.handleAlert(ad)
}
}
a.timer.Stop()
}
}
return nil
}
func (a *AlertNode) handleAlert(ad *AlertData) {
a.statMap.Add(statsAlertsTriggered, 1)
for _, h := range a.handlers {
h(ad)
}
}

func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string) (level AlertLevel) {
for l, se := range a.levels {
Expand Down
4 changes: 2 additions & 2 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ data_dir = "/var/lib/kapacitor"
[sensu]
# Configure Sensu.
enabled = false
# The Sensu URL.
url = "http://sensu:3030"
# The Sensu Client host:port address.
addr = "sensu-client:3030"
# Default JIT source.
source = "Kapacitor"

Expand Down
74 changes: 47 additions & 27 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -2004,35 +2005,54 @@ stream

func TestStream_AlertSensu(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
type postData struct {
Name string `json:"name"`
Source string `json:"source"`
Output string `json:"output"`
Status int `json:"status"`
}
pd := postData{}
dec := json.NewDecoder(r.Body)
dec.Decode(&pd)
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
listen, err := net.ListenTCP("tcp", addr)
if err != nil {
t.Fatal(err)
}
defer listen.Close()
go func() {
for {
conn, err := listen.Accept()
if err != nil {
t.Log(err)
return
}
func() {
defer conn.Close()

requestCount++
type postData struct {
Name string `json:"name"`
Source string `json:"source"`
Output string `json:"output"`
Status int `json:"status"`
}
pd := postData{}
dec := json.NewDecoder(conn)
dec.Decode(&pd)

if exp := "Kapacitor"; pd.Source != exp {
t.Errorf("unexpected source got %s exp %s", pd.Source, exp)
}
if exp := "Kapacitor"; pd.Source != exp {
t.Errorf("unexpected source got %s exp %s", pd.Source, exp)
}

if exp := "kapacitor/cpu/serverA is CRITICAL"; pd.Output != exp {
t.Errorf("unexpected text got %s exp %s", pd.Output, exp)
}
if exp := "kapacitor.cpu.serverA is CRITICAL"; pd.Output != exp {
t.Errorf("unexpected text got %s exp %s", pd.Output, exp)
}

if exp := "kapacitor/cpu/serverA"; pd.Name != exp {
t.Errorf("unexpected text got %s exp %s", pd.Name, exp)
}
if exp := "kapacitor.cpu.serverA"; pd.Name != exp {
t.Errorf("unexpected text got %s exp %s", pd.Name, exp)
}

if exp := 2; pd.Status != exp {
t.Errorf("unexpected status got %v exp %v", pd.Status, exp)
if exp := 2; pd.Status != exp {
t.Errorf("unexpected status got %v exp %v", pd.Status, exp)
}
}()
}
}))
defer ts.Close()
}()

var script = `
stream
Expand All @@ -2044,7 +2064,7 @@ stream
.every(10s)
.mapReduce(influxql.count('value'))
.alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.id('kapacitor.{{ .Name }}.{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
Expand All @@ -2055,12 +2075,12 @@ stream
defer tm.Close()

c := sensu.NewConfig()
c.URL = ts.URL
c.Addr = listen.Addr().String()
c.Source = "Kapacitor"
sl := sensu.NewService(c, logService.NewLogger("[test_sensu] ", log.LstdFlags))
tm.SensuService = sl

err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
err = fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
if err != nil {
t.Error(err)
}
Expand Down
13 changes: 5 additions & 8 deletions services/sensu/config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package sensu

import (
"net/url"
)
import "errors"

const DefaultSource = "Kapacitor"

type Config struct {
// Whether Sensu integration is enabled.
Enabled bool `toml:"enabled"`
// The Sensu URL.
URL string `toml:"url"`
// The Sensu client host:port address.
Addr string `toml:"addr"`
// The JIT sensu source name of the alert.
Source string `toml:"source"`
}
Expand All @@ -22,9 +20,8 @@ func NewConfig() Config {
}

func (c Config) Validate() error {
_, err := url.Parse(c.URL)
if err != nil {
return err
if c.Enabled && c.Addr == "" {
return errors.New("must specify sensu client address")
}
return nil
}
43 changes: 21 additions & 22 deletions services/sensu/service.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
package sensu

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net"
"regexp"

"github.com/influxdata/kapacitor"
)

type Service struct {
url string
addr string
source string
logger *log.Logger
}

var validNamePattern = regexp.MustCompile(`^[\w\.-]+$`)

func NewService(c Config, l *log.Logger) *Service {
return &Service{
url: c.URL,
addr: c.Addr,
source: c.Source,
logger: l,
}
Expand All @@ -35,6 +37,10 @@ func (s *Service) Close() error {
}

func (s *Service) Alert(name, output string, level kapacitor.AlertLevel) error {
if !validNamePattern.MatchString(name) {
return fmt.Errorf("invalid name %q for sensu alert. Must match %v", name, validNamePattern)
}

var status int
switch level {
case kapacitor.OKAlert:
Expand All @@ -55,31 +61,24 @@ func (s *Service) Alert(name, output string, level kapacitor.AlertLevel) error {
postData["output"] = output
postData["status"] = status

var post bytes.Buffer
enc := json.NewEncoder(&post)
err := enc.Encode(postData)
addr, err := net.ResolveTCPAddr("tcp", s.addr)
if err != nil {
return err
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return err
}
defer conn.Close()

resp, err := http.Post(s.url, "application/json", &post)
enc := json.NewEncoder(conn)
err = enc.Encode(postData)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
type response struct {
Error string `json:"error"`
}
r := &response{Error: fmt.Sprintf("failed to understand Sensu response. code: %d content: %s", resp.StatusCode, string(body))}
b := bytes.NewReader(body)
dec := json.NewDecoder(b)
dec.Decode(r)
return errors.New(r.Error)
resp, err := ioutil.ReadAll(conn)
if string(resp) != "ok" {
return errors.New("sensu socket error: " + string(resp))
}
return nil
}

0 comments on commit 829870d

Please sign in to comment.