Refactor logging with diagnostics
vendor zap for logging refactor

Change name of diagnostic handlers file

Add diagnostics for slack service

Use keyvalue instead of map[string]string for ctx

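A minimal sketch of what such an ordered key/value type could look like, inferred only from the keyvalue.KV("task", et.Task.ID) calls visible in the diff below; the real package's field names may differ. Unlike a map[string]string, a slice of pairs preserves insertion order and allows duplicate keys, both of which help produce stable, readable log lines.

```go
package keyvalue

// T is a single key/value pair of diagnostic context.
type T struct {
	Key   string
	Value string
}

// KV builds a pair so call sites read keyvalue.KV("task", taskID).
func KV(key, value string) T {
	return T{Key: key, Value: value}
}
```
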
Add diagnostic to task_store service

Add diagnostics to reporting handler

Add diagnostics to storage service

Fix wrong service type

Add diagnostic to http service

Add diagnostics to alerta service

Add diagnostic to kapacitor package

SQUASH ME: figure out how to deal with UDF node

Add diagnostic to alert service

Add hipchat diagnostic

Add diagnostic for udf and udf service

Add diagnostic to pagerduty

Add diagnostic to SMTP service

Add diagnostic to opsgenie service

Add diagnostic to pushover service

Add diagnostic to httppost service

Add diagnostic to sensu service

Add diagnostic to snmp service

Add diagnostic to telegram service

Add diagnostic to mqtt service

Add diagnostic to talk handler

Add diagnostics to config override service

Remove logger from test service

I'm not sure if this is what we want to do

Add diagnostic to server package

Not totally sold on the interface for server, but it felt okay for this package for some reason

Add diagnostic to replay service

Add diagnostic to kubernetes service

Add diagnostic to swarm service

Add diagnostic to deadman service

Add diagnostic to noauth service

Add diagnostic to stats service

Add diagnostic to UDP service

Add diagnostic to InfluxDB service

Add diagnostic to scraper services

Add diagnostic to edge logger

Add static level diagnostic handler

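A hedged illustration of the idea behind a "static level" handler: a handler pinned to a fixed minimum level that silently drops anything below it. The Level values and Handle method here are invented for the example and are not Kapacitor's actual diagnostic API.

```go
package main

import (
	"fmt"
	"os"
)

type Level int

const (
	Debug Level = iota
	Info
	Error
)

// StaticLevelHandler forwards messages at or above Min and discards the rest.
type StaticLevelHandler struct {
	Min Level
}

func (h StaticLevelHandler) Handle(l Level, line string) {
	if l < h.Min {
		return // statically filtered out
	}
	fmt.Fprintln(os.Stderr, line)
}

func main() {
	h := StaticLevelHandler{Min: Info}
	h.Handle(Debug, "lvl=debug msg=dropped")
	h.Handle(Error, "lvl=error msg=written")
}
```
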
Remove log service from server package

Remove log service from kapacitord

Change diagnostic Service to struct

SQUASH ME

Address various TODO items

Address various TODOs

Remove logging service

Change from interfaces to explicit types on service

Add time configuration to zap

Use ioutil.Discard in tests

Set default level to debug

Remove commented out code

Address TODOs in diagnostic package

Prevent newlines from scraper service

DRY up common code

Fix level logger

Add log level setting from CLI

Fix log data node

Address a few of the changes suggested in the PR

Fix additional comments from PR

Fix log node level and prefix

Wrap NodeDiagnostic to increment error count

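The wrapper presumably explains why the explicit n.incrementErrorCount() calls disappear in the diff below: counting moves into the diagnostic itself. A self-contained sketch of that decorator pattern, with invented names rather than the actual Kapacitor types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type KV struct{ Key, Value string }

type NodeDiagnostic interface {
	Error(msg string, err error, ctx ...KV)
}

// errorCountingDiag wraps another NodeDiagnostic and bumps a counter on every
// Error call before delegating.
type errorCountingDiag struct {
	wrapped NodeDiagnostic
	errors  int64
}

func (d *errorCountingDiag) Error(msg string, err error, ctx ...KV) {
	atomic.AddInt64(&d.errors, 1)
	d.wrapped.Error(msg, err, ctx...)
}

// stderrDiag is a trivial base implementation for the example.
type stderrDiag struct{}

func (stderrDiag) Error(msg string, err error, ctx ...KV) {
	fmt.Println("E!", msg, err, ctx)
}

func main() {
	d := &errorCountingDiag{wrapped: stderrDiag{}}
	d.Error("something failed", fmt.Errorf("boom"), KV{Key: "task", Value: "t1"})
	fmt.Println("errors:", atomic.LoadInt64(&d.errors))
}
```
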
Clarify comment in Error function

Fix derivative error function

Fix alerta diagnostic

First pass: implement basic logfmt logger

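logfmt output is a single line of space-separated key=value pairs, with values quoted when they contain spaces, quotes, or '='. A minimal sketch in that spirit, not the actual Kapacitor logger:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"
)

// writeLogfmt emits one logfmt line: ts=... lvl=... msg=... plus extra pairs.
func writeLogfmt(w io.Writer, ts time.Time, level, msg string, pairs [][2]string) {
	var b strings.Builder
	b.WriteString("ts=" + ts.UTC().Format(time.RFC3339))
	b.WriteString(" lvl=" + level)
	b.WriteString(" msg=" + strconv.Quote(msg))
	for _, kv := range pairs {
		v := kv[1]
		if strings.ContainsAny(v, ` ="`) {
			v = strconv.Quote(v) // quote values that would break parsing
		}
		b.WriteString(" " + kv[0] + "=" + v)
	}
	fmt.Fprintln(w, b.String())
}

func main() {
	writeLogfmt(os.Stderr, time.Now(), "error",
		"failed to get event state for topic",
		[][2]string{{"topic", "main"}, {"event", "cpu high"}})
}
```
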
Remove zap dependency

Remove usage of zap

Don't use pointers to objects

Fix log line format

Format fields and tags appropriately

Add time to log function

Add logger tests

Add tests to logger

Fix logger test wording

Add changes suggested in PR
desa committed Sep 18, 2017
1 parent d36f371 commit 4dcab56
Showing 109 changed files with 4,195 additions and 1,432 deletions.
alert.go: 86 changes (42 additions, 44 deletions)
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
html "html/template"
"log"
"os"
"sync"
text "text/template"
@@ -14,6 +13,7 @@ import (
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
alertservice "github.com/influxdata/kapacitor/services/alert"
@@ -75,9 +75,13 @@ type AlertNode struct {
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *AlertNode, err error) {
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (an *AlertNode, err error) {
ctx := []keyvalue.T{
keyvalue.KV("task", et.Task.ID),
}

an = &AlertNode{
node: node{Node: n, et: et, logger: l},
node: node{Node: n, et: et, diag: d},
a: n,
}
an.node.runF = an.runAlert
@@ -126,20 +130,20 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
c := alertservice.TCPHandlerConfig{
Address: tcp.Address,
}
h := alertservice.NewTCPHandler(c, l)
h := alertservice.NewTCPHandler(c, an.diag)
an.handlers = append(an.handlers, h)
}

for _, email := range n.EmailHandlers {
c := smtp.HandlerConfig{
To: email.ToList,
}
h := et.tm.SMTPService.Handler(c, l)
h := et.tm.SMTPService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.EmailHandlers) == 0 && (et.tm.SMTPService != nil && et.tm.SMTPService.Global()) {
c := smtp.HandlerConfig{}
h := et.tm.SMTPService.Handler(c, l)
h := et.tm.SMTPService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If email has been configured with state changes only set it.
@@ -155,7 +159,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Args: e.Command[1:],
Commander: et.tm.Commander,
}
h := alertservice.NewExecHandler(c, l)
h := alertservice.NewExecHandler(c, an.diag)
an.handlers = append(an.handlers, h)
}

@@ -165,7 +169,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if log.Mode != 0 {
c.Mode = os.FileMode(log.Mode)
}
h, err := alertservice.NewLogHandler(c, l)
h, err := alertservice.NewLogHandler(c, an.diag)
if err != nil {
return nil, errors.Wrap(err, "failed to create log alert handler")
}
@@ -176,25 +180,25 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
c := victorops.HandlerConfig{
RoutingKey: vo.RoutingKey,
}
h := et.tm.VictorOpsService.Handler(c, l)
h := et.tm.VictorOpsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.VictorOpsHandlers) == 0 && (et.tm.VictorOpsService != nil && et.tm.VictorOpsService.Global()) {
c := victorops.HandlerConfig{}
h := et.tm.VictorOpsService.Handler(c, l)
h := et.tm.VictorOpsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

for _, pd := range n.PagerDutyHandlers {
c := pagerduty.HandlerConfig{
ServiceKey: pd.ServiceKey,
}
h := et.tm.PagerDutyService.Handler(c, l)
h := et.tm.PagerDutyService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.PagerDutyHandlers) == 0 && (et.tm.PagerDutyService != nil && et.tm.PagerDutyService.Global()) {
c := pagerduty.HandlerConfig{}
h := et.tm.PagerDutyService.Handler(c, l)
h := et.tm.PagerDutyService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

@@ -203,7 +207,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Source: s.Source,
Handlers: s.HandlersList,
}
h, err := et.tm.SensuService.Handler(c, l)
h, err := et.tm.SensuService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create sensu alert handler")
}
@@ -216,11 +220,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Username: s.Username,
IconEmoji: s.IconEmoji,
}
h := et.tm.SlackService.Handler(c, l)
h := et.tm.SlackService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.SlackHandlers) == 0 && (et.tm.SlackService != nil && et.tm.SlackService.Global()) {
h := et.tm.SlackService.Handler(slack.HandlerConfig{}, l)
h := et.tm.SlackService.Handler(slack.HandlerConfig{}, ctx...)
an.handlers = append(an.handlers, h)
}
// If slack has been configured with state changes only set it.
@@ -237,7 +241,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
DisableWebPagePreview: t.IsDisableWebPagePreview,
DisableNotification: t.IsDisableNotification,
}
h := et.tm.TelegramService.Handler(c, l)
h := et.tm.TelegramService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

@@ -254,7 +258,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
TrapOid: s.TrapOid,
DataList: dataList,
}
h, err := et.tm.SNMPTrapService.Handler(c, l)
h, err := et.tm.SNMPTrapService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrapf(err, "failed to create SNMP handler")
}
@@ -263,7 +267,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *

if len(n.TelegramHandlers) == 0 && (et.tm.TelegramService != nil && et.tm.TelegramService.Global()) {
c := telegram.HandlerConfig{}
h := et.tm.TelegramService.Handler(c, l)
h := et.tm.TelegramService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If telegram has been configured with state changes only set it.
@@ -278,12 +282,12 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Room: hc.Room,
Token: hc.Token,
}
h := et.tm.HipChatService.Handler(c, l)
h := et.tm.HipChatService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.HipChatHandlers) == 0 && (et.tm.HipChatService != nil && et.tm.HipChatService.Global()) {
c := hipchat.HandlerConfig{}
h := et.tm.HipChatService.Handler(c, l)
h := et.tm.HipChatService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If HipChat has been configured with state changes only set it.
@@ -322,7 +326,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if a.Timeout != 0 {
c.Timeout = a.Timeout
}
h, err := et.tm.AlertaService.Handler(c, l)
h, err := et.tm.AlertaService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create Alerta handler")
}
@@ -346,7 +350,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if p.Sound != "" {
c.Sound = p.Sound
}
h := et.tm.PushoverService.Handler(c, l)
h := et.tm.PushoverService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

@@ -356,7 +360,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
Endpoint: p.Endpoint,
Headers: p.Headers,
}
h := et.tm.HTTPPostService.Handler(c, l)
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

@@ -365,17 +369,17 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
TeamsList: og.TeamsList,
RecipientsList: og.RecipientsList,
}
h := et.tm.OpsGenieService.Handler(c, l)
h := et.tm.OpsGenieService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.OpsGenieHandlers) == 0 && (et.tm.OpsGenieService != nil && et.tm.OpsGenieService.Global()) {
c := opsgenie.HandlerConfig{}
h := et.tm.OpsGenieService.Handler(c, l)
h := et.tm.OpsGenieService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

for range n.TalkHandlers {
h := et.tm.TalkService.Handler(l)
h := et.tm.TalkService.Handler(ctx...)
an.handlers = append(an.handlers, h)
}

@@ -386,7 +390,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
QoS: mqtt.QoSLevel(m.Qos),
Retained: m.Retained,
}
h := et.tm.MQTTService.Handler(c, l)
h := et.tm.MQTTService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// Parse level expressions
@@ -560,8 +564,8 @@ func (n *AlertNode) restoreEvent(id string) (alert.Level, time.Time) {
// Check for previous state on anonTopic
if n.hasAnonTopic() {
if state, ok, err := n.et.tm.AlertService.EventState(n.anonTopic, id); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to get event state for anonymous topic %s, event %s: %v", n.anonTopic, id, err)
n.diag.Error("failed to get event state for anonymous topic", err,
keyvalue.KV("topic", n.anonTopic), keyvalue.KV("event", id))
} else if ok {
anonTopicState = state
anonFound = true
@@ -570,8 +574,8 @@ func (n *AlertNode) restoreEvent(id string) (alert.Level, time.Time) {
// Check for previous state on topic.
if n.hasTopic() {
if state, ok, err := n.et.tm.AlertService.EventState(n.topic, id); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to get event state for topic %s, event %s: %v", n.topic, id, err)
n.diag.Error("failed to get event state for topic", err,
keyvalue.KV("topic", n.anonTopic), keyvalue.KV("event", id))
} else if ok {
topicState = state
topicFound = true
@@ -581,14 +585,12 @@ func (n *AlertNode) restoreEvent(id string) (alert.Level, time.Time) {
if anonFound && topicFound {
// Anon topic takes precedence
if err := n.et.tm.AlertService.UpdateEvent(n.topic, anonTopicState); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to update topic %q event state for event %q", n.topic, id)
n.diag.Error("failed to update topic event state", err, keyvalue.KV("topic", n.topic), keyvalue.KV("event", id))
}
} else if topicFound && n.hasAnonTopic() {
// Update event state for topic
if err := n.et.tm.AlertService.UpdateEvent(n.anonTopic, topicState); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! failed to update topic %q event state for event %q", n.topic, id)
n.diag.Error("failed to update topic event state", err, keyvalue.KV("topic", n.topic), keyvalue.KV("event", id))
}
} // else nothing was found, nothing to do
}
@@ -623,16 +625,15 @@ func (n *AlertNode) handleEvent(event alert.Event) {
case alert.Critical:
n.critsTriggered.Add(1)
}
n.logger.Printf("D! %v alert triggered id:%s msg:%s data:%v", event.State.Level, event.State.ID, event.State.Message, event.Data.Result.Series[0])
n.diag.AlertTriggered(event.State.Level, event.State.ID, event.State.Message, event.Data.Result.Series[0])

// If we have anon handlers, emit event to the anonTopic
if n.hasAnonTopic() {
event.Topic = n.anonTopic
err := n.et.tm.AlertService.Collect(event)
if err != nil {
n.eventsDropped.Add(1)
n.incrementErrorCount()
n.logger.Println("E!", err)
n.diag.Error("encountered error collecting event", err)
}
}

@@ -642,8 +643,7 @@ func (n *AlertNode) handleEvent(event alert.Event) {
err := n.et.tm.AlertService.Collect(event)
if err != nil {
n.eventsDropped.Add(1)
n.incrementErrorCount()
n.logger.Println("E!", err)
n.diag.Error("encountered error collecting event", err)
}
}
}
@@ -654,8 +654,7 @@ func (n *AlertNode) determineLevel(p edge.FieldsTagsTimeGetter, currentLevel ale
}
if rse := n.levelResets[currentLevel]; rse != nil {
if pass, err := EvalPredicate(rse, n.lrScopePools[currentLevel], p); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! error evaluating reset expression for current level %v: %s", currentLevel, err)
n.diag.Error("error evaluating reset expression for current level", err, keyvalue.KV("level", currentLevel.String()))
} else if !pass {
return currentLevel
}
@@ -676,8 +675,7 @@ func (n *AlertNode) findFirstMatchLevel(start alert.Level, stop alert.Level, p e
continue
}
if pass, err := EvalPredicate(se, n.scopePools[l], p); err != nil {
n.incrementErrorCount()
n.logger.Printf("E! error evaluating expression for level %v: %s", alert.Level(l), err)
n.diag.Error("error evaluating expression for level", err, keyvalue.KV("level", alert.Level(l).String()))
continue
} else if pass {
return alert.Level(l), true
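Read together, the replacements in this file imply a per-node diagnostic interface roughly shaped like the sketch below. It is inferred from the call sites in the diff (n.diag.Error, n.diag.AlertTriggered), not copied from the diagnostic package, so the method set and exact signatures are assumptions.

```go
// Sketch only: the shape suggested by the calls above, not the real definition.
package kapacitor

import (
	"github.com/influxdata/kapacitor/alert"
	"github.com/influxdata/kapacitor/keyvalue"
	"github.com/influxdata/kapacitor/models"
)

type NodeDiagnostic interface {
	// Error replaces the old n.logger.Printf("E! ...") calls, carrying
	// structured context instead of a formatted string.
	Error(msg string, err error, ctx ...keyvalue.T)

	// AlertTriggered replaces the "D! %v alert triggered ..." debug line.
	AlertTriggered(level alert.Level, id string, message string, rows *models.Row)
}
```
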
alert/topics.go: 6 changes (1 addition, 5 deletions)
@@ -2,7 +2,6 @@ package alert

import (
"fmt"
"log"
"path"
"sort"
"sync"
@@ -20,14 +19,11 @@ type Topics struct {
mu sync.RWMutex

topics map[string]*Topic

logger *log.Logger
}

func NewTopics(l *log.Logger) *Topics {
func NewTopics() *Topics {
s := &Topics{
topics: make(map[string]*Topic),
logger: l,
}
return s
}
(Diffs for the remaining 107 changed files not shown.)
