Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storing error data in redis #1419

Merged
merged 1 commit into from
Oct 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions cmd/bosun/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import (

// Core data access interface for everything sched needs
type DataAccess interface {
Metadata() MetadataDataAccess
Search() SearchDataAccess
Errors() ErrorDataAccess
}

type MetadataDataAccess interface {
// Insert Metric Metadata. Field must be one of "desc", "rate", or "unit".
PutMetricMetadata(metric string, field string, value string) error
// Get Metric Metadata for given metric.
Expand All @@ -25,8 +31,6 @@ type DataAccess interface {
PutTagMetadata(tags opentsdb.TagSet, name string, value string, updated time.Time) error
GetTagMetadata(tags opentsdb.TagSet, name string) ([]*TagMetadata, error)
DeleteTagMetadata(tags opentsdb.TagSet, name string) error

Search() SearchDataAccess
}

type SearchDataAccess interface {
Expand Down Expand Up @@ -122,3 +126,26 @@ func newPool(server, password string, database int, isRedis bool, maxActive int,
// init hands the description of the "bosun.redis" metric to
// collect.AggregateMeta, declaring its unit as milliseconds.
// NOTE(review): presumably this is the metric fed by the
// collect.StartTimer("redis", ...) calls in this package - confirm.
func init() {
	const (
		metric = "bosun.redis"
		desc   = "time in milliseconds per redis call."
	)
	collect.AggregateMeta(metric, metadata.MilliSecond, desc)
}

// Ledis can't do DEL in a blanket way like redis can. It has a unique command per type.
// These helpers allow easy switching.

// LCLEAR returns the name of the command used to delete a whole list key:
// "DEL" when the backend is redis, "LCLEAR" otherwise.
func (d *dataAccess) LCLEAR() string {
	cmd := "LCLEAR"
	if d.isRedis {
		cmd = "DEL"
	}
	return cmd
}

// SCLEAR returns the name of the command used to delete a whole set key:
// "DEL" when the backend is redis, "SCLEAR" otherwise.
func (d *dataAccess) SCLEAR() string {
	if !d.isRedis {
		return "SCLEAR"
	}
	return "DEL"
}

// LMCLEAR builds the command name and argument list used to remove value
// from the list stored at key.
//
// For redis the result is "LREM" with arguments (key, 0, value); a count of
// 0 asks LREM to remove every matching element. For any other backend the
// result is "LMCLEAR" with arguments (key, value).
// NOTE(review): the exact semantics of LMCLEAR depend on the ledis build in
// use - confirm it removes matching members rather than whole keys.
func (d *dataAccess) LMCLEAR(key string, value string) (string, []interface{}) {
	if !d.isRedis {
		return "LMCLEAR", []interface{}{key, value}
	}
	return "LREM", []interface{}{key, 0, value}
}
242 changes: 242 additions & 0 deletions cmd/bosun/database/error_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package database

import (
"bosun.org/_third_party/github.com/garyburd/redigo/redis"
"bosun.org/collect"
"bosun.org/models"
"bosun.org/opentsdb"
"encoding/json"
"fmt"
"time"
)

/*

failingAlerts = set of currently failing alerts
alertsWithErrors = set of alerts with any errors
errorEvents = list of (alert) one per individual error event
errors:{name} = list of json objects for coalesced error events (most recent first).

*/

// ErrorDataAccess is the storage interface for tracking alerts whose
// evaluation produced errors, together with their error history.
type ErrorDataAccess interface {
	// MarkAlertSuccess removes the alert from the set of failing alerts.
	MarkAlertSuccess(name string) error
	// MarkAlertFailure records an error event with the given message for
	// the alert and marks the alert as failing.
	MarkAlertFailure(name string, msg string) error

	// IsAlertFailing reports whether the alert is currently marked failing.
	IsAlertFailing(name string) (bool, error)
	// GetFailingAlerts returns the currently failing alerts as a set.
	GetFailingAlerts() (map[string]bool, error)
	// GetFailingAlertCounts returns the number of failing alerts followed
	// by the number of recorded error events.
	GetFailingAlertCounts() (int, int, error)

	// GetFullErrorHistory returns the stored error events of every alert
	// that has errors, keyed by alert name.
	GetFullErrorHistory() (map[string][]*models.AlertError, error)

	// ClearAlert removes all error data stored for a single alert.
	ClearAlert(name string) error
	// ClearAll removes all stored error data.
	ClearAll() error
}

// Errors returns the receiver itself, viewed through the ErrorDataAccess
// interface.
func (d *dataAccess) Errors() ErrorDataAccess {
	var errs ErrorDataAccess = d
	return errs
}

// Names of the fixed keys used for error tracking.
const (
	// alertsWithErrors is the set of alerts that have any stored errors.
	alertsWithErrors = "alertsWithErrors"
	// failingAlerts is the set of alerts that are currently failing.
	failingAlerts = "failingAlerts"
	// errorEvents is the list holding one alert name per error event.
	errorEvents = "errorEvents"
)

// MarkAlertSuccess removes name from the failingAlerts set. The alert's
// membership in alertsWithErrors and its stored error history are left
// untouched.
func (d *dataAccess) MarkAlertSuccess(name string) error {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "MarkAlertSuccess"})()
	conn := d.GetConnection()
	defer conn.Close()

	if _, err := conn.Do("SREM", failingAlerts, name); err != nil {
		return err
	}
	return nil
}

// MarkAlertFailure records one error event for the alert name with message
// msg and marks the alert as failing.
//
// If the alert was already failing and its most recent stored event has the
// same message, that event is coalesced: its Count is incremented, its
// LastTime is updated, and the stored record is replaced. Otherwise a new
// event with Count 1 is pushed to the front of the alert's error list. In
// both cases name is also pushed onto the errorEvents list, so that list
// grows by one per call.
//
// The individual commands are issued one at a time and are not wrapped in a
// transaction.
func (d *dataAccess) MarkAlertFailure(name string, msg string) error {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "MarkAlertFailure"})()
	conn := d.GetConnection()
	defer conn.Close()

	// Capture the previous state before the alert is added to failingAlerts.
	wasFailing, err := d.IsAlertFailing(name)
	if err != nil {
		return err
	}

	for _, set := range []string{alertsWithErrors, failingAlerts} {
		if _, err := conn.Do("SADD", set, name); err != nil {
			return err
		}
	}

	// Only an alert that was already failing can continue a prior event.
	var event *models.AlertError
	if wasFailing {
		if event, err = d.getLastErrorEvent(name); err != nil {
			return err
		}
	}

	listKey := errorListKey(name)
	now := time.Now().UTC().Truncate(time.Second)
	if event != nil && event.Message == msg {
		event.Count++
		event.LastTime = now
		// Drop the stale head record; the updated one is pushed below.
		if _, err = conn.Do("LPOP", listKey); err != nil {
			return err
		}
	} else {
		event = &models.AlertError{
			FirstTime: now,
			LastTime:  now,
			Count:     1,
			Message:   msg,
		}
	}

	payload, err := json.Marshal(event)
	if err != nil {
		return err
	}
	if _, err = conn.Do("LPUSH", listKey, payload); err != nil {
		return err
	}
	if _, err = conn.Do("LPUSH", errorEvents, name); err != nil {
		return err
	}
	return nil
}

// GetFailingAlertCounts returns, in order, the cardinality of the
// failingAlerts set and the length of the errorEvents list. On error both
// counts are returned as 0.
func (d *dataAccess) GetFailingAlertCounts() (int, int, error) {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetFailingAlertCounts"})()
	conn := d.GetConnection()
	defer conn.Close()

	reply, err := conn.Do("SCARD", failingAlerts)
	alertCount, err := redis.Int(reply, err)
	if err != nil {
		return 0, 0, err
	}

	reply, err = conn.Do("LLEN", errorEvents)
	eventCount, err := redis.Int(reply, err)
	if err != nil {
		return 0, 0, err
	}
	return alertCount, eventCount, nil
}

// GetFailingAlerts returns the members of the failingAlerts set as a map
// from alert name to true, so callers can test membership with a lookup.
// The map is empty (not nil) when no alerts are failing.
func (d *dataAccess) GetFailingAlerts() (map[string]bool, error) {
	// The op tag must name this method; it previously reused
	// "GetFailingAlertCounts", which attributed timings to the wrong call.
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetFailingAlerts"})()
	conn := d.GetConnection()
	defer conn.Close()

	alerts, err := redis.Strings(conn.Do("SMEMBERS", failingAlerts))
	if err != nil {
		return nil, err
	}
	r := make(map[string]bool, len(alerts))
	for _, a := range alerts {
		r[a] = true
	}
	return r, nil
}
// IsAlertFailing reports whether name is a member of the failingAlerts set.
func (d *dataAccess) IsAlertFailing(name string) (bool, error) {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "IsAlertFailing"})()
	conn := d.GetConnection()
	defer conn.Close()

	reply, err := conn.Do("SISMEMBER", failingAlerts, name)
	return redis.Bool(reply, err)
}

// errorListKey returns the key of the list that stores the error events of
// the alert called name, i.e. "errors:" followed by the alert name.
func errorListKey(name string) string {
	key := fmt.Sprintf("errors:%s", name)
	return key
}
// getLastErrorEvent returns the event at the head (index 0) of the alert's
// error list, which is the most recently pushed record.
//
// It returns (nil, nil) when the reply is nil (redis.ErrNil), i.e. there is
// no element at index 0. Any other command error, or a failure to decode
// the stored JSON, is returned to the caller.
func (d *dataAccess) getLastErrorEvent(name string) (*models.AlertError, error) {
	conn := d.GetConnection()
	// Release the connection when done; it was previously never closed,
	// leaking one connection per call.
	defer conn.Close()

	str, err := redis.Bytes(conn.Do("LINDEX", errorListKey(name), "0"))
	if err != nil {
		if err == redis.ErrNil {
			return nil, nil
		}
		return nil, err
	}
	ev := &models.AlertError{}
	if err = json.Unmarshal(str, ev); err != nil {
		return nil, err
	}
	return ev, nil
}

// GetFullErrorHistory loads the complete error list of every alert in the
// alertsWithErrors set and returns them keyed by alert name. Each slice
// keeps the stored list order (index 0 is the head of the list). The first
// command or JSON decoding error aborts the load and is returned with a nil
// map.
func (d *dataAccess) GetFullErrorHistory() (map[string][]*models.AlertError, error) {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "GetFullErrorHistory"})()
	conn := d.GetConnection()
	defer conn.Close()

	names, err := redis.Strings(conn.Do("SMEMBERS", alertsWithErrors))
	if err != nil {
		return nil, err
	}

	history := make(map[string][]*models.AlertError, len(names))
	for _, name := range names {
		// 0..-1 selects the whole list.
		encoded, err := redis.Strings(conn.Do("LRANGE", errorListKey(name), 0, -1))
		if err != nil {
			return nil, err
		}
		events := make([]*models.AlertError, 0, len(encoded))
		for _, item := range encoded {
			ev := new(models.AlertError)
			if err := json.Unmarshal([]byte(item), ev); err != nil {
				return nil, err
			}
			events = append(events, ev)
		}
		history[name] = events
	}
	return history, nil
}

// ClearAlert removes the error data stored for a single alert. In order, it
// removes name from alertsWithErrors and failingAlerts, deletes the alert's
// error list, and issues the LMCLEAR helper's command for name against the
// errorEvents list. It stops at the first failing command and returns its
// error.
func (d *dataAccess) ClearAlert(name string) error {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "ClearAlert"})()
	conn := d.GetConnection()
	defer conn.Close()

	if _, err := conn.Do("SREM", alertsWithErrors, name); err != nil {
		return err
	}
	if _, err := conn.Do("SREM", failingAlerts, name); err != nil {
		return err
	}
	if _, err := conn.Do(d.LCLEAR(), errorListKey(name)); err != nil {
		return err
	}

	// The command name and argument shape differ between backends.
	command, arguments := d.LMCLEAR(errorEvents, name)
	if _, err := conn.Do(command, arguments...); err != nil {
		return err
	}
	return nil
}

// Things could foreseeably get a bit inconsistent if concurrent changes happen in just the wrong way.
// Clear all should do a more thorough cleanup to fully reset things.

// ClearAll removes all stored error data: the error list of every alert in
// alertsWithErrors, then the alertsWithErrors and failingAlerts sets, and
// finally the errorEvents list. Per-alert lists are located through
// alertsWithErrors, so a list whose alert is absent from that set is not
// deleted. It stops at the first failing command and returns its error.
func (d *dataAccess) ClearAll() error {
	defer collect.StartTimer("redis", opentsdb.TagSet{"op": "ClearAll"})()
	conn := d.GetConnection()
	defer conn.Close()

	names, err := redis.Strings(conn.Do("SMEMBERS", alertsWithErrors))
	if err != nil {
		return err
	}
	for _, name := range names {
		if _, err := conn.Do(d.LCLEAR(), errorListKey(name)); err != nil {
			return err
		}
	}

	for _, set := range []string{alertsWithErrors, failingAlerts} {
		if _, err := conn.Do(d.SCLEAR(), set); err != nil {
			return err
		}
	}

	_, err = conn.Do(d.LCLEAR(), errorEvents)
	return err
}
4 changes: 4 additions & 0 deletions cmd/bosun/database/metric_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func metricMetaKey(metric string) string {

const metricMetaTTL = int((time.Hour * 24 * 7) / time.Second)

// Metadata returns the receiver itself, viewed through the
// MetadataDataAccess interface.
func (d *dataAccess) Metadata() MetadataDataAccess {
	var meta MetadataDataAccess = d
	return meta
}

func (d *dataAccess) PutMetricMetadata(metric string, field string, value string) error {
defer collect.StartTimer("redis", opentsdb.TagSet{"op": "PutMetricMeta"})()
if field != "desc" && field != "unit" && field != "rate" {
Expand Down
6 changes: 6 additions & 0 deletions cmd/bosun/database/test/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ func randString(l int) string {
}
return s
}

// check aborts the running test via t.Fatal when err is non-nil and does
// nothing otherwise.
func check(t *testing.T, err error) {
	if err == nil {
		return
	}
	t.Fatal(err)
}
79 changes: 79 additions & 0 deletions cmd/bosun/database/test/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package dbtest

import (
"testing"
)

// TestErrors_RoundTrip exercises the ErrorDataAccess implementation obtained
// from testData end to end: marking success and failure, coalescing of
// repeated messages, reading counts and full history, and both clear
// operations. It runs against the shared testData store and uses a single
// alert name.
func TestErrors_RoundTrip(t *testing.T) {
	ed := testData.Errors()
	alert := "abcdefg"

	// make sure we can mark success and error
	check(t, ed.MarkAlertSuccess(alert))
	failing, err := ed.IsAlertFailing(alert)
	check(t, err)
	if failing {
		t.Fatal("Alert should not be failing")
	}
	check(t, ed.MarkAlertFailure(alert, "Something bad happened"))
	failing, err = ed.IsAlertFailing(alert)
	check(t, err)
	if !failing {
		t.Fatal("Alert should be failing")
	}

	// generate a sequence of errors. We should have groups of 2/1/1 (oldest to newest)
	check(t, ed.MarkAlertFailure(alert, "Something bad happened"))
	check(t, ed.MarkAlertSuccess(alert))
	check(t, ed.MarkAlertFailure(alert, "Something bad happened"))
	check(t, ed.MarkAlertFailure(alert, "Something different bad happened"))

	failingCount, events, err := ed.GetFailingAlertCounts()
	check(t, err)
	if failingCount != 1 {
		t.Fatalf("Expected 1 failing alert. Got %d", failingCount)
	}
	// Four MarkAlertFailure calls were made above, one event each.
	if events != 4 {
		t.Fatalf("Expected 4 error events. Got %d", events)
	}

	fullData, err := ed.GetFullErrorHistory()
	check(t, err)
	if len(fullData) != 1 {
		t.Fatalf("Expected data for 1 alert. See %d", len(fullData))
	}
	// History is returned newest first, so the coalesced pair is last.
	ad := fullData[alert]
	if len(ad) != 3 {
		t.Fatalf("Expected data for alert to have 3 entries. See %d", len(ad))
	}
	if ad[0].Count != 1 {
		t.Fatalf("Expected first entry to have length 1. Found %d.", ad[0].Count)
	}
	if ad[1].Count != 1 {
		t.Fatalf("Expected second entry to have length 1. Found %d.", ad[1].Count)
	}
	if ad[2].Count != 2 {
		t.Fatalf("Expected third entry to have length 2. Found %d.", ad[2].Count)
	}

	check(t, ed.ClearAlert(alert))
	failingCount, events, err = ed.GetFailingAlertCounts()
	check(t, err)
	if failingCount != 0 {
		t.Fatalf("Expected 0 failing alert. Got %d", failingCount)
	}
	if events != 0 {
		t.Fatalf("Expected 0 error events. Got %d", events)
	}

	check(t, ed.MarkAlertFailure(alert, "Something bad happened"))
	check(t, ed.ClearAll())
	failingCount, events, err = ed.GetFailingAlertCounts()
	check(t, err)
	if failingCount != 0 {
		t.Fatalf("Expected 0 failing alert. Got %d", failingCount)
	}
	if events != 0 {
		t.Fatalf("Expected 0 error events. Got %d", events)
	}
}
Loading