Skip to content

Commit

Permalink
fixes #387 #400 #401
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 4, 2016
1 parent e30ec41 commit f495dee
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 179 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ format a TICKscript file according to a common standard.
- [#389](https://github.com/influxdata/kapacitor/pull/389): Adds benchmarks to Kapacitor for basic use cases.
- [#390](https://github.com/influxdata/kapacitor/issues/390): BREAKING: Remove old `.mapReduce` functions.
- [#381](https://github.com/influxdata/kapacitor/pull/381): Adding enable/disable/delete/reload tasks by glob.

- [#401](https://github.com/influxdata/kapacitor/issues/401): Add `.align()` property to BatchNode so you can align query start and stop times.

### Bugfixes

- [#378](https://github.com/influxdata/kapacitor/issues/378): Fix issue where derivative would divide by zero.
- [#387](https://github.com/influxdata/kapacitor/issues/387): Add `.quiet()` option to EvalNode so errors can be suppressed if expected.
- [#400](https://github.com/influxdata/kapacitor/issues/400): All query/connection errors are counted and reported in BatchNode stats.
- [#412](https://github.com/influxdata/kapacitor/pull/412): Fix issues with batch queries dropping points because of nil fields.


## v0.11.0 [2016-03-22]
Expand Down
88 changes: 64 additions & 24 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (s *SourceBatchNode) collectedCount() (count int64) {
return
}

const (
statsQueryErrors = "query_errors"
statsConnectErrors = "connect_errors"
)

type BatchNode struct {
node
b *pipeline.BatchNode
Expand Down Expand Up @@ -160,23 +165,7 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode, l *log.Logger) (*Bat
}
switch {
case n.Every != 0:
bn.ticker = newTimeTicker(n.Every)
case n.Cron != "":
var err error
bn.ticker, err = newCronTicker(n.Cron)
if err != nil {
return nil, err
}
default:
return nil, errors.New("must define one of 'every' or 'cron'")
}

if n.Every != 0 && n.Cron != "" {
return nil, errors.New("must not set both 'every' and 'cron' properties")
}
switch {
case n.Every != 0:
bn.ticker = newTimeTicker(n.Every)
bn.ticker = newTimeTicker(n.Every, n.AlignFlag)
case n.Cron != "":
var err error
bn.ticker, err = newCronTicker(n.Cron)
Expand Down Expand Up @@ -270,6 +259,7 @@ func (b *BatchNode) doQuery() error {
if err != nil {
b.logger.Println("E! failed to connect to InfluxDB:", err)
b.timer.Stop()
b.statMap.Add(statsConnectErrors, 1)
break
}
q := client.Query{
Expand All @@ -281,12 +271,14 @@ func (b *BatchNode) doQuery() error {
if err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
break
}

if err := resp.Error(); err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
break
}

Expand All @@ -295,6 +287,7 @@ func (b *BatchNode) doQuery() error {
batches, err := models.ResultToBatches(res)
if err != nil {
b.logger.Println("E! failed to understand query result:", err)
b.statMap.Add(statsQueryErrors, 1)
continue
}
b.timer.Pause()
Expand All @@ -312,6 +305,8 @@ func (b *BatchNode) doQuery() error {
}

func (b *BatchNode) runBatch([]byte) error {
b.statMap.Add(statsQueryErrors, 0)
b.statMap.Add(statsConnectErrors, 0)
errC := make(chan error, 1)
go func() {
defer func() {
Expand Down Expand Up @@ -371,20 +366,61 @@ type ticker interface {
}

type timeTicker struct {
every time.Duration
ticker *time.Ticker
mu sync.Mutex
every time.Duration
alignChan chan time.Time
stopping chan struct{}
ticker *time.Ticker
mu sync.Mutex
wg sync.WaitGroup
}

func newTimeTicker(every time.Duration) *timeTicker {
return &timeTicker{every: every}
func newTimeTicker(every time.Duration, align bool) *timeTicker {
t := &timeTicker{
every: every,
}
if align {
t.alignChan = make(chan time.Time)
t.stopping = make(chan struct{})
}
return t
}

func (t *timeTicker) Start() <-chan time.Time {
t.mu.Lock()
defer t.mu.Unlock()
t.ticker = time.NewTicker(t.every)
return t.ticker.C
if t.alignChan != nil {
t.wg.Add(1)
go func() {
defer t.wg.Done()
// Sleep until we are roughly aligned
now := time.Now()
next := now.Truncate(t.every).Add(t.every)
after := time.NewTicker(next.Sub(now))
select {
case <-after.C:
after.Stop()
case <-t.stopping:
after.Stop()
return
}
t.ticker = time.NewTicker(t.every)
// Send first event since we waited for it explicitly
t.alignChan <- next
for {
select {
case <-t.stopping:
return
case now := <-t.ticker.C:
now = now.Truncate(t.every)
t.alignChan <- now
}
}
}()
return t.alignChan
} else {
t.ticker = time.NewTicker(t.every)
return t.ticker.C
}
}

func (t *timeTicker) Stop() {
Expand All @@ -393,6 +429,10 @@ func (t *timeTicker) Stop() {
if t.ticker != nil {
t.ticker.Stop()
}
if t.alignChan != nil {
close(t.stopping)
}
t.wg.Wait()
}

func (t *timeTicker) Next(now time.Time) time.Time {
Expand Down
70 changes: 43 additions & 27 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ import (
"log"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick"
)

const (
statsEvalErrors = "eval_errors"
)

type EvalNode struct {
node
e *pipeline.EvalNode
expressions []*tick.StatefulExpr
evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -36,15 +42,13 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
e.evalErrors = &expvar.Int{}
e.statMap.Set(statsEvalErrors, e.evalErrors)
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
p.Fields = fields
p.Fields = e.eval(p.Time, p.Fields, p.Tags)
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectPoint(p)
Expand All @@ -57,11 +61,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
for i, p := range b.Points {
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
b.Points[i].Fields = fields
b.Points[i].Fields = e.eval(p.Time, p.Fields, p.Tags)
}
e.timer.Stop()
for _, child := range e.outs {
Expand All @@ -75,27 +75,35 @@ func (e *EvalNode) runEval(snapshot []byte) error {
return nil
}

func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) (models.Fields, error) {
func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) models.Fields {
vars, err := mergeFieldsAndTags(now, fields, tags)
if err != nil {
return nil, err
e.logger.Println("E!", err)
return nil
}
for i, expr := range e.expressions {
v, err := expr.EvalNum(vars)
if err != nil {
return nil, err
if v, err := expr.EvalNum(vars); err == nil {
name := e.e.AsList[i]
vars.Set(name, v)
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
name := e.e.AsList[i]
vars.Set(name, v)
}
var newFields models.Fields
if e.e.KeepFlag {
if l := len(e.e.KeepList); l != 0 {
newFields = make(models.Fields, l)
for _, f := range e.e.KeepList {
newFields[f], err = vars.Get(f)
if err != nil {
return nil, err
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
}
} else {
Expand All @@ -104,20 +112,28 @@ func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]str
newFields[f] = v
}
for _, f := range e.e.AsList {
newFields[f], err = vars.Get(f)
if err != nil {
return nil, err
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
}
}
} else {
newFields = make(models.Fields, len(e.e.AsList))
for _, f := range e.e.AsList {
newFields[f], err = vars.Get(f)
if err != nil {
return nil, err
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
}
}
return newFields, nil
return newFields
}
19 changes: 19 additions & 0 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"expvar"
"fmt"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -174,6 +175,24 @@ func (v *Map) Do(f func(expvar.KeyValue)) {
v.doLocked(f)
}

// DoSorted calls f for each entry in the map in sorted order.
// The map is locked during the iteration,
// but existing entries may be concurrently updated.
func (v *Map) DoSorted(f func(expvar.KeyValue)) {
v.mu.RLock()
defer v.mu.RUnlock()
keys := make([]string, len(v.m))
i := 0
for key := range v.m {
keys[i] = key
i++
}
sort.Strings(keys)
for _, k := range keys {
f(expvar.KeyValue{k, v.m[k]})
}
}

// doLocked calls f for each entry in the map.
// v.mu must be held for reads.
func (v *Map) doLocked(f func(expvar.KeyValue)) {
Expand Down
Loading

0 comments on commit f495dee

Please sign in to comment.