Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit e9e09da

Browse files
committed
Better implementation of Aggregator ingestFrom. Fixes failing test.
1 parent 4819abf commit e9e09da

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

mdata/aggregator.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Aggregator struct {
2727
sumMetric *AggMetric
2828
cntMetric *AggMetric
2929
lstMetric *AggMetric
30+
ingestFrom uint32
3031
}
3132

3233
func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey, ret conf.Retention, agg conf.Aggregation, dropFirstChunk bool, ingestFrom int64) *Aggregator {
@@ -38,36 +39,39 @@ func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey,
3839
span: span,
3940
agg: NewAggregation(),
4041
}
42+
if ingestFrom > 0 {
43+
aggregator.ingestFrom = AggBoundary(uint32(ingestFrom), ret.ChunkSpan) - span
44+
}
4145
for _, agg := range agg.AggregationMethod {
4246
switch agg {
4347
case conf.Avg:
4448
if aggregator.sumMetric == nil {
4549
key.Archive = schema.NewArchive(schema.Sum, span)
46-
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, ingestFrom)
50+
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, 0)
4751
}
4852
if aggregator.cntMetric == nil {
4953
key.Archive = schema.NewArchive(schema.Cnt, span)
50-
aggregator.cntMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, ingestFrom)
54+
aggregator.cntMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, 0)
5155
}
5256
case conf.Sum:
5357
if aggregator.sumMetric == nil {
5458
key.Archive = schema.NewArchive(schema.Sum, span)
55-
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, ingestFrom)
59+
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, 0)
5660
}
5761
case conf.Lst:
5862
if aggregator.lstMetric == nil {
5963
key.Archive = schema.NewArchive(schema.Lst, span)
60-
aggregator.lstMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, ingestFrom)
64+
aggregator.lstMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, 0)
6165
}
6266
case conf.Max:
6367
if aggregator.maxMetric == nil {
6468
key.Archive = schema.NewArchive(schema.Max, span)
65-
aggregator.maxMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, ingestFrom)
69+
aggregator.maxMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, 0)
6670
}
6771
case conf.Min:
6872
if aggregator.minMetric == nil {
6973
key.Archive = schema.NewArchive(schema.Min, span)
70-
aggregator.minMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, ingestFrom)
74+
aggregator.minMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk, 0)
7175
}
7276
}
7377
}
@@ -98,6 +102,9 @@ func (agg *Aggregator) flush() {
98102
// Add adds the point to the in-progress aggregation, and flushes it if we reached the boundary
99103
// points going back in time are accepted, unless they go into a previous bucket, in which case they are ignored
100104
func (agg *Aggregator) Add(ts uint32, val float64) {
105+
if ts <= agg.ingestFrom {
106+
return
107+
}
101108
boundary := AggBoundary(ts, agg.span)
102109

103110
if boundary == agg.currentBoundary {

0 commit comments

Comments
 (0)