This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit a966630

bugfix: mergeSeries should normalize its inputs. fix #1673

1 parent 586e30f

File tree

3 files changed (+13, -4 lines)

api/dataprocessor.go (+4, -1)

```diff
@@ -9,6 +9,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/grafana/metrictank/expr"
+
 	"github.com/grafana/metrictank/api/models"
 	"github.com/grafana/metrictank/consolidation"
 	"github.com/grafana/metrictank/mdata"
@@ -624,7 +626,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageSta
 // each first uniquely-identified series's backing datapoints slice is reused
 // any subsequent non-uniquely-identified series is merged into the former and has its
 // datapoints slice returned to the pool. input series must be canonical
-func mergeSeries(in []models.Series) []models.Series {
+func mergeSeries(in []models.Series, data map[expr.Req][]models.Series) []models.Series {
 	type segment struct {
 		target string
 		query  string
@@ -656,6 +658,7 @@ func mergeSeries(in []models.Series) []models.Series {
 	// we use the first series in the list as our result. We check over every
 	// point and if it is null, we then check the other series for a non null
 	// value to use instead.
+	series = expr.Normalize(data, series)
 	log.Debugf("DP mergeSeries: %s has multiple series.", series[0].Target)
 	for i := range series[0].Datapoints {
 		for j := 0; j < len(series); j++ {
```
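The issue fixed here (#1673) is that mergeSeries merged duplicate series point-by-point, which silently misbehaves when the duplicates arrive at different intervals; the new expr.Normalize call brings them onto a common interval first. Below is a minimal self-contained sketch of both steps, under stated assumptions: normalize and fillGaps are hypothetical stand-ins, expr.Normalize is understood to consolidate the group up to the least common multiple of the intervals (the simplified version here uses plain averaging and omits the slice pooling that the real code does via the data map).

```go
package main

import (
	"fmt"
	"math"
)

// Point mirrors the shape of a metrictank datapoint: a value and a timestamp.
type Point struct {
	Val float64
	Ts  uint32
}

// lcm returns the least common multiple of two intervals.
func lcm(a, b int) int {
	g, x := a, b
	for x != 0 {
		g, x = x, g%x
	}
	return a / g * b
}

// normalize is a simplified stand-in for expr.Normalize: it consolidates
// every series up to the LCM of all intervals (averaging each bucket of
// points) so that the series line up index-by-index.
func normalize(series [][]Point, intervals []int) [][]Point {
	target := intervals[0]
	for _, iv := range intervals[1:] {
		target = lcm(target, iv)
	}
	out := make([][]Point, len(series))
	for s, pts := range series {
		bucket := target / intervals[s]
		for i := 0; i < len(pts); i += bucket {
			end := i + bucket
			if end > len(pts) {
				end = len(pts)
			}
			sum, n := 0.0, 0
			for j := i; j < end; j++ {
				if !math.IsNaN(pts[j].Val) {
					sum += pts[j].Val
					n++
				}
			}
			val := math.NaN()
			if n > 0 {
				val = sum / float64(n)
			}
			out[s] = append(out[s], Point{Val: val, Ts: pts[end-1].Ts})
		}
	}
	return out
}

// fillGaps mirrors the merge loop in mergeSeries: the first series is the
// result, and every NaN point is filled with the first non-NaN value that
// any other duplicate has at the same index.
func fillGaps(series [][]Point) []Point {
	out := series[0]
	for i := range out {
		if !math.IsNaN(out[i].Val) {
			continue
		}
		for j := 1; j < len(series); j++ {
			if !math.IsNaN(series[j][i].Val) {
				out[i].Val = series[j][i].Val
				break
			}
		}
	}
	return out
}

func main() {
	nan := math.NaN()
	a := []Point{{nan, 10}, {nan, 20}, {nan, 30}, {1, 40}, {2, 50}, {3, 60}} // interval 10
	b := []Point{{9, 30}, {99, 60}}                                          // interval 30
	norm := normalize([][]Point{a, b}, []int{10, 30})
	// after normalization both series have interval 30; a's first bucket is
	// all-NaN, so the merge fills it from b: [{9 30} {2 60}]
	fmt.Println(fillGaps(norm))
}
```

This also explains the new data parameter: any series copies that normalization allocates are recorded in the map, so that plan.Clean() can later return their datapoint slices to the pool.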

api/dataprocessor_test.go (+2, -1)

```diff
@@ -11,6 +11,7 @@ import (
 	"github.com/grafana/metrictank/cluster"
 	"github.com/grafana/metrictank/conf"
 	"github.com/grafana/metrictank/consolidation"
+	"github.com/grafana/metrictank/expr"
 	"github.com/grafana/metrictank/mdata"
 	"github.com/grafana/metrictank/mdata/cache"
 	"github.com/grafana/metrictank/mdata/cache/accnt"
@@ -628,7 +629,7 @@ func TestMergeSeries(t *testing.T) {
 		Interval: 10,
 	})
 
-	merged := mergeSeries(out)
+	merged := mergeSeries(out, expr.NewDataMap())
 	if len(merged) != 5 {
 		t.Errorf("Expected data to be merged down to 5 series. got %d instead", len(merged))
 	}
```

api/graphite.go (+7, -2)

```diff
@@ -779,7 +779,13 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 	meta.RenderStats.GetTargetsDuration = b.Sub(a)
 	meta.StorageStats.Trace(span)
 
-	out = mergeSeries(out)
+	// this map will contain all series that we will feed into the processing chain or that are generated therein:
+	// * fetched series, grouped by their expr.Req, such that expr.FuncGet can find the data it needs and feed it into subsequent expr.GraphiteFunc functions
+	// * additional series generated while handling the request (e.g. function processing, normalization), keyed by an empty expr.Req (such that they can't be mistakenly picked up by FuncGet)
+	// all of these series will need to be returned to the pool once we're done with all processing and have generated our response body, which will happen in plan.Clean()
+	data := make(map[expr.Req][]models.Series)
+
+	out = mergeSeries(out, data)
 
 	if len(metaTagEnrichmentData) > 0 {
 		for i := range out {
@@ -792,7 +798,6 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 	// instead of waiting for all data to come in and then start processing everything, we could consider starting processing earlier, at the risk of doing needless work
 	// if we need to cancel the request due to a fetch error
 
-	data := make(map[expr.Req][]models.Series)
 	for _, serie := range out {
 		q := expr.NewReqFromSerie(serie)
 		data[q] = append(data[q], serie)
```
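For context, here is the shape of the grouping that executePlan performs with this map (now created earlier so mergeSeries can share it). The sketch below is a simplified stand-in: Req and Series approximate expr.Req and models.Series, just to show the lookup structure that expr.FuncGet relies on.

```go
package main

import "fmt"

// Req is a stand-in for expr.Req: the key FuncGet uses to find fetched data.
type Req struct {
	Query    string
	From, To uint32
}

// Series is a stand-in for models.Series, carrying just enough fields to
// reconstruct the Req that produced it.
type Series struct {
	Target    string
	QueryPatt string
	From, To  uint32
}

func main() {
	out := []Series{
		{Target: "a.foo", QueryPatt: "a.*", From: 0, To: 60},
		{Target: "a.bar", QueryPatt: "a.*", From: 0, To: 60},
	}
	// group fetched series by the request that produced them, mirroring the
	// loop in executePlan that feeds data into the processing chain
	data := make(map[Req][]Series)
	for _, serie := range out {
		q := Req{Query: serie.QueryPatt, From: serie.From, To: serie.To}
		data[q] = append(data[q], serie)
	}
	fmt.Println(len(data[Req{Query: "a.*", From: 0, To: 60}])) // 2
}
```

Because mergeSeries and the later processing now share one map, everything that needs pooling ends up in a single place for plan.Clean() to release.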
