Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit f1fb97d

Browse files
authored
Merge pull request #1425 from grafana/flush-boundary-in-both-cases
when adding new point to aggregate, always see if we can flush
2 parents e2c64e2 + 479aceb commit f1fb97d

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

mdata/aggmetric_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ func TestGetAggregated(t *testing.T) {
407407
{Val: 21 + 22, Ts: 25},
408408
{Val: 30, Ts: 30},
409409
{Val: 31 + 32, Ts: 35},
410+
{Val: 40, Ts: 40},
410411
}
411412
assertPointsEqual(t, got, expected)
412413
}
@@ -454,6 +455,7 @@ func TestGetAggregatedIngestFrom(t *testing.T) {
454455
expected := []schema.Point{
455456
{Val: 26 + 30, Ts: 30},
456457
{Val: 31 + 32, Ts: 35},
458+
{Val: 40, Ts: 40},
457459
}
458460
assertPointsEqual(t, got, expected)
459461
}

mdata/aggregator.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -100,19 +100,26 @@ func (agg *Aggregator) flush() {
100100
func (agg *Aggregator) Add(ts uint32, val float64) {
101101
boundary := AggBoundary(ts, agg.span)
102102

103-
if boundary == agg.currentBoundary {
104-
agg.agg.Add(val)
105-
if ts == boundary {
106-
agg.flush()
107-
}
103+
if boundary < agg.currentBoundary {
104+
// ignore the point it was for a previous bucket. we can't process it
105+
return
108106
} else if boundary > agg.currentBoundary {
109-
// store current totals as a new point in their series
107+
// point is for a more recent bucket
108+
// store current aggregates as a new point in their series and start the new bucket
110109
// if the cnt is still 0, the numbers are invalid, not to be flushed and we can simply reuse the aggregation
111110
if agg.agg.Cnt != 0 {
112111
agg.flush()
113112
}
114113
agg.currentBoundary = boundary
115-
agg.agg.Add(val)
114+
}
115+
agg.agg.Add(val)
116+
117+
// if the ts of the point is a boundary, it means no more point can possibly come in for the same aggregation.
118+
// e.g. if aggspan is 10s and we're adding a point with timestamp 12:34:10, then any subsequent point will go
119+
// in the bucket for 12:34:20 or later.
120+
// so it is time to flush the result
121+
if ts == boundary {
122+
agg.flush()
116123
}
117124
}
118125

0 commit comments

Comments
 (0)