Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit a3706f6

Browse files
authoredFeb 13, 2020
Merge pull request #1674 from grafana/mergeSeries-runtime-normalize
fix: mergeSeries() should normalize its inputs + normalize() bugfix + cleanups
2 parents bbe0c21 + 644b756 commit a3706f6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+232
-181
lines changed
 

‎api/dataprocessor.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"sync"
1010
"time"
1111

12+
"github.com/grafana/metrictank/expr"
13+
1214
"github.com/grafana/metrictank/api/models"
1315
"github.com/grafana/metrictank/consolidation"
1416
"github.com/grafana/metrictank/mdata"
@@ -619,11 +621,12 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageSta
619621
return iters, nil
620622
}
621623

622-
// check for duplicate series names for the same query target. If found merge the results.
624+
// mergeSeries merges series together if applicable. It does this by categorizing
625+
// series into groups based on their target, query, consolidator etc. If they collide, they get merged.
623626
// each first uniquely-identified series's backing datapoints slice is reused
624627
// any subsequent non-uniquely-identified series is merged into the former and has its
625628
// datapoints slice returned to the pool. input series must be canonical
626-
func mergeSeries(in []models.Series) []models.Series {
629+
func mergeSeries(in []models.Series, dataMap expr.DataMap) []models.Series {
627630
type segment struct {
628631
target string
629632
query string
@@ -655,6 +658,7 @@ func mergeSeries(in []models.Series) []models.Series {
655658
// we use the first series in the list as our result. We check over every
656659
// point and if it is null, we then check the other series for a non null
657660
// value to use instead.
661+
series = expr.Normalize(dataMap, series)
658662
log.Debugf("DP mergeSeries: %s has multiple series.", series[0].Target)
659663
for i := range series[0].Datapoints {
660664
for j := 0; j < len(series); j++ {

‎api/dataprocessor_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/grafana/metrictank/cluster"
1212
"github.com/grafana/metrictank/conf"
1313
"github.com/grafana/metrictank/consolidation"
14+
"github.com/grafana/metrictank/expr"
1415
"github.com/grafana/metrictank/mdata"
1516
"github.com/grafana/metrictank/mdata/cache"
1617
"github.com/grafana/metrictank/mdata/cache/accnt"
@@ -628,7 +629,7 @@ func TestMergeSeries(t *testing.T) {
628629
Interval: 10,
629630
})
630631

631-
merged := mergeSeries(out)
632+
merged := mergeSeries(out, expr.NewDataMap())
632633
if len(merged) != 5 {
633634
t.Errorf("Expected data to be merged down to 5 series. got %d instead", len(merged))
634635
}

0 commit comments

Comments
 (0)
This repository has been archived.