Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 6254bdc

Browse files
committed
aggregators now need to be able to normalize at runtime
1 parent 81cdbbc commit 6254bdc

File tree

3 files changed

+29
-1
lines changed

3 files changed

+29
-1
lines changed

expr/func_aggregate.go

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func (s *FuncAggregate) Exec(cache map[Req][]models.Series) ([]models.Series, er
4848
return series, nil
4949
}
5050
out := pointSlicePool.Get().([]schema.Point)
51+
series = normalize(series)
5152
s.agg.function(series, &out)
5253

5354
// The tags for the aggregated series is only the tags that are

expr/func_groupbytags.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
130130
newSeries.SetTags()
131131

132132
newSeries.Datapoints = pointSlicePool.Get().([]schema.Point)
133-
133+
group.s = normalize(group.s)
134134
aggFunc(group.s, &newSeries.Datapoints)
135135
cache[Req{}] = append(cache[Req{}], newSeries)
136136

expr/normalize.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package expr
2+
3+
import (
4+
"github.com/grafana/metrictank/api/models"
5+
"github.com/grafana/metrictank/consolidation"
6+
"github.com/grafana/metrictank/schema"
7+
"github.com/grafana/metrictank/util"
8+
)
9+
10+
// normalize normalizes series to the same common LCM interval - if they don't already have the same interval
11+
func normalize(in []models.Series) []models.Series {
12+
var intervals []uint32
13+
for _, s := range in {
14+
intervals = append(intervals, s.Interval)
15+
}
16+
lcm := util.Lcm(intervals)
17+
for i, s := range in {
18+
if s.Interval != lcm {
19+
// we need to copy the datapoints first because the consolidater will reuse the input slice
20+
datapoints := pointSlicePool.Get().([]schema.Point)
21+
datapoints = append(datapoints, s.Datapoints...)
22+
consolidation.Consolidate(datapoints, lcm/s.Interval, s.Consolidator) // TODO: not sure if we should use s.Consolidator here
23+
in[i].Datapoints = datapoints
24+
}
25+
}
26+
return in
27+
}

0 commit comments

Comments
 (0)