
Commit c525a2d

fixup! rework request planning
1 parent 448bb71 commit c525a2d

10 files changed, +138 -56 lines changed

api/graphite.go (+2, -3)

@@ -749,14 +749,13 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
     // note: if 1 series has a movingAvg that requires a long time range extension, it may push other reqs into another archive. can be optimized later
     var err error
     var rp *ReqsPlan
-    // TODO get rid of alignrequests and all "align" terminology
     rp, err = planRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs, plan.MaxDataPoints)
     meta.RenderStats.PointsFetch = rp.PointsFetch()
-    meta.RenderStats.PointsReturn = rp.PointsReturn()
+    meta.RenderStats.PointsReturn = rp.PointsReturn(plan.MaxDataPoints)
     reqsList := rp.List()

     if err != nil {
-        log.Errorf("HTTP Render alignReq error: %s", err.Error())
+        log.Errorf("HTTP Render planRequests error: %s", err.Error())
         return nil, meta, err
     }
     span := opentracing.SpanFromContext(ctx)

api/graphite_req.go (+21, -6)

@@ -75,20 +75,35 @@ func (rp ReqsPlan) PointsFetch() uint32 {
     for _, r := range rp.single.mdpno {
         cnt += r.PointsFetch()
     }
-    // TODO also the groups
+    for _, split := range rp.pngroups {
+        for _, r := range split.mdpyes {
+            cnt += r.PointsFetch()
+        }
+        for _, r := range split.mdpno {
+            cnt += r.PointsFetch()
+        }
+    }
     return cnt
 }

-// TODO implement this
-func (rp ReqsPlan) PointsReturn() uint32 {
+// PointsReturn estimates the amount of points that will be returned for this request
+// best effort: not aware of summarize(), aggregation functions, runtime normalization. but does account for runtime consolidation
+func (rp ReqsPlan) PointsReturn(planMDP uint32) uint32 {
     var cnt uint32
     for _, r := range rp.single.mdpyes {
-        cnt += r.PointsFetch()
+        cnt += r.PointsReturn(planMDP)
     }
     for _, r := range rp.single.mdpno {
-        cnt += r.PointsFetch()
+        cnt += r.PointsReturn(planMDP)
+    }
+    for _, split := range rp.pngroups {
+        for _, r := range split.mdpyes {
+            cnt += r.PointsReturn(planMDP)
+        }
+        for _, r := range split.mdpno {
+            cnt += r.PointsReturn(planMDP)
+        }
     }
-    // TODO also the groups
     return cnt
 }
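
For orientation when reading the loops above: the plan evidently splits requests by whether they are MDP-optimizable (mdpyes) or not (mdpno), both for standalone requests (single) and per PNGroup (pngroups). A minimal sketch of that layout follows; the field names are taken from this diff, but the struct shapes and the reqSplit name are assumptions for illustration, not part of the commit:

// sketch only, assumed to live in the api package alongside ReqsPlan
type reqSplit struct {
    mdpyes []models.Req // MDP-optimizable: may be served from a coarser archive to honor MaxDataPoints
    mdpno  []models.Req // not MDP-optimizable: planned at the highest usable resolution
}

type ReqsPlan struct {
    single   reqSplit                    // requests that belong to no PNGroup
    pngroups map[models.PNGroup]reqSplit // requests that must end up on a common output interval
}

PointsFetch and PointsReturn then simply walk all four groups of requests and sum the per-request estimates.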

api/models/request.go (+16, -2)

@@ -50,7 +50,7 @@ type Req struct {
 type PNGroup uint64

 // Init initializes a request based on the metadata that we know of.
-// It sets all properties minus the ones that need request alignment
+// It sets all properties minus the ones that need request planning
 func (r *Req) Init(archive idx.Archive, cons consolidation.Consolidator, node cluster.Node) {
     r.MKey = archive.Id
     r.Target = archive.NameWithTags()

@@ -111,6 +111,20 @@ func (r *Req) AdjustTo(interval, from uint32, rets []conf.Retention) {
 func (r Req) PointsFetch() uint32 {
     return (r.To - r.From) / r.ArchInterval
 }
+
+// PointsReturn estimates the amount of points that will be returned for this request
+// best effort: not aware of summarize(), runtime normalization. but does account for runtime consolidation
+func (r Req) PointsReturn(planMDP uint32) uint32 {
+    points := (r.To - r.From) / r.OutInterval
+    if planMDP > 0 && points > planMDP {
+        // note that we don't assign to req.AggNum here, because that's only for normalization.
+        // MDP runtime consolidation doesn't look at req.AggNum
+        aggNum := consolidation.AggEvery(points, planMDP)
+        points /= aggNum
+    }
+    return points
+}
+
 func (r Req) String() string {
     return fmt.Sprintf("%s %d - %d (%s - %s) span:%ds. points <= %d. %s.", r.MKey.String(), r.From, r.To, util.TS(r.From), util.TS(r.To), r.To-r.From-1, r.MaxPoints, r.Consolidator)
 }

@@ -148,7 +162,7 @@ func (r Req) TraceLog(span opentracing.Span) {

 // Equals compares all fields of a to b for equality.
 // Except
-// * TTL (because alignRequests may change it)
+// * TTL (because planRequests may change it)
 // for 100% correctness we may want to fix this in the future
 // but for now, should be harmless since the field is not
 // that important for archive fetching
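
A quick worked example of the estimate that the new Req.PointsReturn makes (hypothetical numbers, and assuming consolidation.AggEvery rounds up, i.e. behaves like a ceiling division):

// a request covering 1 hour at a 10s output interval
points := uint32(3600 / 10)                // 360 points before runtime consolidation
planMDP := uint32(100)                     // MaxDataPoints of the plan
aggNum := (points + planMDP - 1) / planMDP // AggEvery(360, 100) = 4 under the ceiling-division assumption
points /= aggNum                           // 360 / 4 = 90 points expected to be returned

When planMDP is 0 the cap is skipped and the raw (To-From)/OutInterval estimate is reported.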

api/query_engine.go (+92, -38)

@@ -17,7 +17,8 @@ var (
     reqRenderChosenArchive = stats.NewMeter32("api.request.render.chosen_archive", false)
     // metric api.request.render.points_fetched is the number of points that need to be fetched for a /render request.
     reqRenderPointsFetched = stats.NewMeter32("api.request.render.points_fetched", false)
-    // metric api.request.render.points_returned is the number of points the request will return.
+    // metric api.request.render.points_returned is the number of points the request will return
+    // best effort: not aware of summarize(), aggregation functions, runtime normalization. but does account for runtime consolidation
     reqRenderPointsReturned = stats.NewMeter32("api.request.render.points_returned", false)

     errUnSatisfiable = response.NewError(404, "request cannot be satisfied due to lack of available retentions")

@@ -69,13 +70,13 @@ func planRequests(now, from, to uint32, reqs *ReqMap, planMDP uint32) (*ReqsPlan

     for group, split := range rp.pngroups {
         if len(split.mdpno) > 0 {
-            split.mdpno, ok = initialHighestResMulti(now, from, to, split.mdpno)
+            split.mdpno, ok = planHighestResMulti(now, from, to, split.mdpno)
             if !ok {
                 return nil, errUnSatisfiable
             }
         }
         if len(split.mdpyes) > 0 {
-            split.mdpyes, ok = initialLowestResForMDPMulti(now, from, to, split.mdpyes)
+            split.mdpyes, ok = planLowestResForMDPMulti(now, from, to, split.mdpyes)
             if !ok {
                 return nil, errUnSatisfiable
             }

@@ -84,30 +85,86 @@ func planRequests(now, from, to uint32, reqs *ReqMap, planMDP uint32) (*ReqsPlan
     }
     for i, req := range reqs.single {
         if req.MaxPoints == 0 {
-            reqs.single[i], ok = initialHighestResSingle(now, from, to, req)
+            reqs.single[i], ok = planHighestResSingle(now, from, to, req)
         } else {
-            reqs.single[i], ok = initialLowestResForMDPSingle(now, from, to, req)
+            reqs.single[i], ok = planLowestResForMDPSingle(now, from, to, req)
         }
         if !ok {
             return nil, errUnSatisfiable
         }
     }

+    if maxPointsPerReqSoft > 0 {
+        // at this point, all MDP-optimizable series have already been optimized
+        // we can try to reduce the resolution of non-MDP-optimizable series
+        // if metrictank is already handling all, or most of your queries, then we have been able to determine
+        // MDP-optimizability very well. If the request came from Graphite, we have to assume it may run GR-functions.
+        // thus in the former case, we pretty much know that this is going to have an adverse effect on your queries,
+        // and you should probably not use this option, or we should even get rid of it.
+        // in the latter case though, it's quite likely we were too cautious and categorized many series as non-MDP
+        // optimizable whereas in reality they should be, so in that case this option is a welcome way to reduce the
+        // impact of big queries
+        // we could do two approaches: gradually reduce the interval of all series/groups being read, or just aggressively
+        // adjust one group at a time. The latter seems simpler, so for now we do just that.
+        if rp.PointsFetch() > uint32(maxPointsPerReqSoft) {
+            for group, split := range rp.pngroups {
+                if len(split.mdpno) > 0 {
+                    split.mdpno, ok = planLowestResForMDPMulti(now, from, to, split.mdpno)
+                    if !ok {
+                        return nil, errUnSatisfiable
+                    }
+                    rp.pngroups[group] = split
+                    if rp.PointsFetch() <= uint32(maxPointsPerReqSoft) {
+                        goto HonoredSoft
+                    }
+                }
+            }
+            for i, req := range reqs.single {
+                if req.MaxPoints == 0 {
+                    reqs.single[i], ok = planLowestResForMDPSingle(now, from, to, req)
+                }
+                if !ok {
+                    return nil, errUnSatisfiable
+                }
+                // for every 10 requests we adjusted, check if we honor soft now.
+                // note that there may be thousands of requests
+                if i%10 == 9 {
+                    if rp.PointsFetch() <= uint32(maxPointsPerReqSoft) {
+                        goto HonoredSoft
+                    }
+                }
+            }
+        }
+    }
+HonoredSoft:
+
+    if int(rp.PointsFetch()) > maxPointsPerReqHard {
+        return nil, errMaxPointsPerReq
+
+    }
+
+    // send out some metrics and we're done!
+    for _, r := range rp.single.mdpyes {
+        reqRenderChosenArchive.ValueUint32(uint32(r.Archive))
+    }
+    for _, r := range rp.single.mdpno {
+        reqRenderChosenArchive.ValueUint32(uint32(r.Archive))
+    }
+    for _, split := range rp.pngroups {
+        for _, r := range split.mdpyes {
+            reqRenderChosenArchive.ValueUint32(uint32(r.Archive))
+        }
+        for _, r := range split.mdpno {
+            reqRenderChosenArchive.ValueUint32(uint32(r.Archive))
+        }
+    }
     reqRenderPointsFetched.ValueUint32(rp.PointsFetch())
-    reqRenderPointsReturned.ValueUint32(rp.PointsReturn())
+    reqRenderPointsReturned.ValueUint32(rp.PointsReturn(planMDP))

     return &rp, nil
 }

-/*
-    // requests in the same PNGroup can possibly receive a further tweak:
-    // if they currently require runtime normalization, we may instead be able
-    // to simply read from rollup archives and avoid runtime normalization
-    reqs, _, _, _ := alignRequests(now, from, to, groupReqs)
-    }
-*/
-
-func initialHighestResSingle(now, from, to uint32, req models.Req) (models.Req, bool) {
+func planHighestResSingle(now, from, to uint32, req models.Req) (models.Req, bool) {
     rets := getRetentions(req)
     minTTL := now - from
     var ok bool

@@ -126,7 +183,7 @@ func initialHighestResSingle(now, from, to uint32, req models.Req) (models.Req,
     return req, ok
 }

-func initialLowestResForMDPSingle(now, from, to uint32, req models.Req) (models.Req, bool) {
+func planLowestResForMDPSingle(now, from, to uint32, req models.Req) (models.Req, bool) {
     rets := getRetentions(req)
     var ok bool
     for i := len(rets) - 1; i >= 0; i-- {

@@ -142,7 +199,7 @@ func initialLowestResForMDPSingle(now, from, to uint32, req models.Req) (models.
     }
     return req, ok
 }
-func initialHighestResMulti(now, from, to uint32, reqs []models.Req) ([]models.Req, bool) {
+func planHighestResMulti(now, from, to uint32, reqs []models.Req) ([]models.Req, bool) {
     minTTL := now - from

     var listIntervals []uint32

@@ -184,7 +241,7 @@ func initialHighestResMulti(now, from, to uint32, reqs []models.Req) ([]models.R
 }

 // note: we can assume all reqs have the same MDP.
-func initialLowestResForMDPMulti(now, from, to uint32, reqs []models.Req) ([]models.Req, bool) {
+func planLowestResForMDPMulti(now, from, to uint32, reqs []models.Req) ([]models.Req, bool) {
     var ok bool
     minTTL := now - from

@@ -250,29 +307,26 @@ func initialLowestResForMDPMulti(now, from, to uint32, reqs []models.Req) ([]mod
             }
         }
     }
-    // now we finally found our optimal interval.
+    // now we finally found our optimal interval that we want to use.
     // plan all our requests so that they result in the common output interval.
     for i := range reqs {
         req := &reqs[i]
-        req.AdjustTo(interval, from, getRetentions(*req))
-    }
-
-    return reqs, ok
-}
-
-/*
-func alignRequests(now, from, to uint32, reqs []models.Req) ([]models.Req, uint32, uint32, error) {
-    var pointsFetch uint32
-
-    pointsPerSerie := tsRange / interval
+        rets := getRetentions(*req)
+        for i := len(rets); i >= 0; i-- {
+            ret := rets[i]
+            if ret.Ready <= from && req.TTL >= minTTL {
+                if uint32(ret.SecondsPerPoint) == interval {
+                    req.Plan(i, ret)
+                    break
+                }
+                if interval%uint32(ret.SecondsPerPoint) == 0 {
+                    req.Plan(i, ret)
+                    req.PlanNormalization(interval)
+                    break
+                }
+            }
+        }

-    // TODO series are not same resolution, need to account for separate intervals
-    if planMDP > 0 && pointsPerSerie > planMDP {
-        // note that we don't assign to req.AggNum here, because that's only for normalization.
-        // MDP runtime consolidation doesn't look at req.AggNum
-        aggNum := consolidation.AggEvery(pointsPerSerie, reqs[0].MaxPoints)
-        pointsPerSerie /= aggNum
+    return reqs, ok
     }
-
 }
-*/
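
The maxPointsPerReqSoft and maxPointsPerReqHard values used above correspond to the max-points-per-req-soft and max-points-per-req-hard settings in metrictank's http config. An illustrative snippet, with example values only (not recommendations):

[http]
# soft limit: when the planned fetch exceeds this, planRequests lowers the resolution of
# PNGroups and then of remaining single requests until the fetch fits again (the HonoredSoft path above)
max-points-per-req-soft = 1000000
# hard limit: if the plan still fetches more than this, the request is rejected (errMaxPointsPerReq)
max-points-per-req-hard = 20000000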

devdocs/expr.md (+1, -1)

@@ -126,7 +126,7 @@ So:
    - consolidateBy setting defined closest to the leaf without a special* function in between the setting and the leaf, if available
    - determined via storage-aggregation.conf (defaults to average)
 3) at execution time, the consolidation settings encountered in consolidateBy calls travel up to the root because it is configured on the series, which is passed through the various layers of processing until it hits the root and the output step. This becomes useful in two cases:
-   - when series need to be normalized at runtime, e.g. for sumSeries or divideSeries with series that have different steps; they need to be normalized (consolidated) so that the series get a compatible step, and the default of "avg" may not suffice. (note that right now we have alignRequests which normalizes all series at fetch time, which can actually be a bit too eager, because some requests can use multiple targets with different processing - e.g. feed two different series into summarize(), so we actually don't need to normalize at runtime, but in the future we should make this better - TODO)
+   - when series need to be normalized at runtime, e.g. for sumSeries or divideSeries with series that have different steps; they need to be normalized (consolidated) so that the series get a compatible step, and the default of "avg" may not suffice. (note that right now we have alignRequests which normalizes all series at fetch time, which can actually be a bit too eager, because some requests can use multiple targets with different processing - e.g. feed two different series into summarize(), so we actually don't need to normalize at runtime, but in the future we should make this better - TODO THIS IS OUT OF DATE)
    - when returning data back to the user via a json response and whatnot, we can consolidate down using the method requested by the user (or average, if not specified). Likewise here, when the setting encounters a special* function while traveling up to the root, the consolidation value is reset to the default (average)
 Note: some functions combine multiple series into a new one (e.g. sumSeries, avgSeries, ...). Your input series may use different consolidateBy settings, some may be explicitly specified while others are not. In this scenario, the output series will be given the first explicitly defined consolidateBy found by iterating the inputs, or the first default otherwise.
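
To make the normalization case above concrete with a hypothetical example: sumSeries(A, B) where A has a 10s step and B a 60s step cannot add points one-to-one; A is first consolidated down to a 60s step (6 input points per output point) using its consolidateBy setting, and only then are the two series summed. That is why the consolidation method chosen near the leaf has to travel up to wherever the normalization actually happens.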

devdocs/maxdatapoints.txt (+1, -1)

@@ -21,4 +21,4 @@ mdp set from GET param, but 0 if came from graphite
 -> executePlan() models.NewReq() -> models.Req.MaxPoints
 -> planRequests(): used for MDP-optimization
 -> plan.MaxDatapoints used for final runtime consolidation
--> and also used in alignRequests() for reporting
+-> and also used in planRequests() for reporting

devdocs/render-request-handling.md (+2, -2)

@@ -12,8 +12,8 @@
 * finds all series by fanning out the query patterns to all other shards.
   this gives basically idx.Node back. has the path, leaf, metricdefinition, schema/aggregation(rollup) settings, for each series, as well as on which node it can be found.
 * construct models.Req objects for each serie. this uses the MKey to identify series, also sets from/to, maxdatapoints, etc.
-* `alignRequests`: this looks at all models.Req objects and aligns them to a common step.
-  it selects the archive to use, consolidator settings etc (see NOTES in expr directory for more info)
+* `planRequests`: this plans at all models.Req objects, which means decide which archive to read from, whether to apply normalization, etc
+  (see NOTES in expr directory for more info)
 * `getTargets`: gets the data from the local node and peer nodes based on the models.Req objects
 * `mergeSeries`: if there's multiple series with same name/tags, from, to and consolidator (e.g. because there's multiple series because users switched intervals), merge them together into one series
 * Sort each merged series so that the output of a function is well-defined and repeatable.

docs/consolidation.md (+1, -1)

@@ -36,7 +36,7 @@ This further reduces data at runtime on an as-needed basis.
 It supports min, max, sum, average.


-## The request alignment algorithm
+## The request planning algorithm. OUT OF DATE AS OF https://github.com/grafana/metrictank/pull/951

 Metrictank uses a function called `alignRequests` which will:

docs/http-api.md (+1, -1)

@@ -202,7 +202,7 @@ Each lineage section has these fields:
 | schema-retentions | Retentions defined in storage-schemas.conf |
 | archive-read | Which archive was read as defined in the retentions. (0 means raw, 1 first rollup, etc) |
 | archive-interval | The native interval of the archive that was read |
-| aggnum-norm | If >1, number of points aggregated together per point, as part of normalization (series alignment) |
+| aggnum-norm | If >1, number of points aggregated together per point, as part of normalization |
 | aggnum-rc | If >1, number of points aggregated together per output point, as part of runtime consolidation (MaxDataPoints) |
 | consolidator-normfetch | Consolidator used for normalization (if aggnum-norm > 1) and which rollup was read (if archive-read > 0) |
 | consolidator-rc | Consolidator used for runtime consolidation (MaxDataPoints) (if aggnum-rc > 1) |

docs/render-path (+1, -1)

@@ -88,6 +88,6 @@ such that they can be used together (for aggregating, merging, etc)


 TODO talk about
-alignRequests -> getTargets -> mergeSeries -> sort Series -> plan.Run (executes functions and does MDP consolidation with nudging)
+planRequests -> getTargets -> mergeSeries -> sort Series -> plan.Run (executes functions and does MDP consolidation with nudging)

 talk more about what happens at each step, how data is manipulated etc
