This repository was archived by the owner on Aug 23, 2023. It is now read-only.

remove excessive request alignment, add MDP optimization and pre-normalisation #951

Merged · 46 commits · Jan 30, 2020

Changes from 1 commit

Commits
6e5f050
clarifications/simplification
Dieterbe Dec 2, 2019
29d86ce
WIP: don't over-align + implement MDP-optimization, pre-normalization
Dieterbe Dec 2, 2019
752cd66
simplify NewPlan()
Dieterbe Dec 2, 2019
41bca10
docs
Dieterbe Dec 2, 2019
d1fb28a
aggregators now need to be able to normalize at runtime
Dieterbe Dec 2, 2019
6b5c991
helper to generate combinations of uint32's
Dieterbe Jan 16, 2020
89ad210
rework request planning
Dieterbe Jan 15, 2020
f0f9b17
Loop new request identifiers/differentiators through request path
Dieterbe Jan 16, 2020
5dd295e
make fetch optimizations optional
Dieterbe Jan 17, 2020
0f28be9
mt-explain: support new req information
Dieterbe Jan 17, 2020
531a5f5
clean up expr docs.
Dieterbe Jan 18, 2020
1c17a8a
random clarification related to COW
Dieterbe Jan 19, 2020
8f11677
clarify
Dieterbe Jan 19, 2020
9b2db02
clarify limitations of pre-canonical, canonicalize when runtime norma…
Dieterbe Jan 19, 2020
35a725c
WIP fix tests
Dieterbe Jan 19, 2020
cb220e8
deal with non-pre-canonical normalizing like graphite does
Dieterbe Jan 20, 2020
4a7de6a
add msgp generate directives to request for PNGroup
robert-milan Jan 20, 2020
6e7a145
make PN- and MDP-optimizations separately configurable
Dieterbe Jan 20, 2020
80001d4
make it simpler to use models.Series in tests
Dieterbe Jan 20, 2020
9e7156c
fix dataprocessor tests
Dieterbe Jan 20, 2020
bc73917
Newreq for dataprocessor and
Dieterbe Jan 20, 2020
29a4054
testPlan tests
Dieterbe Jan 20, 2020
c863a42
update consolidaton tests for NewPlan taking optimizations
Dieterbe Jan 20, 2020
8ba064f
getModel(): set interval!
Dieterbe Jan 20, 2020
c041be3
fix TestConsolidateBy
Dieterbe Jan 20, 2020
95d51c8
fix tests: MDP is now used to mark MDP optimizations, don't set it
Dieterbe Jan 21, 2020
83da44d
fix planRequests tests to match new behavior
Dieterbe Jan 21, 2020
1929cb6
better query engine tests
Dieterbe Jan 21, 2020
44fdd93
fix query engine tests
Dieterbe Jan 21, 2020
dd85a3d
Req.Equals should also check TTL. we only used this for unit tests
Dieterbe Jan 22, 2020
ae2ff09
make maxPointsPerReq{Soft,Hard} explicit args, rather than globals
Dieterbe Jan 22, 2020
ca80978
do maxPointsPerReq testing same as other tests
Dieterbe Jan 22, 2020
163e222
tests for normalization
Dieterbe Jan 22, 2020
32d7ce2
expr.NewReq should take PNGroup and MDP fields also
Dieterbe Jan 22, 2020
4c8a573
unit test for planner optimizations
Dieterbe Jan 22, 2020
3054f19
note
Dieterbe Jan 22, 2020
427bb5f
refer to http errors by their name as per contribution docs
Dieterbe Jan 22, 2020
3052eb7
mt-explain docs
Dieterbe Jan 22, 2020
38ba464
cleanup docs and devdocs
Dieterbe Jan 23, 2020
076a490
allow passing optimizations as query parameters
Dieterbe Jan 23, 2020
526b443
asPercent: accommodate an extra case
Dieterbe Jan 24, 2020
635a4bb
sean feedback
Dieterbe Jan 24, 2020
d5c4228
asPercent safe again
Dieterbe Jan 24, 2020
7198dfb
robert feedback
Dieterbe Jan 24, 2020
260c7eb
sean feedback 2
Dieterbe Jan 24, 2020
2f9369e
planRequests() erroring is a user error
Dieterbe Jan 28, 2020
rework request planning
* rethink alignRequests heuristic
* cleanup code for DRY
Dieterbe committed Jan 27, 2020
commit 89ad2108783e9d6b228c17a797e54845376d05f1
11 changes: 7 additions & 4 deletions api/graphite.go
@@ -748,11 +748,14 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)

// note: if 1 series has a movingAvg that requires a long time range extension, it may push other reqs into another archive. can be optimized later
var err error
-var reqsList []models.Req
-// TODO get rid of alignrequests and all "align" terminology
-reqsList, meta.RenderStats.PointsFetch, meta.RenderStats.PointsReturn, err = planRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs, plan.MaxDataPoints)
+var rp *ReqsPlan
+rp, err = planRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs, plan.MaxDataPoints)

if err != nil {
-log.Errorf("HTTP Render alignReq error: %s", err.Error())
+log.Errorf("HTTP Render planRequests error: %s", err.Error())
return nil, meta, err
}
+// only use the plan after the error check: rp may be nil on error
+meta.RenderStats.PointsFetch = rp.PointsFetch()
+meta.RenderStats.PointsReturn = rp.PointsReturn(plan.MaxDataPoints)
+reqsList := rp.List()
span := opentracing.SpanFromContext(ctx)
114 changes: 114 additions & 0 deletions api/graphite_req.go
@@ -44,3 +44,117 @@ func (r ReqMap) Dump() string {
}
return out
}

// PNGroupSplit embodies a PNGroup broken down by whether requests are MDP-optimizable
type PNGroupSplit struct {
mdpyes []models.Req // MDP-optimizable requests
mdpno []models.Req // not MDP-optimizable reqs
}

// ReqsPlan holds requests that have been planned
type ReqsPlan struct {
pngroups map[models.PNGroup]PNGroupSplit
single PNGroupSplit
cnt uint32
}

func NewReqsPlan(reqs ReqMap) ReqsPlan {
rp := ReqsPlan{
pngroups: make(map[models.PNGroup]PNGroupSplit),
cnt: reqs.cnt,
}
for group, groupReqs := range reqs.pngroups {
var split PNGroupSplit
for _, req := range groupReqs {
if req.MaxPoints > 0 {
split.mdpyes = append(split.mdpyes, req)
} else {
split.mdpno = append(split.mdpno, req)
}
}
rp.pngroups[group] = split
}
for _, req := range reqs.single {
if req.MaxPoints > 0 {
rp.single.mdpyes = append(rp.single.mdpyes, req)
} else {
rp.single.mdpno = append(rp.single.mdpno, req)
}
}
return rp
}

func (rp ReqsPlan) PointsFetch() uint32 {
var cnt uint32
for _, r := range rp.single.mdpyes {
cnt += r.PointsFetch()
}
for _, r := range rp.single.mdpno {
cnt += r.PointsFetch()
}
for _, split := range rp.pngroups {
for _, r := range split.mdpyes {
cnt += r.PointsFetch()
}
for _, r := range split.mdpno {
cnt += r.PointsFetch()
}
}
return cnt
}

func (rp ReqsPlan) Dump() string {
out := fmt.Sprintf("ReqsPlan (%d entries):\n", rp.cnt)
out += " Groups:\n"
for i, split := range rp.pngroups {
out += fmt.Sprintf(" * group %d\nMDP-yes:\n", i)
for _, r := range split.mdpyes {
out += " " + r.DebugString() + "\n"
}
out += " MDP-no:\n"
for _, r := range split.mdpno {
out += " " + r.DebugString() + "\n"
}
}
out += " Single MDP-yes:\n"
for _, r := range rp.single.mdpyes {
out += " " + r.DebugString() + "\n"
}
out += " Single MDP-no:\n"
for _, r := range rp.single.mdpno {
out += " " + r.DebugString() + "\n"
}
return out
}

// PointsReturn estimates the number of points that will be returned for this request
// best effort: not aware of summarize(), aggregation functions, runtime normalization. but does account for runtime consolidation
func (rp ReqsPlan) PointsReturn(planMDP uint32) uint32 {
var cnt uint32
for _, r := range rp.single.mdpyes {
cnt += r.PointsReturn(planMDP)
}
for _, r := range rp.single.mdpno {
cnt += r.PointsReturn(planMDP)
}
for _, split := range rp.pngroups {
for _, r := range split.mdpyes {
cnt += r.PointsReturn(planMDP)
}
for _, r := range split.mdpno {
cnt += r.PointsReturn(planMDP)
}
}
return cnt
}

func (rp ReqsPlan) List() []models.Req {
l := make([]models.Req, 0, rp.cnt)
[review] Contributor Author: ah, is this why we had requests with ArchInterval 0?
[review] Contributor: Yes, we just had the len and cap reversed.

l = append(l, rp.single.mdpno...)
l = append(l, rp.single.mdpyes...)
for _, g := range rp.pngroups {
l = append(l, g.mdpno...)
l = append(l, g.mdpyes...)
}
return l
}
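As an aside, the len/cap mixup from the review thread above is easy to reproduce. A minimal, runnable sketch (a hypothetical reconstruction, not the original code):

package main

import "fmt"

type Req struct{ ArchInterval uint32 }

func main() {
	reqs := []Req{{10}, {60}}
	cnt := len(reqs)

	// the mixup: cnt passed as the length, so the slice starts with cnt
	// zero-valued Reqs (ArchInterval 0) and the appends land after them
	bad := make([]Req, cnt)
	bad = append(bad, reqs...)
	fmt.Println(bad) // [{0} {0} {10} {60}]

	// the fix: length 0, capacity cnt, so the appends fill the pre-allocated space
	good := make([]Req, 0, cnt)
	good = append(good, reqs...)
	fmt.Println(good) // [{10} {60}]
}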
73 changes: 69 additions & 4 deletions api/models/request.go
@@ -3,6 +3,7 @@ package models
import (
"fmt"

"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/schema"

"github.com/grafana/metrictank/cluster"
@@ -22,7 +23,7 @@ type Req struct {
To uint32 `json:"to"`
MaxPoints uint32 `json:"maxPoints"`
RawInterval uint32 `json:"rawInterval"` // the interval of the raw metric before any consolidation
-// the consolidation method for rollup archive and normalization. (not runtime consolidation)
+// the consolidation method for rollup archive and normalization (pre-normalization and runtime normalization). (but not runtime consolidation)
// ConsReq 0 -> configured value
// ConsReq != 0 -> closest value we can offer based on config
Consolidator consolidation.Consolidator `json:"consolidator"`
@@ -35,15 +36,15 @@ type Req struct {
SchemaId uint16 `json:"schemaId"`
AggId uint16 `json:"aggId"`

-// these fields need some more coordination and are typically set later (after request alignment)
+// these fields need some more coordination and are typically set later (after request planning)
Archive uint8 `json:"archive"` // 0 means original data, 1 means first agg level, 2 means 2nd, etc.
ArchInterval uint32 `json:"archInterval"` // the interval corresponding to the archive we'll fetch
TTL uint32 `json:"ttl"` // the ttl of the archive we'll fetch
OutInterval uint32 `json:"outInterval"` // the interval of the output data, after any runtime consolidation
AggNum uint32 `json:"aggNum"` // how many points to consolidate together at runtime, after fetching from the archive (normalization)
}

-// NewReq creates a new request. It sets all properties minus the ones that need request alignment
+// NewReq creates a new request. It sets all properties minus the ones that need request planning
func NewReq(key schema.MKey, target, patt string, from, to, maxPoints, rawInterval uint32, cons, consReq consolidation.Consolidator, node cluster.Node, schemaId, aggId uint16) Req {
return Req{
MKey: key,
@@ -61,6 +62,70 @@ func NewReq(key schema.MKey, target, patt string, from, to, maxPoints, rawInterv
}
}

// Plan updates the planning parameters to match the i'th archive in its retention rules
func (r *Req) Plan(i int, ret conf.Retention) {
r.Archive = uint8(i)
if i == 0 {
// The first retention is raw data, so use its native interval
r.ArchInterval = r.RawInterval
} else {
r.ArchInterval = uint32(ret.SecondsPerPoint)
}
r.TTL = uint32(ret.MaxRetention())
r.OutInterval = r.ArchInterval
r.AggNum = 1
}

// PlanNormalization updates the planning parameters to accommodate normalization to the given interval
func (r *Req) PlanNormalization(interval uint32) {
r.OutInterval = interval
r.AggNum = interval / r.ArchInterval
}

// AdjustTo adjusts the request to accommodate the requested interval
// notes:
// * the Req MUST have been Plan()'d already!
// * interval MUST be a multiple of the ArchInterval (so we can normalize if needed)
// * the TTL of lower resolution archives is always assumed to be at least as long as the current archive
func (r *Req) AdjustTo(interval, from uint32, rets []conf.Retention) {

// if we satisfy the interval with our current settings, nothing left to do
if r.ArchInterval == interval {
return
}

// let's see if we can deliver it via a lower-res rollup archive.
for i, ret := range rets[r.Archive+1:] {
if interval == uint32(ret.SecondsPerPoint) && ret.Ready <= from {
// we're in luck. this will be more efficient than runtime consolidation
r.Plan(int(r.Archive)+1+i, ret)
return
}
}

// we will have to apply normalization
// we use the initially found archive as starting point. there could be some cases - if you have exotic settings -
// where it may be more efficient to pick a lower res archive as starting point (it would still require an interval
// divisible by the output interval) but let's not worry about that edge case.
r.PlanNormalization(interval)
}
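To illustrate how Plan, AdjustTo and PlanNormalization interact, a hypothetical walkthrough in comment form (the retentions and values here are assumptions for illustration):

// Assume two ready retentions, rets[0] = 10s:1d (raw) and rets[1] = 60s:30d,
// and a request with RawInterval = 10.
//
//   req.Plan(0, rets[0])         // raw archive: Archive=0, ArchInterval=10, OutInterval=10, AggNum=1
//   req.AdjustTo(60, from, rets) // a ready 60s rollup exists -> equivalent to req.Plan(1, rets[1])
//
// and, starting again from req.Plan(0, rets[0]), an interval with no matching archive:
//
//   req.AdjustTo(30, from, rets) // no 30s archive -> PlanNormalization(30): OutInterval=30, AggNum=30/10=3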

// PointsFetch returns the number of points that will be fetched from the archive for this request
func (r Req) PointsFetch() uint32 {
return (r.To - r.From) / r.ArchInterval
}

// PointsReturn estimates the number of points that will be returned for this request
// best effort: not aware of summarize(), runtime normalization. but does account for runtime consolidation
func (r Req) PointsReturn(planMDP uint32) uint32 {
points := (r.To - r.From) / r.OutInterval
if planMDP > 0 && points > planMDP {
// note that we don't assign to req.AggNum here, because that's only for normalization.
// MDP runtime consolidation doesn't look at req.AggNum
aggNum := consolidation.AggEvery(points, planMDP)
points /= aggNum
}
return points
}
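A worked example of the arithmetic above, with hypothetical values (a 1-hour request, OutInterval 10s, planMDP 100):

// points = (To - From) / OutInterval = 3600 / 10 = 360
// aggNum = consolidation.AggEvery(360, 100) = ceil(360/100) = 4
// points = 360 / 4 = 90 points returned after runtime consolidation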

func (r Req) String() string {
return fmt.Sprintf("%s %d - %d (%s - %s) span:%ds. points <= %d. %s.", r.MKey.String(), r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.Consolidator)
}
@@ -98,7 +163,7 @@ func (r Req) TraceLog(span opentracing.Span) {

// Equals compares all fields of a to b for equality.
// Except
-// * TTL (because alignRequests may change it)
+// * TTL (because planRequests may change it)
// for 100% correctness we may want to fix this in the future
// but for now, should be harmless since the field is not
// that important for archive fetching
385 changes: 253 additions & 132 deletions api/query_engine.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/query_engine_test.go
@@ -16,6 +16,7 @@ import (
func testAlign(reqs []models.Req, retentions []conf.Retentions, outReqs []models.Req, outErr error, now uint32, t *testing.T) {
var schemas []conf.Schema
oriMaxPointsPerReqSoft := maxPointsPerReqSoft
+oriMaxPointsPerHardReq := maxPointsPerReqHard

for _, ret := range retentions {
schemas = append(schemas, conf.Schema{
@@ -28,6 +29,7 @@ func testAlign(reqs []models.Req, retentions []conf.Retentions, outReqs []models
maxPointsPerReqSoft = points
}
}
+maxPointsPerReqHard = maxPointsPerReqSoft * 10
[review] Contributor Author: why do we need this? was it rejecting requests?
[review] Contributor: It was failing during planning because it was always set to 0:

	if int(rp.PointsFetch()) > maxPointsPerReqHard {
		return nil, errMaxPointsPerReq
	}

[review] Contributor Author: I see, but 0 means disabled. I will update the code to implement 0/disabled. then we can probably undo that change
[review] Contributor: That makes more sense, sounds good.
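A sketch of the 0-means-disabled behavior agreed on above (an assumption based on this thread, not necessarily the code as merged):

	// a hard limit of 0 means the check is disabled
	if maxPointsPerReqHard > 0 && int(rp.PointsFetch()) > maxPointsPerReqHard {
		return nil, errMaxPointsPerReq
	}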


mdata.Schemas = conf.NewSchemas(schemas)
out, _, _, err := alignRequests(now, reqs[0].From, reqs[0].To, reqs)
@@ -45,6 +47,7 @@ func testAlign(reqs []models.Req, retentions []conf.Retentions, outReqs []models
}

maxPointsPerReqSoft = oriMaxPointsPerReqSoft
+maxPointsPerReqHard = oriMaxPointsPerHardReq
}

// 2 series requested with equal raw intervals. req 0-30. now 1200. one archive of ttl=1200 does it
5 changes: 3 additions & 2 deletions devdocs/expr.md
@@ -21,7 +21,8 @@ However, multiple series getting fetched that then get aggregated together, may
The mechanics here are:
* we set PNGroup to 0 by default on the context, which gets inherited down the tree
* as we traverse down the tree: transparent aggregations set PNGroups to the pointer value of that function, to uniquely identify any further data requests that will be fed into the same transparent aggregation.
-* as we traverse down, any opaque aggregation functions and IA-functions reset PNGroup back to 0.
+* as we traverse down, any opaque aggregation functions and IA-functions reset PNGroup back to 0. Note that currently all known IA functions are also GR functions and vice versa. Meaning,
+as we pass functions like smartSummarize which should undo MDP-optimization, they also undo pre-normalization.
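A minimal, self-contained sketch of these mechanics; the type and function names are illustrative stand-ins, not the actual expr package API:

package main

import (
	"fmt"
	"unsafe"
)

// PNGroup identifies requests that may be pre-normalized together; 0 means no group.
type PNGroup uint64

// Context is the state inherited down the expression tree.
type Context struct {
	PNGroup PNGroup
}

type sumSeries struct{} // stands in for a transparent aggregation

// a transparent aggregation tags downstream fetches with its own identity
func (f *sumSeries) Context(c Context) Context {
	c.PNGroup = PNGroup(uintptr(unsafe.Pointer(f)))
	return c
}

type smartSummarize struct{} // stands in for an opaque aggregation / IA+GR function

// an opaque aggregation resets the group: its inputs must not be pre-normalized
func (f *smartSummarize) Context(c Context) Context {
	c.PNGroup = 0
	return c
}

func main() {
	root := Context{}
	sum := &sumSeries{}
	inSum := sum.Context(root)
	fmt.Println(inSum.PNGroup != 0) // true: fetches under sumSeries share a group

	sz := &smartSummarize{}
	fmt.Println(sz.Context(inSum).PNGroup) // 0: smartSummarize undoes pre-normalization
}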

## Management of point slices

@@ -125,7 +126,7 @@ So:
- consolidateBy setting defined closest to the leaf without a special* function in between the setting and the leaf, if available
- determined via storage-aggregation.conf (defaults to average)
3) at execution time, the consolidation settings encountered in consolidateBy calls travel up to the root because it is configured on the series, which is passed through the various layers of processing until it hits the root and the output step. This becomes useful in two cases:
-- when series need to be normalized at runtime, e.g. for sumSeries or divideSeries with series that have different steps; they need to be normalized (consolidated) so that the series get a compatible step, and the default of "avg" may not suffice. (note that right now we have alignRequests which normalizes all series at fetch time, which can actually be a bit too eager, because some requests can use multiple targets with different processing - e.g. feed two different series into summarize(), so we actually don't need to normalize at runtime, but in the future we should make this better - TODO)
+- when series need to be normalized at runtime, e.g. for sumSeries or divideSeries with series that have different steps; they need to be normalized (consolidated) so that the series get a compatible step, and the default of "avg" may not suffice. (note that right now we have alignRequests which normalizes all series at fetch time, which can actually be a bit too eager, because some requests can use multiple targets with different processing - e.g. feed two different series into summarize(), so we actually don't need to normalize at runtime, but in the future we should make this better - TODO THIS IS OUT OF DATE)
- when returning data back to the user via a json response and whatnot, we can consolidate down using the method requested by the user (or average, if not specified). Likewise here, when the setting encounters a special* function while traveling up to the root, the consolidation value is reset to the default (average)
Note: some functions combine multiple series into a new one (e.g. sumSeries, avgSeries, ...). Your input series may use different consolidateBy settings, some may be explicitly specified while others are not. In this scenario, the output series will be given the first explicitly defined consolidateBy found by iterating the inputs, or the first default otherwise.

2 changes: 1 addition & 1 deletion devdocs/maxdatapoints.txt
@@ -21,4 +21,4 @@ mdp set from GET param, but 0 if came from graphite
-> executePlan() models.NewReq() -> models.Req.MaxPoints
-> planRequests(): used for MDP-optimization
-> plan.MaxDatapoints used for final runtime consolidation
--> and also used in alignRequests() for reporting
+-> and also used in planRequests() for reporting
4 changes: 2 additions & 2 deletions devdocs/render-request-handling.md
@@ -12,8 +12,8 @@
* finds all series by fanning out the query patterns to all other shards.
this gives basically idx.Node back. has the path, leaf, metricdefinition, schema/aggregation(rollup) settings, for each series, as well as on which node it can be found.
* construct models.Req objects for each series. this uses the MKey to identify series, also sets from/to, maxdatapoints, etc.
-* `alignRequests`: this looks at all models.Req objects and aligns them to a common step.
-it selects the archive to use, consolidator settings etc (see NOTES in expr directory for more info)
+* `planRequests`: this looks at all models.Req objects and decides which archive to read from, whether to apply normalization, etc
+(see NOTES in expr directory for more info)
* `getTargets`: gets the data from the local node and peer nodes based on the models.Req objects
* `mergeSeries`: if there's multiple series with same name/tags, from, to and consolidator (e.g. because there's multiple series because users switched intervals), merge them together into one series
* Sort each merged series so that the output of a function is well-defined and repeatable.
2 changes: 1 addition & 1 deletion docs/consolidation.md
@@ -36,7 +36,7 @@ This further reduces data at runtime on an as-needed basis.
It supports min, max, sum, average.


-## The request alignment algorithm
+## The request planning algorithm (OUT OF DATE as of https://github.com/grafana/metrictank/pull/951)

Metrictank uses a function called `alignRequests` which will:

2 changes: 1 addition & 1 deletion docs/http-api.md
@@ -234,7 +234,7 @@ Each lineage section has these fields:
| schema-retentions | Retentions defined in storage-schemas.conf |
| archive-read | Which archive was read as defined in the retentions. (0 means raw, 1 first rollup, etc) |
| archive-interval | The native interval of the archive that was read |
-| aggnum-norm | If >1, number of points aggregated together per point, as part of normalization (series alignment) |
+| aggnum-norm | If >1, number of points aggregated together per point, as part of normalization |
| aggnum-rc | If >1, number of points aggregated together per output point, as part of runtime consolidation (MaxDataPoints) |
| consolidator-normfetch | Consolidator used for normalization (if aggnum-norm > 1) and which rollup was read (if archive-read > 0) |
| consolidator-rc | Consolidator used for runtime consolidation (MaxDataPoints) (if aggnum-rc > 1) |
3 changes: 2 additions & 1 deletion docs/metrics.md
@@ -24,7 +24,8 @@ the archive chosen for the request.
* `api.request.render.points_fetched`:
the number of points that need to be fetched for a /render request.
* `api.request.render.points_returned`:
-the number of points the request will return.
+the number of points the request will return
+best effort: not aware of summarize(), aggregation functions, runtime normalization. but does account for runtime consolidation
* `api.request.render.series`:
the number of series a /render request is handling. This is the number
of metrics after all of the targets in the request have been expanded by searching the index.
2 changes: 1 addition & 1 deletion docs/render-path.md
@@ -89,6 +89,6 @@ such that they can be used together (for aggregating, merging, etc)


TODO talk about
-alignRequests -> getTargets -> mergeSeries -> sort Series -> plan.Run (executes functions and does MDP consolidation with nudging)
+planRequests -> getTargets -> mergeSeries -> sort Series -> plan.Run (executes functions and does MDP consolidation with nudging)

talk more about what happens at each step, how data is manipulated etc