Skip to content

Commit

Permalink
Merge pull request #2474 from influxdata/flattenbarrier
Browse files Browse the repository at this point in the history
fix: add barrier handling to flatten
  • Loading branch information
docmerlin authored Feb 5, 2021
2 parents 23e1d6a + b442188 commit 0c6a14b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 8 deletions.
3 changes: 1 addition & 2 deletions edge/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func (ec *consumer) Consume() error {
return err
}
case BufferedBatchMessage:
err := receiveBufferedBatch(ec.r, m)
if err != nil {
if err := receiveBufferedBatch(ec.r, m); err != nil {
return err
}
case PointMessage:
Expand Down
37 changes: 34 additions & 3 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type flattenBuffer struct {
func (b *flattenBuffer) BeginBatch(begin edge.BeginBatchMessage) error {
b.n.timer.Start()
defer b.n.timer.Stop()

b.name = begin.Name()
b.time = time.Time{}
if s := begin.SizeHint(); s > cap(b.points) {
Expand Down Expand Up @@ -122,7 +121,6 @@ func (b *flattenBuffer) EndBatch(end edge.EndBatchMessage) error {
}
b.points = b.points[0:0]
}

b.n.timer.Pause()
err := edge.Forward(b.n.outs, end)
b.n.timer.Resume()
Expand Down Expand Up @@ -153,6 +151,10 @@ func (b *flattenBuffer) Point(p edge.PointMessage) error {
b.groupInfo.Tags,
t,
)
// update the time
if t.After(b.time) {
b.time = t
}
b.n.timer.Pause()
err = edge.Forward(b.n.outs, flatP)
b.n.timer.Resume()
Expand All @@ -178,11 +180,40 @@ func (b *flattenBuffer) addPoint(p edge.FieldsTagsTimeGetter) (next time.Time, f
}

func (b *flattenBuffer) Barrier(barrier edge.BarrierMessage) error {
return edge.Forward(b.n.outs, barrier)
b.n.timer.Start()
defer b.n.timer.Stop()

if barrier.Time().After(b.time) && len(b.points) > 0 {
fields, err := b.n.flatten(b.points)
if err != nil {
return err
}
msg := edge.NewPointMessage(
b.name, "", "",
b.groupInfo.Dimensions,
fields,
b.groupInfo.Tags,
b.time,
)
b.n.timer.Pause()
err = edge.Forward(b.n.outs, msg)
b.n.timer.Resume()
if err != nil {
return err
}
b.points = b.points[0:0]
}
b.n.timer.Pause()
err := edge.Forward(b.n.outs, barrier)
b.n.timer.Resume()
return err
}

func (b *flattenBuffer) DeleteGroup(d edge.DeleteGroupMessage) error {
b.points = b.points[0:0]
return edge.Forward(b.n.outs, d)
}

func (b *flattenBuffer) Done() {}

func (n *FlattenNode) flatten(points []edge.FieldsTagsTimeGetter) (models.Fields, error) {
Expand Down
4 changes: 3 additions & 1 deletion group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (n *GroupByNode) BeginBatch(begin edge.BeginBatchMessage) error {
n.timer.Start()
defer n.timer.Stop()

n.emit(begin.Time())
if err := n.emit(begin.Time()); err != nil {
return err
}

n.begin = begin
n.dimensions = begin.Dimensions()
Expand Down
5 changes: 3 additions & 2 deletions integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/influxdb"
Expand Down Expand Up @@ -112,8 +113,8 @@ func compareResultsIgnoreSeriesOrder(exp, got models.Result) (bool, string) {
if !reflect.DeepEqual(exp.Series[i].Columns, got.Series[j].Columns) {
return false, fmt.Sprintf("unexpected series columns: i: %d \nexp %v \ngot %v", i, exp.Series[i].Columns, got.Series[j].Columns)
}
if !reflect.DeepEqual(exp.Series[i].Values, got.Series[j].Values) {
return false, fmt.Sprintf("unexpected series values: i: %d \nexp %v \ngot %v", i, exp.Series[i].Values, got.Series[j].Values)
if !cmp.Equal(exp.Series[i].Values, got.Series[j].Values) {
return false, fmt.Sprintf("unexpected series values: i: %d \n %s", i, cmp.Diff(exp.Series[i].Values, got.Series[j].Values))
}
}
return true, ""
Expand Down

0 comments on commit 0c6a14b

Please sign in to comment.