
Commit 21d1dcd

Merge pull request #951 from grafana/refactor-alignrequests

remove excessive request alignment, add MDP optimization and pre-normalisation

2 parents: 2ce80c9 + 2f9369e

52 files changed: +2118 -863 lines. (Large commit; only a subset of the changed files is reproduced below.)

api/config.go (+4)

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/grafana/globalconf"
+	"github.com/grafana/metrictank/expr"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -28,6 +29,7 @@ var (
 	getTargetsConcurrency int
 	tagdbDefaultLimit     uint
 	speculationThreshold  float64
+	optimizations         expr.Optimizations
 
 	graphiteProxy *httputil.ReverseProxy
 	timeZone      *time.Location
@@ -49,6 +51,8 @@ func ConfigSetup() {
 	apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.")
 	apiCfg.UintVar(&tagdbDefaultLimit, "tagdb-default-limit", 100, "default limit for tagdb query results, can be overridden with query parameter \"limit\"")
 	apiCfg.Float64Var(&speculationThreshold, "speculation-threshold", 1, "ratio of peer responses after which speculation is used. Set to 1 to disable.")
+	apiCfg.BoolVar(&optimizations.PreNormalization, "pre-normalization", true, "enable pre-normalization optimization")
+	apiCfg.BoolVar(&optimizations.MDP, "mdp-optimization", false, "enable MaxDataPoints optimization (experimental)")
 	globalconf.Register("http", apiCfg, flag.ExitOnError)
 }

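The two new flags bind to an expr.Optimizations struct, which renderMetrics and showPlan later combine with per-request preferences via ApplyUserPrefs (see the api/graphite.go diff below). A minimal sketch of what that could look like: the field names and the method name come from this commit, but the parsing logic and the accepted tokens ("none", "pn", "mdp") are assumptions based on the call site.

    package expr

    import (
    	"fmt"
    	"strings"
    )

    // Optimizations holds the global optimization defaults set via the config flags above.
    type Optimizations struct {
    	PreNormalization bool
    	MDP              bool
    }

    // ApplyUserPrefs returns the effective optimizations for one request:
    // an empty preference string keeps the configured defaults, "none"
    // disables everything, and a comma-separated list enables only the
    // named optimizations. (Sketch; the token names are assumptions.)
    func (o Optimizations) ApplyUserPrefs(s string) (Optimizations, error) {
    	if s == "" {
    		return o, nil
    	}
    	if s == "none" {
    		return Optimizations{}, nil
    	}
    	out := Optimizations{}
    	for _, token := range strings.Split(s, ",") {
    		switch token {
    		case "pn":
    			out.PreNormalization = true
    		case "mdp":
    			out.MDP = true
    		default:
    			return out, fmt.Errorf("unrecognized optimization %q", token)
    		}
    	}
    	return out, nil
    }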
api/dataprocessor.go (+15 -9)

@@ -329,7 +329,7 @@ LOOP:
 
 }
 
-// getTarget returns the series for the request in canonical form.
+// getTarget returns the series for the request in canonical form with respect to their OutInterval
 // as ConsolidateContext just processes what it's been given (not "stable" or bucket-aligned to the output interval)
 // we simply make sure to pass it the right input such that the output is canonical.
 func (s *Server) getTarget(ctx context.Context, ss *models.StorageStats, req models.Req) (out models.Series, err error) {
@@ -352,6 +352,8 @@ func (s *Server) getTarget(ctx context.Context, ss *models.StorageStats, req mod
 		QueryTo:      req.To,
 		QueryCons:    req.ConsReq,
 		Consolidator: req.Consolidator,
+		QueryMDP:     req.MaxPoints,
+		QueryPNGroup: req.PNGroup,
 		Meta: []models.SeriesMetaProperties{
 			{
 				// note that for simplicity, we pretend that a read of rollup avg data is a read of 1 "avg series"
@@ -405,7 +407,7 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) {
 	log.Debugf("DP load from %-6s %20s %d - %d (%s - %s) span:%ds", typ, key, from, to, util.TS(from), util.TS(to), to-from-1)
 }
 
-// getSeriesFixed fetches the series and returns it in quantized, pre-canonical form.
+// getSeriesFixed fetches the series and returns it in quantized, pre-canonical form with respect to their OutInterval
 // TODO: we can probably forego Fix if archive > 0, because only raw chunks are not quantized yet.
 // the requested consolidator is the one that will be used for selecting the archive to read from
 func (s *Server) getSeriesFixed(ctx context.Context, ss *models.StorageStats, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
@@ -617,17 +619,19 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageSta
 	return iters, nil
 }
 
-// check for duplicate series names for the same query. If found merge the results.
+// check for duplicate series names for the same query target. If found merge the results.
 // each first uniquely-identified series's backing datapoints slice is reused
 // any subsequent non-uniquely-identified series is merged into the former and has its
 // datapoints slice returned to the pool. input series must be canonical
 func mergeSeries(in []models.Series) []models.Series {
 	type segment struct {
-		target string
-		query  string
-		from   uint32
-		to     uint32
-		con    consolidation.Consolidator
+		target  string
+		query   string
+		from    uint32
+		to      uint32
+		con     consolidation.Consolidator
+		mdp     uint32
+		pngroup models.PNGroup
 	}
 	seriesByTarget := make(map[segment][]models.Series)
 	for _, series := range in {
@@ -637,6 +641,8 @@ func mergeSeries(in []models.Series) []models.Series {
 			series.QueryFrom,
 			series.QueryTo,
 			series.Consolidator,
+			series.QueryMDP,
+			series.QueryPNGroup,
 		}
 		seriesByTarget[s] = append(seriesByTarget[s], series)
 	}
@@ -767,7 +773,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
 	// if the series has some excess at the end, it may aggregate into a bucket with a timestamp out of the desired range.
 	// for example: imagine we take the case from above, and the user specified a `to` of 115.
 	// a native 30s series would end with point 90. We should not include any points that would go into an aggregation bucket with timestamp higher than 90.
-	// (such as 100 or 110 which would technically be allowed by the `to` specification)
+	// (such as 100 or 110 which would technically be allowed by the `to` specification but land in the bucket with ts=120 which is out of bounds)
 	// so the proper to value is the highest value that does not result in points going into an out-of-bounds bucket.
 
 	// example: for 10s data (note that the 2 last colums should always match!)
api/dataprocessor_test.go
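Widening the segment key means series that share a target but were fetched with different MaxDataPoints or PNGroup settings are no longer merged together. The mechanism is plain Go: a comparable struct used directly as a map key. A stripped-down, self-contained sketch of the grouping step (types simplified; the real code keys on models.Series fields and also includes the query and the consolidator):

    package main

    import "fmt"

    type PNGroup uint64

    // segment mirrors the merge key from mergeSeries: two series only merge
    // when every field matches, now including mdp and pngroup.
    type segment struct {
    	target  string
    	from    uint32
    	to      uint32
    	mdp     uint32
    	pngroup PNGroup
    }

    type series struct {
    	seg    segment
    	points []float64
    }

    func main() {
    	in := []series{
    		{seg: segment{"cpu.total", 0, 60, 800, 1}, points: []float64{1, 2}},
    		{seg: segment{"cpu.total", 0, 60, 800, 1}, points: []float64{3, 4}}, // same key: merge candidate
    		{seg: segment{"cpu.total", 0, 60, 0, 0}, points: []float64{5}},      // different mdp/pngroup: kept separate
    	}
    	bySegment := make(map[segment][]series)
    	for _, s := range in {
    		bySegment[s.seg] = append(bySegment[s.seg], s)
    	}
    	fmt.Println("distinct segments:", len(bySegment)) // distinct segments: 2
    }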

+3-12
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ func TestGetSeriesFixed(t *testing.T) {
390390
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
391391
metric.Add(30+offset, 40) // this point will always be quantized to 40
392392
metric.Add(40+offset, 50) // this point will always be quantized to 50
393-
req := models.NewReq(id, "", "", from, to, 1000, 10, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
393+
req := models.NewReq(id, "", "", from, to, 1000, 10, 0, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
394394
req.Archive = 0
395395
req.ArchInterval = 10
396396
points, err := srv.getSeriesFixed(test.NewContext(), &models.StorageStats{}, req, consolidation.None)
@@ -582,7 +582,7 @@ func TestGetSeriesFixedVariableOutInterval(t *testing.T) {
582582
metric.Add(dataPoint.Ts, dataPoint.Val)
583583
}
584584

585-
req := models.NewReq(id, "", "", testCase.from, testCase.to, 1000, testCase.archInterval, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
585+
req := models.NewReq(id, "", "", testCase.from, testCase.to, 1000, testCase.archInterval, 0, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
586586
req.Archive = 0
587587
req.ArchInterval = testCase.archInterval
588588
req.OutInterval = testCase.outInterval
@@ -598,19 +598,10 @@ func TestGetSeriesFixedVariableOutInterval(t *testing.T) {
598598
}
599599

600600
func reqRaw(key schema.MKey, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaId, aggId uint16) models.Req {
601-
req := models.NewReq(key, "", "", from, to, maxPoints, rawInterval, consolidator, 0, cluster.Manager.ThisNode(), schemaId, aggId)
601+
req := models.NewReq(key, "", "", from, to, maxPoints, rawInterval, 0, consolidator, 0, cluster.Manager.ThisNode(), schemaId, aggId)
602602
req.Archive = 0
603603
return req
604604
}
605-
func reqOut(key schema.MKey, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaId, aggId uint16, archive uint8, archInterval, ttl, outInterval, aggNum uint32) models.Req {
606-
req := models.NewReq(key, "", "", from, to, maxPoints, rawInterval, consolidator, 0, cluster.Manager.ThisNode(), schemaId, aggId)
607-
req.Archive = archive
608-
req.ArchInterval = archInterval
609-
req.TTL = ttl
610-
req.OutInterval = outInterval
611-
req.AggNum = aggNum
612-
return req
613-
}
614605

615606
func TestMergeSeries(t *testing.T) {
616607
out := make([]models.Series, 0)

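Aside from dropping the now-unused reqOut helper, the change here is mechanical: every models.NewReq call gains one extra argument, the 0 inserted between the interval and the consolidator. Judging by the PNGroup plumbing in the other files of this commit, this is the new PNGroup parameter, with 0 presumably meaning the request belongs to no pre-normalization group.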
api/graphite.go (+31 -16)

@@ -225,7 +225,13 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
 		// as graphite needs high-res data to perform its processing.
 		mdp = 0
 	}
-	plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, nil)
+
+	opts, err := optimizations.ApplyUserPrefs(request.Optimizations)
+	if err != nil {
+		response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
+		return
+	}
+	plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, opts)
 	if err != nil {
 		if fun, ok := err.(expr.ErrUnknownFunction); ok {
 			if request.NoProxy {
@@ -661,7 +667,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 
 	minFrom := uint32(math.MaxUint32)
 	var maxTo uint32
-	var reqs []models.Req
+	reqs := NewReqMap()
 	metaTagEnrichmentData := make(map[string]tagquery.Tags)
 
 	// note that different patterns to query can have different from / to, so they require different index lookups
@@ -686,7 +692,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 			if err != nil {
 				return nil, meta, err
 			}
-			series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From), maxSeriesPerReq-len(reqs), false)
+			series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From), maxSeriesPerReq-int(reqs.cnt), false)
 		} else {
 			series, err = s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From))
 		}
@@ -718,9 +724,9 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 				cons = closestAggMethod(consReq, mdata.Aggregations.Get(archive.AggId).AggregationMethod)
 			}
 
-			newReq := models.NewReq(
-				archive.Id, archive.NameWithTags(), r.Query, r.From, r.To, plan.MaxDataPoints, uint32(archive.Interval), cons, consReq, s.Node, archive.SchemaId, archive.AggId)
-			reqs = append(reqs, newReq)
+			newReq := r.ToModel()
+			newReq.Init(archive, cons, s.Node)
+			reqs.Add(newReq)
 		}
 
 		if tagquery.MetaTagSupport && len(metric.Defs) > 0 && len(metric.MetaTags) > 0 {
@@ -739,31 +745,35 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 	default:
 	}
 
-	reqRenderSeriesCount.Value(len(reqs))
-	if len(reqs) == 0 {
+	reqRenderSeriesCount.ValueUint32(reqs.cnt)
+	if reqs.cnt == 0 {
 		return nil, meta, nil
 	}
 
-	meta.RenderStats.SeriesFetch = uint32(len(reqs))
+	meta.RenderStats.SeriesFetch = reqs.cnt
 
 	// note: if 1 series has a movingAvg that requires a long time range extension, it may push other reqs into another archive. can be optimized later
 	var err error
-	reqs, meta.RenderStats.PointsFetch, meta.RenderStats.PointsReturn, err = alignRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs)
+	var rp *ReqsPlan
+	rp, err = planRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs, plan.MaxDataPoints, maxPointsPerReqSoft, maxPointsPerReqHard)
 	if err != nil {
-		log.Errorf("HTTP Render alignReq error: %s", err.Error())
 		return nil, meta, err
 	}
+	meta.RenderStats.PointsFetch = rp.PointsFetch()
+	meta.RenderStats.PointsReturn = rp.PointsReturn(plan.MaxDataPoints)
+	reqsList := rp.List()
+
 	span := opentracing.SpanFromContext(ctx)
-	span.SetTag("num_reqs", len(reqs))
+	span.SetTag("num_reqs", len(reqsList))
 	span.SetTag("points_fetch", meta.RenderStats.PointsFetch)
 	span.SetTag("points_return", meta.RenderStats.PointsReturn)
 
-	for _, req := range reqs {
+	for _, req := range reqsList {
 		log.Debugf("HTTP Render %s - arch:%d archI:%d outI:%d aggN: %d from %s", req, req.Archive, req.ArchInterval, req.OutInterval, req.AggNum, req.Node.GetName())
 	}
 
 	a := time.Now()
-	out, err := s.getTargets(ctx, &meta.StorageStats, reqs)
+	out, err := s.getTargets(ctx, &meta.StorageStats, reqsList)
 	if err != nil {
 		log.Errorf("HTTP Render %s", err.Error())
 		return nil, meta, err
@@ -787,7 +797,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
 
 	data := make(map[expr.Req][]models.Series)
 	for _, serie := range out {
-		q := expr.NewReq(serie.QueryPatt, serie.QueryFrom, serie.QueryTo, serie.QueryCons)
+		q := expr.NewReqFromSerie(serie)
 		data[q] = append(data[q], serie)
 	}
 
@@ -1410,7 +1420,12 @@ func (s *Server) showPlan(ctx *middleware.Context, request models.GraphiteRender
 	stable := request.Process == "stable"
 	mdp := request.MaxDataPoints
 
-	plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, nil)
+	opts, err := optimizations.ApplyUserPrefs(request.Optimizations)
+	if err != nil {
+		response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
+		return
+	}
+	plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, opts)
 	if err != nil {
 		response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
 		return

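planRequests replaces alignRequests: instead of forcing every request in a render call onto one common interval, requests are only normalized within their PNGroup (pre-normalization), MDP-bound requests may be satisfied from coarser archives (MDP optimization), and the maxPointsPerReqSoft/maxPointsPerReqHard limits are enforced during planning. A rough, illustrative sketch of the pre-normalization idea, assuming LCM-based interval selection per group; the helper below is not the actual ReqsPlan API:

    package main

    import "fmt"

    type PNGroup uint64

    type Req struct {
    	PNGroup      PNGroup
    	ArchInterval uint32 // interval of the archive we read from
    	OutInterval  uint32 // interval after normalization
    	AggNum       uint32 // points aggregated into one output point
    }

    func gcd(a, b uint32) uint32 {
    	for b != 0 {
    		a, b = b, a%b
    	}
    	return a
    }

    func lcm(a, b uint32) uint32 { return a / gcd(a, b) * b }

    // preNormalize aligns all requests sharing a PNGroup to the LCM of their
    // archive intervals, so they can later be combined (e.g. by sumSeries)
    // without further runtime normalization. Requests with PNGroup 0 are
    // left at their native resolution. Illustrative only.
    func preNormalize(reqs []Req) {
    	groupInterval := make(map[PNGroup]uint32)
    	for _, r := range reqs {
    		if r.PNGroup == 0 {
    			continue
    		}
    		if cur, ok := groupInterval[r.PNGroup]; ok {
    			groupInterval[r.PNGroup] = lcm(cur, r.ArchInterval)
    		} else {
    			groupInterval[r.PNGroup] = r.ArchInterval
    		}
    	}
    	for i := range reqs {
    		r := &reqs[i]
    		r.OutInterval = r.ArchInterval
    		r.AggNum = 1
    		if iv, ok := groupInterval[r.PNGroup]; ok {
    			r.OutInterval = iv
    			r.AggNum = iv / r.ArchInterval
    		}
    	}
    }

    func main() {
    	reqs := []Req{
    		{PNGroup: 1, ArchInterval: 10},
    		{PNGroup: 1, ArchInterval: 60}, // group 1 normalizes to lcm(10,60)=60
    		{PNGroup: 0, ArchInterval: 30}, // ungrouped: stays at native resolution
    	}
    	preNormalize(reqs)
    	for _, r := range reqs {
    		fmt.Printf("group=%d arch=%d out=%d aggNum=%d\n", r.PNGroup, r.ArchInterval, r.OutInterval, r.AggNum)
    	}
    }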