From 3d4cbd1f9ebb8f6582e0054db131854ff76c101a Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 10 May 2016 12:37:35 -0600 Subject: [PATCH 1/2] update node stats --- CHANGELOG.md | 1 + alert.go | 39 +++++++++++++++++++++++++++++++++-- batch.go | 44 ++++++++++++++++++++++++++++------------ default.go | 16 +++++++++++++++ eval.go | 3 ++- influxdb_out.go | 15 ++++++++++++-- pipeline/alert.go | 9 ++++++++ pipeline/batch.go | 7 +++++++ pipeline/default.go | 6 ++++++ pipeline/eval.go | 4 ++++ pipeline/influxdb_out.go | 5 +++++ 11 files changed, 131 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e52325b88..720b67559 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,6 +109,7 @@ For example, let's say we want to store all data that triggered an alert in Infl - [#504](https://github.com/influxdata/kapacitor/pull/504): BREAKING: Many changes to the API and underlying storage system. This release requires a special upgrade process. - [#511](https://github.com/influxdata/kapacitor/pull/511): Adds DefaultNode for providing default values for missing fields or tags. - [#285](https://github.com/influxdata/kapacitor/pull/285): Track created,modified and last enabled dates on tasks. +- [#533](https://github.com/influxdata/kapacitor/pull/533): Add useful statistics for nodes. ### Bugfixes diff --git a/alert.go b/alert.go index c09e34be5..7dfa1eeab 100644 --- a/alert.go +++ b/alert.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/influxql" imodels "github.com/influxdata/influxdb/models" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/stateful" @@ -24,6 +25,10 @@ import ( const ( statsAlertsTriggered = "alerts_triggered" + statsOKsTriggered = "oks_triggered" + statsInfosTriggered = "infos_triggered" + statsWarnsTriggered = "warns_triggered" + statsCritsTriggered = "crits_triggered" ) // The newest state change is weighted 'weightDiff' times more than oldest state change. @@ -104,6 +109,12 @@ type AlertNode struct { messageTmpl *text.Template detailsTmpl *html.Template + alertsTriggered *expvar.Int + oksTriggered *expvar.Int + infosTriggered *expvar.Int + warnsTriggered *expvar.Int + critsTriggered *expvar.Int + bufPool sync.Pool } @@ -322,7 +333,21 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * } func (a *AlertNode) runAlert([]byte) error { - a.statMap.Add(statsAlertsTriggered, 0) + a.alertsTriggered = &expvar.Int{} + a.statMap.Set(statsAlertsTriggered, a.alertsTriggered) + + a.oksTriggered = &expvar.Int{} + a.statMap.Set(statsOKsTriggered, a.oksTriggered) + + a.infosTriggered = &expvar.Int{} + a.statMap.Set(statsInfosTriggered, a.infosTriggered) + + a.warnsTriggered = &expvar.Int{} + a.statMap.Set(statsWarnsTriggered, a.warnsTriggered) + + a.critsTriggered = &expvar.Int{} + a.statMap.Set(statsCritsTriggered, a.critsTriggered) + switch a.Wants() { case pipeline.StreamEdge: for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() { @@ -483,7 +508,17 @@ func (a *AlertNode) runAlert([]byte) error { return nil } func (a *AlertNode) handleAlert(ad *AlertData) { - a.statMap.Add(statsAlertsTriggered, 1) + a.alertsTriggered.Add(1) + switch ad.Level { + case OKAlert: + a.oksTriggered.Add(1) + case InfoAlert: + a.infosTriggered.Add(1) + case WarnAlert: + a.warnsTriggered.Add(1) + case CritAlert: + a.critsTriggered.Add(1) + } a.logger.Printf("D! 
%v alert triggered id:%s msg:%s data:%v", ad.Level, ad.ID, ad.Message, ad.Data.Series[0]) for _, h := range a.handlers { h(ad) diff --git a/batch.go b/batch.go index 4749b11a4..e8ae83db0 100644 --- a/batch.go +++ b/batch.go @@ -11,10 +11,18 @@ import ( "github.com/gorhill/cronexpr" client "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) +const ( + statsQueryErrors = "query_errors" + statsConnectErrors = "connect_errors" + statsBatchesQueried = "batches_queried" + statsPointsQueried = "points_queried" +) + type BatchNode struct { node s *pipeline.BatchNode @@ -105,11 +113,6 @@ func (s *BatchNode) collectedCount() (count int64) { return } -const ( - statsQueryErrors = "query_errors" - statsConnectErrors = "connect_errors" -) - type QueryNode struct { node b *pipeline.QueryNode @@ -119,6 +122,11 @@ type QueryNode struct { queryErr chan error closing chan struct{} aborting chan struct{} + + queryErrors *expvar.Int + connectErrors *expvar.Int + batchesQueried *expvar.Int + pointsQueried *expvar.Int } func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, l *log.Logger) (*QueryNode, error) { @@ -259,7 +267,7 @@ func (b *QueryNode) doQuery() error { if err != nil { b.logger.Println("E! failed to connect to InfluxDB:", err) b.timer.Stop() - b.statMap.Add(statsConnectErrors, 1) + b.connectErrors.Add(1) break } q := client.Query{ @@ -271,14 +279,14 @@ func (b *QueryNode) doQuery() error { if err != nil { b.logger.Println("E! query failed:", err) b.timer.Stop() - b.statMap.Add(statsQueryErrors, 1) + b.queryErrors.Add(1) break } if err := resp.Error(); err != nil { b.logger.Println("E! query failed:", err) b.timer.Stop() - b.statMap.Add(statsQueryErrors, 1) + b.queryErrors.Add(1) break } @@ -287,17 +295,19 @@ func (b *QueryNode) doQuery() error { batches, err := models.ResultToBatches(res) if err != nil { b.logger.Println("E! 
failed to understand query result:", err) - b.statMap.Add(statsQueryErrors, 1) + b.queryErrors.Add(1) continue } - b.timer.Pause() for _, bch := range batches { + b.batchesQueried.Add(1) + b.pointsQueried.Add(int64(len(bch.Points))) + b.timer.Pause() err := b.ins[0].CollectBatch(bch) if err != nil { return err } + b.timer.Resume() } - b.timer.Resume() } b.timer.Stop() } @@ -305,8 +315,16 @@ func (b *QueryNode) doQuery() error { } func (b *QueryNode) runBatch([]byte) error { - b.statMap.Add(statsQueryErrors, 0) - b.statMap.Add(statsConnectErrors, 0) + b.queryErrors = &expvar.Int{} + b.connectErrors = &expvar.Int{} + b.batchesQueried = &expvar.Int{} + b.pointsQueried = &expvar.Int{} + + b.statMap.Set(statsQueryErrors, b.queryErrors) + b.statMap.Set(statsConnectErrors, b.connectErrors) + b.statMap.Set(statsBatchesQueried, b.batchesQueried) + b.statMap.Set(statsPointsQueried, b.pointsQueried) + errC := make(chan error, 1) go func() { defer func() { diff --git a/default.go b/default.go index 883fb1e7e..05082165a 100644 --- a/default.go +++ b/default.go @@ -3,13 +3,22 @@ package kapacitor import ( "log" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) +const ( + statsFieldsDefaulted = "fields_defaulted" + statsTagsDefaulted = "tags_defaulted" +) + type DefaultNode struct { node d *pipeline.DefaultNode + + fieldsDefaulted *expvar.Int + tagsDefaulted *expvar.Int } // Create a new DefaultNode which applies a transformation func to each point in a stream and returns a single point. @@ -23,6 +32,11 @@ func newDefaultNode(et *ExecutingTask, n *pipeline.DefaultNode, l *log.Logger) ( } func (e *DefaultNode) runDefault(snapshot []byte) error { + e.fieldsDefaulted = &expvar.Int{} + e.tagsDefaulted = &expvar.Int{} + + e.statMap.Set(statsFieldsDefaulted, e.fieldsDefaulted) + e.statMap.Set(statsTagsDefaulted, e.tagsDefaulted) switch e.Provides() { case pipeline.StreamEdge: for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() { @@ -63,6 +77,7 @@ func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (model newFields = newFields.Copy() fieldsCopied = true } + d.fieldsDefaulted.Add(1) newFields[field] = value } } @@ -74,6 +89,7 @@ func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (model newTags = newTags.Copy() tagsCopied = true } + d.tagsDefaulted.Add(1) newTags[tag] = value } } diff --git a/eval.go b/eval.go index 32db7ef79..e5bcb0e52 100644 --- a/eval.go +++ b/eval.go @@ -21,7 +21,8 @@ type EvalNode struct { e *pipeline.EvalNode expressions []stateful.Expression scopePool stateful.ScopePool - evalErrors *expvar.Int + + evalErrors *expvar.Int } // Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point. 
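
A side effect of the batch.go hunk above that is easy to miss: the timer.Pause()/Resume() pair moves inside the per-batch loop, so the new batches_queried and points_queried bookkeeping is charged to the node's execution time while only the blocking CollectBatch hand-off to the downstream edge stays off the clock. A rough, self-contained sketch of that accounting pattern follows; the nodeTimer type is a hypothetical stand-in, and only the Pause/Resume/Stop method names are taken from the patch.

package main

import (
	"fmt"
	"time"
)

// nodeTimer is a hypothetical stand-in for Kapacitor's node execution timer.
type nodeTimer struct {
	started time.Time
	total   time.Duration
}

func (t *nodeTimer) Start()  { t.started = time.Now() }
func (t *nodeTimer) Pause()  { t.total += time.Since(t.started) }
func (t *nodeTimer) Resume() { t.started = time.Now() }
func (t *nodeTimer) Stop()   { t.Pause() } // assumes the timer is currently running

func main() {
	var tm nodeTimer
	tm.Start()

	batches := [][]float64{{1, 2, 3}, {4, 5}}
	var batchesQueried, pointsQueried int64
	for _, bch := range batches {
		// Counting batches and points is this node's own work, so it stays on the clock.
		batchesQueried++
		pointsQueried += int64(len(bch))

		// Handing the batch to the downstream edge can block on the consumer,
		// so that wait is excluded from this node's execution time.
		tm.Pause()
		time.Sleep(5 * time.Millisecond) // stands in for ins[0].CollectBatch(bch)
		tm.Resume()
	}
	tm.Stop()

	fmt.Printf("batches=%d points=%d node_time=%v\n", batchesQueried, pointsQueried, tm.total)
}
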
diff --git a/influxdb_out.go b/influxdb_out.go index db13c00ac..2386ccc11 100644 --- a/influxdb_out.go +++ b/influxdb_out.go @@ -6,18 +6,23 @@ import ( "time" client "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) const ( statsInfluxDBPointsWritten = "points_written" + statsInfluxDBWriteErrors = "write_errors" ) type InfluxDBOutNode struct { node i *pipeline.InfluxDBOutNode wb *writeBuffer + + pointsWritten *expvar.Int + writeErrors *expvar.Int } func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.Logger) (*InfluxDBOutNode, error) { @@ -33,7 +38,12 @@ func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.L } func (i *InfluxDBOutNode) runOut([]byte) error { - i.statMap.Add(statsInfluxDBPointsWritten, 0) + i.pointsWritten = &expvar.Int{} + i.writeErrors = &expvar.Int{} + + i.statMap.Set(statsInfluxDBPointsWritten, i.pointsWritten) + i.statMap.Set(statsInfluxDBWriteErrors, i.writeErrors) + // Start the write buffer i.wb.start() @@ -222,6 +232,7 @@ func (w *writeBuffer) writeAll() { for bpc, bp := range w.buffer { err := w.write(bp) if err != nil { + w.i.writeErrors.Add(1) w.i.logger.Println("E! failed to write points to InfluxDB:", err) } delete(w.buffer, bpc) @@ -240,6 +251,6 @@ func (w *writeBuffer) write(bp client.BatchPoints) error { return err } } - w.i.statMap.Add(statsInfluxDBPointsWritten, int64(len(bp.Points()))) + w.i.pointsWritten.Add(int64(len(bp.Points()))) return w.conn.Write(bp) } diff --git a/pipeline/alert.go b/pipeline/alert.go index 489d2091a..161aed8ea 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -86,6 +86,15 @@ const defaultLogFileMode = 0600 // WARNING expressions would be evaluated, but not the // CRITICAL expression. // Each expression maintains its own state. +// +// Available Statistics: +// +// * alerts_triggered -- Total number of alerts triggered +// * oks_triggered -- Number of OK alerts triggered +// * infos_triggered -- Number of Info alerts triggered +// * warns_triggered -- Number of Warn alerts triggered +// * crits_triggered -- Number of Crit alerts triggered +// type AlertNode struct { chainnode diff --git a/pipeline/batch.go b/pipeline/batch.go index 7a50ae083..f152b7d9c 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -20,6 +20,13 @@ import ( // |query('SELECT value from views') // ... // +// Available Statistics: +// +// * query_errors -- number of errors when querying +// * connect_errors -- number of errors connecting to InfluxDB +// * batches_queried -- number of batches returned from queries +// * points_queried -- total number of points in batches +// type BatchNode struct { node } diff --git a/pipeline/default.go b/pipeline/default.go index c2d538b8a..85782f0cb 100644 --- a/pipeline/default.go +++ b/pipeline/default.go @@ -12,6 +12,12 @@ import "fmt" // // The above example will set the field `value` to float64(0) if it does not already exist // It will also set the tag `host` to string("") if it does not already exist. 
+// +// Available Statistics: +// +// * fields_defaulted -- number of fields that were missing +// * tags_defaulted -- number of tags that were missing +// type DefaultNode struct { chainnode diff --git a/pipeline/eval.go b/pipeline/eval.go index 31641fd34..83cb3bcb8 100644 --- a/pipeline/eval.go +++ b/pipeline/eval.go @@ -18,6 +18,10 @@ import ( // data point with the result of `error_count / total_count` where // `error_count` and `total_count` are existing fields on the data point. // +// Available Statistics: +// +// * eval_errors -- number of errors evaluating any expressions. +// type EvalNode struct { chainnode diff --git a/pipeline/influxdb_out.go b/pipeline/influxdb_out.go index 8bee8da81..7d6a89865 100644 --- a/pipeline/influxdb_out.go +++ b/pipeline/influxdb_out.go @@ -19,6 +19,11 @@ const DefaultFlushInterval = time.Second * 10 // .tag('kapacitor', 'true') // .tag('version', '0.2') // +// Available Statistics: +// +// * points_written -- number of points written to InfluxDB +// * write_errors -- number of errors attempting to write to InfluxDB +// type InfluxDBOutNode struct { node From 0c333adb43dda5426aefd701a10a333736852300 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 10 May 2016 12:51:31 -0600 Subject: [PATCH 2/2] fix race --- batch.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/batch.go b/batch.go index e8ae83db0..de056e852 100644 --- a/batch.go +++ b/batch.go @@ -234,6 +234,15 @@ func (b *QueryNode) Queries(start, stop time.Time) []string { // Query InfluxDB and collect batches on batch collector. func (b *QueryNode) doQuery() error { defer b.ins[0].Close() + b.queryErrors = &expvar.Int{} + b.connectErrors = &expvar.Int{} + b.batchesQueried = &expvar.Int{} + b.pointsQueried = &expvar.Int{} + + b.statMap.Set(statsQueryErrors, b.queryErrors) + b.statMap.Set(statsConnectErrors, b.connectErrors) + b.statMap.Set(statsBatchesQueried, b.batchesQueried) + b.statMap.Set(statsPointsQueried, b.pointsQueried) if b.et.tm.InfluxDBService == nil { return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query") @@ -315,16 +324,6 @@ func (b *QueryNode) doQuery() error { } func (b *QueryNode) runBatch([]byte) error { - b.queryErrors = &expvar.Int{} - b.connectErrors = &expvar.Int{} - b.batchesQueried = &expvar.Int{} - b.pointsQueried = &expvar.Int{} - - b.statMap.Set(statsQueryErrors, b.queryErrors) - b.statMap.Set(statsConnectErrors, b.connectErrors) - b.statMap.Set(statsBatchesQueried, b.batchesQueried) - b.statMap.Set(statsPointsQueried, b.pointsQueried) - errC := make(chan error, 1) go func() { defer func() {
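
Taken together, the pattern both commits settle on is: allocate each node's counters once when the node starts running, register them on the node's stat map with Set so the stat is exported even while it is still zero, and then bump the typed *expvar.Int directly on the hot path instead of calling statMap.Add with a string key for every point or alert. Below is a minimal sketch of that pattern written against the standard library expvar package; Kapacitor ships its own fork under github.com/influxdata/kapacitor/expvar, assumed here to expose a compatible Int/Map API, and the node shape, stat keys, and handler are purely illustrative.

package main

import (
	"expvar"
	"fmt"
)

type alertNode struct {
	statMap         *expvar.Map
	alertsTriggered *expvar.Int // dedicated counters: increments hit the Int directly,
	critsTriggered  *expvar.Int // with no per-event string-key lookup in the map
}

func newAlertNode() *alertNode {
	a := &alertNode{statMap: new(expvar.Map).Init()}
	// Register each counter up front so the stat exists (at zero) before the
	// first event is ever handled.
	a.alertsTriggered = &expvar.Int{}
	a.statMap.Set("alerts_triggered", a.alertsTriggered)
	a.critsTriggered = &expvar.Int{}
	a.statMap.Set("crits_triggered", a.critsTriggered)
	return a
}

func (a *alertNode) handleAlert(critical bool) {
	a.alertsTriggered.Add(1)
	if critical {
		a.critsTriggered.Add(1)
	}
}

func main() {
	a := newAlertNode()
	a.handleAlert(true)
	a.handleAlert(false)

	// Walk the stat map the way a stats reporter would.
	a.statMap.Do(func(kv expvar.KeyValue) {
		fmt.Printf("%s=%s\n", kv.Key, kv.Value)
	})
}

The second commit moves this one-time initialization for QueryNode from runBatch into doQuery; given the "fix race" subject, the point is presumably that the counter fields get assigned on the same goroutine that later increments them.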