Skip to content

Commit

Permalink
update node stats
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 10, 2016
1 parent 129210e commit 3d4cbd1
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#504](https://github.com/influxdata/kapacitor/pull/504): BREAKING: Many changes to the API and underlying storage system. This release requires a special upgrade process.
- [#511](https://github.com/influxdata/kapacitor/pull/511): Adds DefaultNode for providing default values for missing fields or tags.
- [#285](https://github.com/influxdata/kapacitor/pull/285): Track created, modified and last enabled dates on tasks.
- [#533](https://github.com/influxdata/kapacitor/pull/533): Add useful statistics for nodes.

### Bugfixes

Expand Down
39 changes: 37 additions & 2 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ import (

"github.com/influxdata/influxdb/influxql"
imodels "github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
)

// Stat keys registered on the AlertNode's expvar statMap.
// alerts_triggered counts every alert handled; the per-level keys
// break that total down by the alert's severity level.
const (
statsAlertsTriggered = "alerts_triggered" // total alerts of any level
statsOKsTriggered = "oks_triggered" // alerts that resolved to OK
statsInfosTriggered = "infos_triggered" // alerts at Info level
statsWarnsTriggered = "warns_triggered" // alerts at Warn level
statsCritsTriggered = "crits_triggered" // alerts at Critical level
)

// The newest state change is weighted 'weightDiff' times more than oldest state change.
Expand Down Expand Up @@ -104,6 +109,12 @@ type AlertNode struct {
messageTmpl *text.Template
detailsTmpl *html.Template

alertsTriggered *expvar.Int
oksTriggered *expvar.Int
infosTriggered *expvar.Int
warnsTriggered *expvar.Int
critsTriggered *expvar.Int

bufPool sync.Pool
}

Expand Down Expand Up @@ -322,7 +333,21 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
a.statMap.Add(statsAlertsTriggered, 0)
a.alertsTriggered = &expvar.Int{}
a.statMap.Set(statsAlertsTriggered, a.alertsTriggered)

a.oksTriggered = &expvar.Int{}
a.statMap.Set(statsOKsTriggered, a.oksTriggered)

a.infosTriggered = &expvar.Int{}
a.statMap.Set(statsInfosTriggered, a.infosTriggered)

a.warnsTriggered = &expvar.Int{}
a.statMap.Set(statsWarnsTriggered, a.warnsTriggered)

a.critsTriggered = &expvar.Int{}
a.statMap.Set(statsCritsTriggered, a.critsTriggered)

switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
Expand Down Expand Up @@ -483,7 +508,17 @@ func (a *AlertNode) runAlert([]byte) error {
return nil
}
func (a *AlertNode) handleAlert(ad *AlertData) {
a.statMap.Add(statsAlertsTriggered, 1)
a.alertsTriggered.Add(1)
switch ad.Level {
case OKAlert:
a.oksTriggered.Add(1)
case InfoAlert:
a.infosTriggered.Add(1)
case WarnAlert:
a.warnsTriggered.Add(1)
case CritAlert:
a.critsTriggered.Add(1)
}
a.logger.Printf("D! %v alert triggered id:%s msg:%s data:%v", ad.Level, ad.ID, ad.Message, ad.Data.Series[0])
for _, h := range a.handlers {
h(ad)
Expand Down
44 changes: 31 additions & 13 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ import (
"github.com/gorhill/cronexpr"
client "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// Stat keys registered on the QueryNode's expvar statMap.
const (
statsQueryErrors = "query_errors" // queries that failed or returned an error response
statsConnectErrors = "connect_errors" // failures connecting to InfluxDB
statsBatchesQueried = "batches_queried" // batches produced from query results
statsPointsQueried = "points_queried" // total points across all queried batches
)

type BatchNode struct {
node
s *pipeline.BatchNode
Expand Down Expand Up @@ -105,11 +113,6 @@ func (s *BatchNode) collectedCount() (count int64) {
return
}

const (
statsQueryErrors = "query_errors"
statsConnectErrors = "connect_errors"
)

type QueryNode struct {
node
b *pipeline.QueryNode
Expand All @@ -119,6 +122,11 @@ type QueryNode struct {
queryErr chan error
closing chan struct{}
aborting chan struct{}

queryErrors *expvar.Int
connectErrors *expvar.Int
batchesQueried *expvar.Int
pointsQueried *expvar.Int
}

func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, l *log.Logger) (*QueryNode, error) {
Expand Down Expand Up @@ -259,7 +267,7 @@ func (b *QueryNode) doQuery() error {
if err != nil {
b.logger.Println("E! failed to connect to InfluxDB:", err)
b.timer.Stop()
b.statMap.Add(statsConnectErrors, 1)
b.connectErrors.Add(1)
break
}
q := client.Query{
Expand All @@ -271,14 +279,14 @@ func (b *QueryNode) doQuery() error {
if err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
b.queryErrors.Add(1)
break
}

if err := resp.Error(); err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
b.queryErrors.Add(1)
break
}

Expand All @@ -287,26 +295,36 @@ func (b *QueryNode) doQuery() error {
batches, err := models.ResultToBatches(res)
if err != nil {
b.logger.Println("E! failed to understand query result:", err)
b.statMap.Add(statsQueryErrors, 1)
b.queryErrors.Add(1)
continue
}
b.timer.Pause()
for _, bch := range batches {
b.batchesQueried.Add(1)
b.pointsQueried.Add(int64(len(bch.Points)))
b.timer.Pause()
err := b.ins[0].CollectBatch(bch)
if err != nil {
return err
}
b.timer.Resume()
}
b.timer.Resume()
}
b.timer.Stop()
}
}
}

func (b *QueryNode) runBatch([]byte) error {
b.statMap.Add(statsQueryErrors, 0)
b.statMap.Add(statsConnectErrors, 0)
b.queryErrors = &expvar.Int{}
b.connectErrors = &expvar.Int{}
b.batchesQueried = &expvar.Int{}
b.pointsQueried = &expvar.Int{}

b.statMap.Set(statsQueryErrors, b.queryErrors)
b.statMap.Set(statsConnectErrors, b.connectErrors)
b.statMap.Set(statsBatchesQueried, b.batchesQueried)
b.statMap.Set(statsPointsQueried, b.pointsQueried)

errC := make(chan error, 1)
go func() {
defer func() {
Expand Down
16 changes: 16 additions & 0 deletions default.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ package kapacitor
import (
"log"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// Stat keys registered on the DefaultNode's expvar statMap.
const (
statsFieldsDefaulted = "fields_defaulted" // fields filled in because they were missing from a point
statsTagsDefaulted = "tags_defaulted" // tags filled in because they were missing from a point
)

// DefaultNode fills in default values for fields and tags that are
// missing from incoming points, counting each substitution in its
// expvar statistics.
type DefaultNode struct {
node
d *pipeline.DefaultNode // pipeline definition this node was created from

// Counters registered on statMap by runDefault.
fieldsDefaulted *expvar.Int // incremented once per missing field defaulted in setDefaults
tagsDefaulted *expvar.Int // incremented once per missing tag defaulted in setDefaults
}

// Create a new DefaultNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -23,6 +32,11 @@ func newDefaultNode(et *ExecutingTask, n *pipeline.DefaultNode, l *log.Logger) (
}

func (e *DefaultNode) runDefault(snapshot []byte) error {
e.fieldsDefaulted = &expvar.Int{}
e.tagsDefaulted = &expvar.Int{}

e.statMap.Set(statsFieldsDefaulted, e.fieldsDefaulted)
e.statMap.Set(statsTagsDefaulted, e.tagsDefaulted)
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
Expand Down Expand Up @@ -63,6 +77,7 @@ func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (model
newFields = newFields.Copy()
fieldsCopied = true
}
d.fieldsDefaulted.Add(1)
newFields[field] = value
}
}
Expand All @@ -74,6 +89,7 @@ func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (model
newTags = newTags.Copy()
tagsCopied = true
}
d.tagsDefaulted.Add(1)
newTags[tag] = value
}
}
Expand Down
3 changes: 2 additions & 1 deletion eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type EvalNode struct {
e *pipeline.EvalNode
expressions []stateful.Expression
scopePool stateful.ScopePool
evalErrors *expvar.Int

evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand Down
15 changes: 13 additions & 2 deletions influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ import (
"time"

client "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// Stat keys registered on the InfluxDBOutNode's expvar statMap.
const (
statsInfluxDBPointsWritten = "points_written" // points handed to the InfluxDB write path
statsInfluxDBWriteErrors = "write_errors" // failed attempts to write buffered points to InfluxDB
)

// InfluxDBOutNode writes incoming data to InfluxDB through a write
// buffer, tracking points written and write errors in its expvar
// statistics.
type InfluxDBOutNode struct {
node
i *pipeline.InfluxDBOutNode // pipeline definition this node was created from
wb *writeBuffer // buffers batch points before they are written out

// Counters registered on statMap by runOut.
pointsWritten *expvar.Int // points counted when a batch is written (see writeBuffer.write)
writeErrors *expvar.Int // write failures (see writeBuffer.writeAll)
}

func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.Logger) (*InfluxDBOutNode, error) {
Expand All @@ -33,7 +38,12 @@ func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.L
}

func (i *InfluxDBOutNode) runOut([]byte) error {
i.statMap.Add(statsInfluxDBPointsWritten, 0)
i.pointsWritten = &expvar.Int{}
i.writeErrors = &expvar.Int{}

i.statMap.Set(statsInfluxDBPointsWritten, i.pointsWritten)
i.statMap.Set(statsInfluxDBWriteErrors, i.writeErrors)

// Start the write buffer
i.wb.start()

Expand Down Expand Up @@ -222,6 +232,7 @@ func (w *writeBuffer) writeAll() {
for bpc, bp := range w.buffer {
err := w.write(bp)
if err != nil {
w.i.writeErrors.Add(1)
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
}
delete(w.buffer, bpc)
Expand All @@ -240,6 +251,6 @@ func (w *writeBuffer) write(bp client.BatchPoints) error {
return err
}
}
w.i.statMap.Add(statsInfluxDBPointsWritten, int64(len(bp.Points())))
w.i.pointsWritten.Add(int64(len(bp.Points())))
return w.conn.Write(bp)
}
9 changes: 9 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ const defaultLogFileMode = 0600
// WARNING expressions would be evaluated, but not the
// CRITICAL expression.
// Each expression maintains its own state.
//
// Available Statistics:
//
// * alerts_triggered -- Total number of alerts triggered
// * oks_triggered -- Number of OK alerts triggered
// * infos_triggered -- Number of Info alerts triggered
// * warns_triggered -- Number of Warn alerts triggered
// * crits_triggered -- Number of Crit alerts triggered
//
type AlertNode struct {
chainnode

Expand Down
7 changes: 7 additions & 0 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
// |query('SELECT value from views')
// ...
//
// Available Statistics:
//
// * query_errors -- number of errors when querying
// * connect_errors -- number of errors connecting to InfluxDB
// * batches_queried -- number of batches returned from queries
// * points_queried -- total number of points in batches
//
type BatchNode struct {
node
}
Expand Down
6 changes: 6 additions & 0 deletions pipeline/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import "fmt"
//
// The above example will set the field `value` to float64(0) if it does not already exist
// It will also set the tag `host` to string("") if it does not already exist.
//
// Available Statistics:
//
// * fields_defaulted -- number of fields that were missing
// * tags_defaulted -- number of tags that were missing
//
type DefaultNode struct {
chainnode

Expand Down
4 changes: 4 additions & 0 deletions pipeline/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
// data point with the result of `error_count / total_count` where
// `error_count` and `total_count` are existing fields on the data point.
//
// Available Statistics:
//
// * eval_errors -- number of errors evaluating any expressions.
//
type EvalNode struct {
chainnode

Expand Down
5 changes: 5 additions & 0 deletions pipeline/influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ const DefaultFlushInterval = time.Second * 10
// .tag('kapacitor', 'true')
// .tag('version', '0.2')
//
// Available Statistics:
//
// * points_written -- number of points written to InfluxDB
// * write_errors -- number of errors attempting to write to InfluxDB
//
type InfluxDBOutNode struct {
node

Expand Down

0 comments on commit 3d4cbd1

Please sign in to comment.