diff --git a/edge/consumer.go b/edge/consumer.go index 11ba44f54..64f12b5ec 100644 --- a/edge/consumer.go +++ b/edge/consumer.go @@ -54,8 +54,7 @@ func (ec *consumer) Consume() error { return err } case BufferedBatchMessage: - err := receiveBufferedBatch(ec.r, m) - if err != nil { + if err := receiveBufferedBatch(ec.r, m); err != nil { return err } case PointMessage: diff --git a/flatten.go b/flatten.go index 57ee64bb1..0afa264fd 100644 --- a/flatten.go +++ b/flatten.go @@ -61,7 +61,6 @@ type flattenBuffer struct { func (b *flattenBuffer) BeginBatch(begin edge.BeginBatchMessage) error { b.n.timer.Start() defer b.n.timer.Stop() - b.name = begin.Name() b.time = time.Time{} if s := begin.SizeHint(); s > cap(b.points) { @@ -122,7 +121,6 @@ func (b *flattenBuffer) EndBatch(end edge.EndBatchMessage) error { } b.points = b.points[0:0] } - b.n.timer.Pause() err := edge.Forward(b.n.outs, end) b.n.timer.Resume() @@ -153,6 +151,10 @@ func (b *flattenBuffer) Point(p edge.PointMessage) error { b.groupInfo.Tags, t, ) + // update the time + if t.After(b.time) { + b.time = t + } b.n.timer.Pause() err = edge.Forward(b.n.outs, flatP) b.n.timer.Resume() @@ -178,11 +180,40 @@ func (b *flattenBuffer) addPoint(p edge.FieldsTagsTimeGetter) (next time.Time, f } func (b *flattenBuffer) Barrier(barrier edge.BarrierMessage) error { - return edge.Forward(b.n.outs, barrier) + b.n.timer.Start() + defer b.n.timer.Stop() + + if barrier.Time().After(b.time) && len(b.points) > 0 { + fields, err := b.n.flatten(b.points) + if err != nil { + return err + } + msg := edge.NewPointMessage( + b.name, "", "", + b.groupInfo.Dimensions, + fields, + b.groupInfo.Tags, + b.time, + ) + b.n.timer.Pause() + err = edge.Forward(b.n.outs, msg) + b.n.timer.Resume() + if err != nil { + return err + } + b.points = b.points[0:0] + } + b.n.timer.Pause() + err := edge.Forward(b.n.outs, barrier) + b.n.timer.Resume() + return err } + func (b *flattenBuffer) DeleteGroup(d edge.DeleteGroupMessage) error { + b.points = b.points[0:0] return edge.Forward(b.n.outs, d) } + func (b *flattenBuffer) Done() {} func (n *FlattenNode) flatten(points []edge.FieldsTagsTimeGetter) (models.Fields, error) { diff --git a/group_by.go b/group_by.go index 6dfc3c530..b122e47a2 100644 --- a/group_by.go +++ b/group_by.go @@ -77,7 +77,9 @@ func (n *GroupByNode) BeginBatch(begin edge.BeginBatchMessage) error { n.timer.Start() defer n.timer.Stop() - n.emit(begin.Time()) + if err := n.emit(begin.Time()); err != nil { + return err + } n.begin = begin n.dimensions = begin.Dimensions() diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go index d7b73a0fd..f348e38c3 100644 --- a/integrations/helpers_test.go +++ b/integrations/helpers_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/influxdata/kapacitor" "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/influxdb" @@ -112,8 +113,8 @@ func compareResultsIgnoreSeriesOrder(exp, got models.Result) (bool, string) { if !reflect.DeepEqual(exp.Series[i].Columns, got.Series[j].Columns) { return false, fmt.Sprintf("unexpected series columns: i: %d \nexp %v \ngot %v", i, exp.Series[i].Columns, got.Series[j].Columns) } - if !reflect.DeepEqual(exp.Series[i].Values, got.Series[j].Values) { - return false, fmt.Sprintf("unexpected series values: i: %d \nexp %v \ngot %v", i, exp.Series[i].Values, got.Series[j].Values) + if !cmp.Equal(exp.Series[i].Values, got.Series[j].Values) { + return false, fmt.Sprintf("unexpected series values: i: %d \n %s", i, cmp.Diff(exp.Series[i].Values, got.Series[j].Values)) } } return true, ""