Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update node stats #533

Merged
merged 2 commits into from
May 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#504](https://github.com/influxdata/kapacitor/pull/504): BREAKING: Many changes to the API and underlying storage system. This release requires a special upgrade process.
- [#511](https://github.com/influxdata/kapacitor/pull/511): Adds DefaultNode for providing default values for missing fields or tags.
- [#285](https://github.com/influxdata/kapacitor/pull/285): Track created, modified and last enabled dates on tasks.
- [#533](https://github.com/influxdata/kapacitor/pull/533): Add useful statistics for nodes.

### Bugfixes

Expand Down
39 changes: 37 additions & 2 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ import (

"github.com/influxdata/influxdb/influxql"
imodels "github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
)

// Names of the expvar statistics AlertNode publishes on its statMap.
// One counter per alert level, plus a total (see handleAlert).
const (
statsAlertsTriggered = "alerts_triggered" // total alerts fired, at any level
statsOKsTriggered = "oks_triggered" // alerts that transitioned to OK
statsInfosTriggered = "infos_triggered" // alerts triggered at Info level
statsWarnsTriggered = "warns_triggered" // alerts triggered at Warn level
statsCritsTriggered = "crits_triggered" // alerts triggered at Crit level
)

// The newest state change is weighted 'weightDiff' times more than oldest state change.
Expand Down Expand Up @@ -104,6 +109,12 @@ type AlertNode struct {
messageTmpl *text.Template
detailsTmpl *html.Template

alertsTriggered *expvar.Int
oksTriggered *expvar.Int
infosTriggered *expvar.Int
warnsTriggered *expvar.Int
critsTriggered *expvar.Int

bufPool sync.Pool
}

Expand Down Expand Up @@ -322,7 +333,21 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
a.statMap.Add(statsAlertsTriggered, 0)
a.alertsTriggered = &expvar.Int{}
a.statMap.Set(statsAlertsTriggered, a.alertsTriggered)

a.oksTriggered = &expvar.Int{}
a.statMap.Set(statsOKsTriggered, a.oksTriggered)

a.infosTriggered = &expvar.Int{}
a.statMap.Set(statsInfosTriggered, a.infosTriggered)

a.warnsTriggered = &expvar.Int{}
a.statMap.Set(statsWarnsTriggered, a.warnsTriggered)

a.critsTriggered = &expvar.Int{}
a.statMap.Set(statsCritsTriggered, a.critsTriggered)

switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
Expand Down Expand Up @@ -483,7 +508,17 @@ func (a *AlertNode) runAlert([]byte) error {
return nil
}
func (a *AlertNode) handleAlert(ad *AlertData) {
a.statMap.Add(statsAlertsTriggered, 1)
a.alertsTriggered.Add(1)
switch ad.Level {
case OKAlert:
a.oksTriggered.Add(1)
case InfoAlert:
a.infosTriggered.Add(1)
case WarnAlert:
a.warnsTriggered.Add(1)
case CritAlert:
a.critsTriggered.Add(1)
}
a.logger.Printf("D! %v alert triggered id:%s msg:%s data:%v", ad.Level, ad.ID, ad.Message, ad.Data.Series[0])
for _, h := range a.handlers {
h(ad)
Expand Down
43 changes: 30 additions & 13 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ import (
"github.com/gorhill/cronexpr"
client "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// Names of the expvar statistics QueryNode publishes on its statMap
// while querying InfluxDB (see doQuery).
const (
statsQueryErrors = "query_errors" // queries that failed or returned an error response
statsConnectErrors = "connect_errors" // failures to connect to InfluxDB
statsBatchesQueried = "batches_queried" // batches produced from query results
statsPointsQueried = "points_queried" // total points across all queried batches
)

type BatchNode struct {
node
s *pipeline.BatchNode
Expand Down Expand Up @@ -105,11 +113,6 @@ func (s *BatchNode) collectedCount() (count int64) {
return
}

const (
statsQueryErrors = "query_errors"
statsConnectErrors = "connect_errors"
)

type QueryNode struct {
node
b *pipeline.QueryNode
Expand All @@ -119,6 +122,11 @@ type QueryNode struct {
queryErr chan error
closing chan struct{}
aborting chan struct{}

queryErrors *expvar.Int
connectErrors *expvar.Int
batchesQueried *expvar.Int
pointsQueried *expvar.Int
}

func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, l *log.Logger) (*QueryNode, error) {
Expand Down Expand Up @@ -226,6 +234,15 @@ func (b *QueryNode) Queries(start, stop time.Time) []string {
// Query InfluxDB and collect batches on batch collector.
func (b *QueryNode) doQuery() error {
defer b.ins[0].Close()
b.queryErrors = &expvar.Int{}
b.connectErrors = &expvar.Int{}
b.batchesQueried = &expvar.Int{}
b.pointsQueried = &expvar.Int{}

b.statMap.Set(statsQueryErrors, b.queryErrors)
b.statMap.Set(statsConnectErrors, b.connectErrors)
b.statMap.Set(statsBatchesQueried, b.batchesQueried)
b.statMap.Set(statsPointsQueried, b.pointsQueried)

if b.et.tm.InfluxDBService == nil {
return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
Expand Down Expand Up @@ -259,7 +276,7 @@ func (b *QueryNode) doQuery() error {
if err != nil {
b.logger.Println("E! failed to connect to InfluxDB:", err)
b.timer.Stop()
b.statMap.Add(statsConnectErrors, 1)
b.connectErrors.Add(1)
break
}
q := client.Query{
Expand All @@ -271,14 +288,14 @@ func (b *QueryNode) doQuery() error {
if err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
b.queryErrors.Add(1)
break
}

if err := resp.Error(); err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
b.queryErrors.Add(1)
break
}

Expand All @@ -287,26 +304,26 @@ func (b *QueryNode) doQuery() error {
batches, err := models.ResultToBatches(res)
if err != nil {
b.logger.Println("E! failed to understand query result:", err)
b.statMap.Add(statsQueryErrors, 1)
b.queryErrors.Add(1)
continue
}
b.timer.Pause()
for _, bch := range batches {
b.batchesQueried.Add(1)
b.pointsQueried.Add(int64(len(bch.Points)))
b.timer.Pause()
err := b.ins[0].CollectBatch(bch)
if err != nil {
return err
}
b.timer.Resume()
}
b.timer.Resume()
}
b.timer.Stop()
}
}
}

func (b *QueryNode) runBatch([]byte) error {
b.statMap.Add(statsQueryErrors, 0)
b.statMap.Add(statsConnectErrors, 0)
errC := make(chan error, 1)
go func() {
defer func() {
Expand Down
16 changes: 16 additions & 0 deletions default.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ package kapacitor
import (
"log"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// Names of the expvar statistics DefaultNode publishes on its statMap
// (registered in runDefault, incremented in setDefaults).
const (
statsFieldsDefaulted = "fields_defaulted" // missing fields that received a default value
statsTagsDefaulted = "tags_defaulted" // missing tags that received a default value
)

// DefaultNode applies default values to missing fields and tags on the
// data passing through it, counting every substitution it makes.
type DefaultNode struct {
node
d *pipeline.DefaultNode // pipeline definition holding the configured defaults

// Counters exposed via statMap; created and registered in runDefault.
fieldsDefaulted *expvar.Int // number of fields set to their default value
tagsDefaulted *expvar.Int // number of tags set to their default value
}

// Create a new DefaultNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -23,6 +32,11 @@ func newDefaultNode(et *ExecutingTask, n *pipeline.DefaultNode, l *log.Logger) (
}

func (e *DefaultNode) runDefault(snapshot []byte) error {
e.fieldsDefaulted = &expvar.Int{}
e.tagsDefaulted = &expvar.Int{}

e.statMap.Set(statsFieldsDefaulted, e.fieldsDefaulted)
e.statMap.Set(statsTagsDefaulted, e.tagsDefaulted)
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
Expand Down Expand Up @@ -63,6 +77,7 @@ func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (model
newFields = newFields.Copy()
fieldsCopied = true
}
d.fieldsDefaulted.Add(1)
newFields[field] = value
}
}
Expand All @@ -74,6 +89,7 @@ func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (model
newTags = newTags.Copy()
tagsCopied = true
}
d.tagsDefaulted.Add(1)
newTags[tag] = value
}
}
Expand Down
3 changes: 2 additions & 1 deletion eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type EvalNode struct {
e *pipeline.EvalNode
expressions []stateful.Expression
scopePool stateful.ScopePool
evalErrors *expvar.Int

evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand Down
15 changes: 13 additions & 2 deletions influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ import (
"time"

client "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// Names of the expvar statistics InfluxDBOutNode publishes on its statMap
// (registered in runOut).
const (
statsInfluxDBPointsWritten = "points_written" // points sent to InfluxDB; counted per flushed batch, before the write completes
statsInfluxDBWriteErrors = "write_errors" // errors while writing buffered batches to InfluxDB
)

// InfluxDBOutNode writes the data it receives to InfluxDB through a
// write buffer, tracking points written and write errors.
type InfluxDBOutNode struct {
node
i *pipeline.InfluxDBOutNode // pipeline definition (database, retention policy, tags, ...)
wb *writeBuffer // buffers batch points before flushing them to InfluxDB

// Counters exposed via statMap; created and registered in runOut.
pointsWritten *expvar.Int // points sent to InfluxDB (incremented per flushed batch)
writeErrors *expvar.Int // failed attempts to write buffered points
}

func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.Logger) (*InfluxDBOutNode, error) {
Expand All @@ -33,7 +38,12 @@ func newInfluxDBOutNode(et *ExecutingTask, n *pipeline.InfluxDBOutNode, l *log.L
}

func (i *InfluxDBOutNode) runOut([]byte) error {
i.statMap.Add(statsInfluxDBPointsWritten, 0)
i.pointsWritten = &expvar.Int{}
i.writeErrors = &expvar.Int{}

i.statMap.Set(statsInfluxDBPointsWritten, i.pointsWritten)
i.statMap.Set(statsInfluxDBWriteErrors, i.writeErrors)

// Start the write buffer
i.wb.start()

Expand Down Expand Up @@ -222,6 +232,7 @@ func (w *writeBuffer) writeAll() {
for bpc, bp := range w.buffer {
err := w.write(bp)
if err != nil {
w.i.writeErrors.Add(1)
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
}
delete(w.buffer, bpc)
Expand All @@ -240,6 +251,6 @@ func (w *writeBuffer) write(bp client.BatchPoints) error {
return err
}
}
w.i.statMap.Add(statsInfluxDBPointsWritten, int64(len(bp.Points())))
w.i.pointsWritten.Add(int64(len(bp.Points())))
return w.conn.Write(bp)
}
9 changes: 9 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ const defaultLogFileMode = 0600
// WARNING expressions would be evaluated, but not the
// CRITICAL expression.
// Each expression maintains its own state.
//
// Available Statistics:
//
// * alerts_triggered -- Total number of alerts triggered
// * oks_triggered -- Number of OK alerts triggered
// * infos_triggered -- Number of Info alerts triggered
// * warns_triggered -- Number of Warn alerts triggered
// * crits_triggered -- Number of Crit alerts triggered
//
type AlertNode struct {
chainnode

Expand Down
7 changes: 7 additions & 0 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ import (
// |query('SELECT value from views')
// ...
//
// Available Statistics:
//
// * query_errors -- number of errors when querying
// * connect_errors -- number of errors connecting to InfluxDB
// * batches_queried -- number of batches returned from queries
// * points_queried -- total number of points in batches
//
type BatchNode struct {
node // embedded common node behavior — presumably shared by all pipeline nodes; confirm against node in this package
}
Expand Down
6 changes: 6 additions & 0 deletions pipeline/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import "fmt"
//
// The above example will set the field `value` to float64(0) if it does not already exist
// It will also set the tag `host` to string("") if it does not already exist.
//
// Available Statistics:
//
// * fields_defaulted -- number of fields that were missing
// * tags_defaulted -- number of tags that were missing
//
type DefaultNode struct {
chainnode

Expand Down
4 changes: 4 additions & 0 deletions pipeline/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
// data point with the result of `error_count / total_count` where
// `error_count` and `total_count` are existing fields on the data point.
//
// Available Statistics:
//
// * eval_errors -- number of errors evaluating any expressions.
//
type EvalNode struct {
chainnode

Expand Down
5 changes: 5 additions & 0 deletions pipeline/influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ const DefaultFlushInterval = time.Second * 10
// .tag('kapacitor', 'true')
// .tag('version', '0.2')
//
// Available Statistics:
//
// * points_written -- number of points written to InfluxDB
// * write_errors -- number of errors attempting to write to InfluxDB
//
type InfluxDBOutNode struct {
node

Expand Down