This repository was archived by the owner on Aug 23, 2023. It is now read-only.

remove excessive request alignment, add MDP optimization and pre-normalisation #951

Merged
merged 46 commits on Jan 30, 2020
Changes from all commits
46 commits
6e5f050
clarifications/simplification
Dieterbe Dec 2, 2019
29d86ce
WIP: don't over-align + implement MDP-optimization, pre-normalization
Dieterbe Dec 2, 2019
752cd66
simplify NewPlan()
Dieterbe Dec 2, 2019
41bca10
docs
Dieterbe Dec 2, 2019
d1fb28a
aggregators now need to be able to normalize at runtime
Dieterbe Dec 2, 2019
6b5c991
helper to generate combinations of uint32's
Dieterbe Jan 16, 2020
89ad210
rework request planning
Dieterbe Jan 15, 2020
f0f9b17
Loop new request identifiers/differentiators through request path
Dieterbe Jan 16, 2020
5dd295e
make fetch optimizations optional
Dieterbe Jan 17, 2020
0f28be9
mt-explain: support new req information
Dieterbe Jan 17, 2020
531a5f5
clean up expr docs.
Dieterbe Jan 18, 2020
1c17a8a
random clarification related to COW
Dieterbe Jan 19, 2020
8f11677
clarify
Dieterbe Jan 19, 2020
9b2db02
clarify limitations of pre-canonical, canonicalize when runtime norma…
Dieterbe Jan 19, 2020
35a725c
WIP fix tests
Dieterbe Jan 19, 2020
cb220e8
deal with non-pre-canonical normalizing like graphite does
Dieterbe Jan 20, 2020
4a7de6a
add msgp generate directives to request for PNGroup
robert-milan Jan 20, 2020
6e7a145
make PN- and MDP-optimizations separately configurable
Dieterbe Jan 20, 2020
80001d4
make it simpler to use models.Series in tests
Dieterbe Jan 20, 2020
9e7156c
fix dataprocessor tests
Dieterbe Jan 20, 2020
bc73917
Newreq for dataprocessor and
Dieterbe Jan 20, 2020
29a4054
testPlan tests
Dieterbe Jan 20, 2020
c863a42
update consolidaton tests for NewPlan taking optimizations
Dieterbe Jan 20, 2020
8ba064f
getModel(): set interval!
Dieterbe Jan 20, 2020
c041be3
fix TestConsolidateBy
Dieterbe Jan 20, 2020
95d51c8
fix tests: MDP is now used to mark MDP optimizations, don't set it
Dieterbe Jan 21, 2020
83da44d
fix planRequests tests to match new behavior
Dieterbe Jan 21, 2020
1929cb6
better query engine tests
Dieterbe Jan 21, 2020
44fdd93
fix query engine tests
Dieterbe Jan 21, 2020
dd85a3d
Req.Equals should also check TTL. we only used this for unit tests
Dieterbe Jan 22, 2020
ae2ff09
make maxPointsPerReq{Soft,Hard} explicit args, rather than globals
Dieterbe Jan 22, 2020
ca80978
do maxPointsPerReq testing same as other tests
Dieterbe Jan 22, 2020
163e222
tests for normalization
Dieterbe Jan 22, 2020
32d7ce2
expr.NewReq should take PNGroup and MDP fields also
Dieterbe Jan 22, 2020
4c8a573
unit test for planner optimizations
Dieterbe Jan 22, 2020
3054f19
note
Dieterbe Jan 22, 2020
427bb5f
refer to http errors by their name as per contribution docs
Dieterbe Jan 22, 2020
3052eb7
mt-explain docs
Dieterbe Jan 22, 2020
38ba464
cleanup docs and devdocs
Dieterbe Jan 23, 2020
076a490
allow passing optimizations as query parameters
Dieterbe Jan 23, 2020
526b443
asPercent: accommodate an extra case
Dieterbe Jan 24, 2020
635a4bb
sean feedback
Dieterbe Jan 24, 2020
d5c4228
asPercent safe again
Dieterbe Jan 24, 2020
7198dfb
robert feedback
Dieterbe Jan 24, 2020
260c7eb
sean feedback 2
Dieterbe Jan 24, 2020
2f9369e
planRequests() erroring is a user error
Dieterbe Jan 28, 2020
4 changes: 4 additions & 0 deletions api/config.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/globalconf"
"github.com/grafana/metrictank/expr"
log "github.com/sirupsen/logrus"
)

@@ -28,6 +29,7 @@ var (
getTargetsConcurrency int
tagdbDefaultLimit uint
speculationThreshold float64
+ optimizations expr.Optimizations

graphiteProxy *httputil.ReverseProxy
timeZone *time.Location
@@ -49,6 +51,8 @@ func ConfigSetup() {
apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.")
apiCfg.UintVar(&tagdbDefaultLimit, "tagdb-default-limit", 100, "default limit for tagdb query results, can be overridden with query parameter \"limit\"")
apiCfg.Float64Var(&speculationThreshold, "speculation-threshold", 1, "ratio of peer responses after which speculation is used. Set to 1 to disable.")
+ apiCfg.BoolVar(&optimizations.PreNormalization, "pre-normalization", true, "enable pre-normalization optimization")
+ apiCfg.BoolVar(&optimizations.MDP, "mdp-optimization", false, "enable MaxDataPoints optimization (experimental)")
globalconf.Register("http", apiCfg, flag.ExitOnError)
}

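The two new flags bind to an `expr.Optimizations` value whose `ApplyUserPrefs` method shows up in `api/graphite.go` below. A minimal sketch of that type, inferred from this diff — the field names are taken from the code above, but the token syntax and method body are assumptions for illustration:

```go
package expr

import (
	"fmt"
	"strings"
)

// Optimizations toggles the two fetch optimizations introduced in this PR:
// pre-normalization (PN) and the experimental MaxDataPoints (MDP) optimization.
type Optimizations struct {
	PreNormalization bool
	MDP              bool
}

// ApplyUserPrefs returns a copy of the configured defaults with per-request
// overrides applied. The token grammar below ("none", "all", "pn", "mdp")
// is assumed for illustration; see the PR's docs for the real syntax.
func (o Optimizations) ApplyUserPrefs(s string) (Optimizations, error) {
	if s == "" {
		return o, nil // no override: keep the server-wide defaults
	}
	for _, tok := range strings.Split(s, ",") {
		switch tok {
		case "none":
			o = Optimizations{}
		case "all":
			o = Optimizations{PreNormalization: true, MDP: true}
		case "pn":
			o.PreNormalization = true
		case "mdp":
			o.MDP = true
		default:
			return o, fmt.Errorf("unrecognized optimization %q", tok)
		}
	}
	return o, nil
}
```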
24 changes: 15 additions & 9 deletions api/dataprocessor.go
@@ -329,7 +329,7 @@ LOOP:

}

- // getTarget returns the series for the request in canonical form.
+ // getTarget returns the series for the request in canonical form with respect to their OutInterval
// as ConsolidateContext just processes what it's been given (not "stable" or bucket-aligned to the output interval)
// we simply make sure to pass it the right input such that the output is canonical.
func (s *Server) getTarget(ctx context.Context, ss *models.StorageStats, req models.Req) (out models.Series, err error) {
@@ -352,6 +352,8 @@ func (s *Server) getTarget(ctx context.Context, ss *models.StorageStats, req models.Req) (out models.Series, err error) {
QueryTo: req.To,
QueryCons: req.ConsReq,
Consolidator: req.Consolidator,
+ QueryMDP: req.MaxPoints,
+ QueryPNGroup: req.PNGroup,
Meta: []models.SeriesMetaProperties{
{
// note that for simplicity, we pretend that a read of rollup avg data is a read of 1 "avg series"
@@ -405,7 +407,7 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) {
log.Debugf("DP load from %-6s %20s %d - %d (%s - %s) span:%ds", typ, key, from, to, util.TS(from), util.TS(to), to-from-1)
}

- // getSeriesFixed fetches the series and returns it in quantized, pre-canonical form.
+ // getSeriesFixed fetches the series and returns it in quantized, pre-canonical form with respect to their OutInterval
// TODO: we can probably forego Fix if archive > 0, because only raw chunks are not quantized yet.
// the requested consolidator is the one that will be used for selecting the archive to read from
func (s *Server) getSeriesFixed(ctx context.Context, ss *models.StorageStats, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
@@ -617,17 +619,19 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageStats,
return iters, nil
}

- // check for duplicate series names for the same query. If found merge the results.
+ // check for duplicate series names for the same query target. If found merge the results.
// each first uniquely-identified series's backing datapoints slice is reused
// any subsequent non-uniquely-identified series is merged into the former and has its
// datapoints slice returned to the pool. input series must be canonical
func mergeSeries(in []models.Series) []models.Series {
type segment struct {
- target string
- query string
- from uint32
- to uint32
- con consolidation.Consolidator
+ target string
+ query string
+ from uint32
+ to uint32
+ con consolidation.Consolidator
+ mdp uint32
+ pngroup models.PNGroup
}
seriesByTarget := make(map[segment][]models.Series)
for _, series := range in {
@@ -637,6 +641,8 @@ func mergeSeries(in []models.Series) []models.Series {
series.QueryFrom,
series.QueryTo,
series.Consolidator,
+ series.QueryMDP,
+ series.QueryPNGroup,
}
seriesByTarget[s] = append(seriesByTarget[s], series)
}
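Widening the segment key matters because the same target can now legitimately show up twice with differently-normalized points: once inside a PNGroup and once outside it, or under different MaxDataPoints. A small illustration of the grouping step above (field values invented; the segment type is the one from this diff):

```go
// two series that differ only in QueryPNGroup land in distinct buckets and
// are no longer merged, since their datapoints may have been normalized to
// different intervals
a := models.Series{Target: "foo", QueryPatt: "foo", QueryPNGroup: 0}
b := models.Series{Target: "foo", QueryPatt: "foo", QueryPNGroup: 7}
seriesByTarget := make(map[segment][]models.Series)
for _, series := range []models.Series{a, b} {
	s := segment{
		series.Target,
		series.QueryPatt,
		series.QueryFrom,
		series.QueryTo,
		series.Consolidator,
		series.QueryMDP,
		series.QueryPNGroup,
	}
	seriesByTarget[s] = append(seriesByTarget[s], series)
}
// len(seriesByTarget) == 2: mergeSeries keeps the two apart
```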
@@ -767,7 +773,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consolidation.Consolidator) {
// if the series has some excess at the end, it may aggregate into a bucket with a timestamp out of the desired range.
// for example: imagine we take the case from above, and the user specified a `to` of 115.
// a native 30s series would end with point 90. We should not include any points that would go into an aggregation bucket with timestamp higher than 90.
- // (such as 100 or 110 which would technically be allowed by the `to` specification)
+ // (such as 100 or 110 which would technically be allowed by the `to` specification but land in the bucket with ts=120 which is out of bounds)
// so the proper to value is the highest value that does not result in points going into an out-of-bounds bucket.

// example: for 10s data (note that the last 2 columns should always match!)
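The boundary rule in that comment can be written down compactly. A sketch, assuming aggregation buckets are labeled by their end timestamp (a point at ts lands in the bucket ceil(ts/interval)*interval); this mirrors the reasoning, not necessarily the exact expression used in newRequestContext:

```go
// lastOkTo returns the highest `to` that excludes every point which would
// aggregate into a bucket beyond the requested range.
// For the example above (to=115, outInterval=30) it returns 91: point 90 is
// the last one kept, while 100 and 110 (bucket ts=120) stay out of bounds.
func lastOkTo(to, outInterval uint32) uint32 {
	lastBucket := ((to - 1) / outInterval) * outInterval
	return lastBucket + 1
}
```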
15 changes: 3 additions & 12 deletions api/dataprocessor_test.go
@@ -390,7 +390,7 @@ func TestGetSeriesFixed(t *testing.T) {
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
metric.Add(30+offset, 40) // this point will always be quantized to 40
metric.Add(40+offset, 50) // this point will always be quantized to 50
- req := models.NewReq(id, "", "", from, to, 1000, 10, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
+ req := models.NewReq(id, "", "", from, to, 1000, 10, 0, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
req.Archive = 0
req.ArchInterval = 10
points, err := srv.getSeriesFixed(test.NewContext(), &models.StorageStats{}, req, consolidation.None)
@@ -582,7 +582,7 @@ func TestGetSeriesFixedVariableOutInterval(t *testing.T) {
metric.Add(dataPoint.Ts, dataPoint.Val)
}

- req := models.NewReq(id, "", "", testCase.from, testCase.to, 1000, testCase.archInterval, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
+ req := models.NewReq(id, "", "", testCase.from, testCase.to, 1000, testCase.archInterval, 0, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
req.Archive = 0
req.ArchInterval = testCase.archInterval
req.OutInterval = testCase.outInterval
@@ -598,19 +598,10 @@
}

func reqRaw(key schema.MKey, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaId, aggId uint16) models.Req {
- req := models.NewReq(key, "", "", from, to, maxPoints, rawInterval, consolidator, 0, cluster.Manager.ThisNode(), schemaId, aggId)
+ req := models.NewReq(key, "", "", from, to, maxPoints, rawInterval, 0, consolidator, 0, cluster.Manager.ThisNode(), schemaId, aggId)
req.Archive = 0
return req
}
- func reqOut(key schema.MKey, from, to, maxPoints, rawInterval uint32, consolidator consolidation.Consolidator, schemaId, aggId uint16, archive uint8, archInterval, ttl, outInterval, aggNum uint32) models.Req {
- req := models.NewReq(key, "", "", from, to, maxPoints, rawInterval, consolidator, 0, cluster.Manager.ThisNode(), schemaId, aggId)
- req.Archive = archive
- req.ArchInterval = archInterval
- req.TTL = ttl
- req.OutInterval = outInterval
- req.AggNum = aggNum
- return req
- }

func TestMergeSeries(t *testing.T) {
out := make([]models.Series, 0)
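For orientation, here is one of the updated call sites from above with the argument roles spelled out. Per the commit "expr.NewReq should take PNGroup and MDP fields also", the new `0` between the raw interval and the consolidator plausibly carries the PNGroup; treat the annotations as assumptions rather than the documented signature:

```go
req := models.NewReq(
	id,                         // metric key
	"", "",                     // target and query pattern (unused in this test)
	from, to,                   // query time range
	1000,                       // max data points
	10,                         // raw interval
	0,                          // PNGroup (new argument in this PR; assumed)
	consolidation.Avg,          // consolidator to use
	0,                          // consolidator the user requested
	cluster.Manager.ThisNode(), // node to fetch from
	0, 0,                       // schema id, aggregation id
)
```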
47 changes: 31 additions & 16 deletions api/graphite.go
@@ -225,7 +225,13 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteRender) {
// as graphite needs high-res data to perform its processing.
mdp = 0
}
- plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, nil)
+
+ opts, err := optimizations.ApplyUserPrefs(request.Optimizations)
+ if err != nil {
+ response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
+ return
+ }
+ plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, opts)
if err != nil {
if fun, ok := err.(expr.ErrUnknownFunction); ok {
if request.NoProxy {
@@ -661,7 +667,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)

minFrom := uint32(math.MaxUint32)
var maxTo uint32
- var reqs []models.Req
+ reqs := NewReqMap()
metaTagEnrichmentData := make(map[string]tagquery.Tags)

// note that different patterns to query can have different from / to, so they require different index lookups
@@ -686,7 +692,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
if err != nil {
return nil, meta, err
}
- series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From), maxSeriesPerReq-len(reqs), false)
+ series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From), maxSeriesPerReq-int(reqs.cnt), false)
} else {
series, err = s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From))
}
@@ -718,9 +724,9 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
cons = closestAggMethod(consReq, mdata.Aggregations.Get(archive.AggId).AggregationMethod)
}

- newReq := models.NewReq(
- archive.Id, archive.NameWithTags(), r.Query, r.From, r.To, plan.MaxDataPoints, uint32(archive.Interval), cons, consReq, s.Node, archive.SchemaId, archive.AggId)
- reqs = append(reqs, newReq)
+ newReq := r.ToModel()
+ newReq.Init(archive, cons, s.Node)
+ reqs.Add(newReq)
}

if tagquery.MetaTagSupport && len(metric.Defs) > 0 && len(metric.MetaTags) > 0 {
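The one-shot models.NewReq call above gives way to a two-step construction: r.ToModel() carries over what the query itself knows, and Init fills in what the index lookup knows. A hedged sketch of the split — the models.Req field names here are partly assumed:

```go
// ToModel copies the query-level fields of an expr.Req — pattern, time
// range, MaxDataPoints and PNGroup — into a models.Req skeleton.
func (r Req) ToModel() models.Req {
	return models.Req{
		Pattern:   r.Query,
		From:      r.From,
		To:        r.To,
		MaxPoints: r.MDP,
		PNGroup:   r.PNGroup,
	}
}
```

Init would then attach archive.Id, archive.NameWithTags(), the schema and aggregation ids, the chosen consolidator and the node — exactly the values the old inline NewReq call passed.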
@@ -739,31 +745,35 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
default:
}

- reqRenderSeriesCount.Value(len(reqs))
- if len(reqs) == 0 {
+ reqRenderSeriesCount.ValueUint32(reqs.cnt)
+ if reqs.cnt == 0 {
return nil, meta, nil
}

- meta.RenderStats.SeriesFetch = uint32(len(reqs))
+ meta.RenderStats.SeriesFetch = reqs.cnt

// note: if 1 series has a movingAvg that requires a long time range extension, it may push other reqs into another archive. can be optimized later
var err error
- reqs, meta.RenderStats.PointsFetch, meta.RenderStats.PointsReturn, err = alignRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs)
+ var rp *ReqsPlan
+ rp, err = planRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs, plan.MaxDataPoints, maxPointsPerReqSoft, maxPointsPerReqHard)
if err != nil {
log.Errorf("HTTP Render alignReq error: %s", err.Error())
return nil, meta, err
}
+ meta.RenderStats.PointsFetch = rp.PointsFetch()
+ meta.RenderStats.PointsReturn = rp.PointsReturn(plan.MaxDataPoints)
+ reqsList := rp.List()

span := opentracing.SpanFromContext(ctx)
span.SetTag("num_reqs", len(reqs))
span.SetTag("num_reqs", len(reqsList))
span.SetTag("points_fetch", meta.RenderStats.PointsFetch)
span.SetTag("points_return", meta.RenderStats.PointsReturn)

- for _, req := range reqs {
+ for _, req := range reqsList {
log.Debugf("HTTP Render %s - arch:%d archI:%d outI:%d aggN: %d from %s", req, req.Archive, req.ArchInterval, req.OutInterval, req.AggNum, req.Node.GetName())
}

a := time.Now()
- out, err := s.getTargets(ctx, &meta.StorageStats, reqs)
+ out, err := s.getTargets(ctx, &meta.StorageStats, reqsList)
if err != nil {
log.Errorf("HTTP Render %s", err.Error())
return nil, meta, err
Expand All @@ -787,7 +797,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)

data := make(map[expr.Req][]models.Series)
for _, serie := range out {
- q := expr.NewReq(serie.QueryPatt, serie.QueryFrom, serie.QueryTo, serie.QueryCons)
+ q := expr.NewReqFromSerie(serie)
data[q] = append(data[q], serie)
}

@@ -1415,7 +1425,12 @@ func (s *Server) showPlan(ctx *middleware.Context, request models.GraphiteRender) {
stable := request.Process == "stable"
mdp := request.MaxDataPoints

- plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, nil)
+ opts, err := optimizations.ApplyUserPrefs(request.Optimizations)
+ if err != nil {
+ response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
+ return
+ }
+ plan, err := expr.NewPlan(exprs, fromUnix, toUnix, mdp, stable, opts)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
return
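executePlan now accumulates requests in a ReqMap and defers interval decisions to planRequests, which replaces alignRequests and returns a ReqsPlan (with PointsFetch/PointsReturn accounting and a flattened List). Neither type's definition is part of this excerpt; a minimal sketch consistent with the calls above, not the canonical implementation:

```go
// ReqMap collects the requests of one render. Requests that belong to a
// pre-normalization group (PNGroup) are kept together so the planner can
// normalize them as a unit; ungrouped requests stay independent.
type ReqMap struct {
	single   []models.Req
	pngroups map[models.PNGroup][]models.Req
	cnt      uint32 // total number of requests added
}

func NewReqMap() *ReqMap {
	return &ReqMap{pngroups: make(map[models.PNGroup][]models.Req)}
}

// Add registers a request, bucketing it by its PNGroup (0 means ungrouped).
func (r *ReqMap) Add(req models.Req) {
	r.cnt++
	if req.PNGroup == 0 {
		r.single = append(r.single, req)
		return
	}
	r.pngroups[req.PNGroup] = append(r.pngroups[req.PNGroup], req)
}
```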