This repository was archived by the owner on Aug 23, 2023. It is now read-only.

duplicate series: deduplicate + fix runtime normalization #1855

Merged
merged 8 commits on Jul 6, 2020
14 changes: 5 additions & 9 deletions api/dataprocessor.go
```diff
@@ -71,18 +71,14 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
 		// the requested range is too narrow for the requested interval
 		return []schema.Point{}
 	}
-	// 3 attempts to get a sufficiently sized slice from the pool. if it fails, allocate a new one.
+	// try to get a sufficiently sized slice from the pool. if it fails, allocate a new one.
 	var out []schema.Point
 	neededCap := int((last-first)/interval + 1)
-	for attempt := 1; attempt < 4; attempt++ {
-		candidate := pointSlicePool.Get().([]schema.Point)
-		if cap(candidate) >= neededCap {
-			out = candidate[:neededCap]
-			break
-		}
+	candidate := pointSlicePool.Get().([]schema.Point)
+	if cap(candidate) >= neededCap {
+		out = candidate[:neededCap]
+	} else {
 		pointSlicePool.Put(candidate)
 	}
 	if out == nil {
 		out = make([]schema.Point, neededCap)
 	}
```

2 changes: 1 addition & 1 deletion api/graphite_req.go
```diff
@@ -37,7 +37,7 @@ func (r ReqMap) Dump() string {
 	out := fmt.Sprintf("ReqsMap (%d entries):\n", r.cnt)
 	out += " Groups:\n"
 	for i, reqs := range r.pngroups {
-		out += fmt.Sprintf(" * group %d:", i)
+		out += fmt.Sprintf(" * group %d:\n", i)
 		for _, r := range reqs {
 			out += " " + r.DebugString() + "\n"
 		}
```
92 changes: 86 additions & 6 deletions devdocs/expr.md
@@ -1,3 +1,70 @@
# Request deduplication throughout the render path

# Plan construction

During plan construction, identical requests can be deduplicated.
Whether two requests are identical is determined by:
* the metric name/pattern or seriesByTag() call
* the context: depending on which functions the expression has passed through, the from/to or PNGroup may have been adjusted (assuming pre-normalization is not disabled)

Examples:
* `target=(foo.*)&target=movingAverage(consolidateBy(sum(foo.*),"max"), "1min")`: the two resulting requests are different.
  We request `foo.*` twice, but with different from/to, consolidator and PNGroup.
  (Also, the MDP optimization may play a role here, but that is experimental and should always be disabled.)
* `target=foo&target=sum(foo)`: the PNGroup property will be 0 and >0, respectively.
* `target=sum(foo)&target=sum(foo)`: the PNGroup property will be different for the two targets.

If requests are identical, they lead to identical expr.Req objects and are saved in the plan only once.
Importantly, equivalent (but not identical) requests may still appear, e.g. requests for the same series, once with and once without a PNGroup.
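To make the identity comparison concrete, here is a minimal, hedged sketch. The `sketchReq` type and the PNGroup values are invented for illustration; the real expr.Req has more fields, but like the sketch it is a comparable struct, which is what makes an `==` check (and thus dedup) possible.

```go
package main

import "fmt"

// sketchReq is a hypothetical stand-in for expr.Req; the real struct has more
// fields and the PNGroup values below are made up for illustration.
type sketchReq struct {
	Query   string
	From    uint32
	To      uint32
	PNGroup uint64
}

func main() {
	// target=foo&target=sum(foo): only the request that passed through sum()
	// gets a PNGroup assigned (assuming pre-normalization is enabled).
	a := sketchReq{Query: "foo", From: 1000, To: 2000, PNGroup: 0}
	b := sketchReq{Query: "foo", From: 1000, To: 2000, PNGroup: 7}

	// Identical requests compare equal and are stored in the plan only once;
	// these two are equivalent (same data fetch) but not identical.
	fmt.Println(a == b) // false
}
```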

# index lookups

For the purpose of index lookups we deduplicate more aggressively and only look at query/from/to.
Other properties such as PNGroup and consolidator are not relevant for this purpose.
Note that a query like `target=foo.*&target=foo.bar` will return the series foo.bar twice; these are currently not deduplicated.
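A minimal sketch of what this more aggressive dedup boils down to; the `lookupKey` type is hypothetical and the real code structures its index lookups differently, but the idea is the same: requests that differ only by PNGroup or consolidator trigger a single lookup.

```go
package main

import "fmt"

// lookupKey captures only what matters for an index lookup in this sketch.
type lookupKey struct {
	Query string
	From  uint32
	To    uint32
}

func main() {
	// two requests for the same pattern and range, differing only by PNGroup
	reqs := []struct {
		key     lookupKey
		pnGroup uint64
	}{
		{lookupKey{"foo.*", 1000, 2000}, 0},
		{lookupKey{"foo.*", 1000, 2000}, 7},
	}

	seen := make(map[lookupKey]bool)
	for _, r := range reqs {
		if seen[r.key] {
			continue // reuse the earlier lookup result
		}
		seen[r.key] = true
		fmt.Println("index lookup for", r.key.Query)
	}
	// output: a single "index lookup for foo.*"
}
```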

# ReqMap

For each combination of a plan.Req and all its resulting series, we generate a models.Req and save it into ReqMap.
Again, these requests may differ only by PNGroup, consolidator or query pattern, despite covering the same data fetch.

# Request planning: planRequests()

The ReqsPlan returned by planRequests() may contain equivalent or identical requests.
E.g. a PNGroup only changes the effective fetch parameters if there are multiple different series, with different intervals, sharing that PNGroup: in that case pre-normalization may kick in and a different archive may be selected (or the same archive/fetch, but with normalization at runtime).
Also, since max-points-per-req-soft gets applied group by group, and then to the singles, breaching this limit may lead to different requests.

The point being: in the ReqsPlan, requests may be non-identical though equivalent.

# getTargets

As described above, the list of models.Req to fetch may contain requests that are non-identical though equivalent:

* Pattern: as described under index lookups, we may fetch the exact same data twice if it came in via different query patterns.
* PNGroup: on its own it does not affect fetching, though honoring the PNGroup may have had an effect on e.g. the archive to fetch or AggNum.
* ConsReq: the requested consolidator does not matter for fetching or any processing.
* Consolidator: the consolidator may have an effect on fetching (if we query a rollup) and/or on after-fetch normalization.
* AggNum: affects after-fetch runtime normalization, but not the fetching.
* MaxPoints: only used for the MDP optimization (ignored here, experimental).

The fields that materially affect fetching are MKey, archive, from and to (and Consolidator if a rollup is fetched), as the sketch below illustrates.
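Expressed as a sketch (a hypothetical type with placeholder field types, not the actual planner code), the "fetch identity" of a request is just those fields:

```go
package sketch

// fetchIdentity is a hypothetical key: two models.Req that agree on these fields
// read identical data from storage, even if they differ in PNGroup, ConsReq,
// AggNum or MaxPoints. The real code uses metrictank's own types
// (schema.MKey, consolidation.Consolidator) rather than the placeholders below.
type fetchIdentity struct {
	MKey         string // placeholder for schema.MKey
	Archive      int    // which raw or rollup archive is read
	From, To     uint32
	Consolidator int // placeholder; only matters when a rollup archive is read
}
```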

# datamap

The datamap is a list of series, grouped by expr.Req (which tracks the requested consolidator, but not the used consolidator).
There are a few cases where different expr.Req entries may have overlapping, or even identical lists of series:
* there may be two different requests with a Pattern such as `foo.*` and `foo`
* a request like `target=foo&target=sum(foo)` or `target=sum(foo)&target=sum(foo)` (with pre-normalization enabled) leads to different requests that differ only by PNGroup

Note: even if the same series is present elsewhere in the datamap, each copy is a full/independent/deep copy.

# function processing

Loading data and feeding it into the function processing chain happens through FuncGet, which looks up data based on the expr.Req coming from the user; this maps 1:1 to the datamap above.
In particular, it takes PNGroups into account (even if they lead to equivalent fetches and thus identical series). But each copy of a series is a distinct, deep copy.
It is important to note that any series may be processed or returned more than once, e.g. for a query like `target=a&target=a`. For this reason, any function, or operation such as runtime consolidation, needs to make sure not to affect the contents of the datamap.
How we solve this is described in the section "Considerations around Series changes and reuse and why we chose copy-on-write" below.
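A hedged sketch of the hazard, using simplified stand-in types: because the tags map (and the datapoints slice) headers are shared between copies, an in-place edit by one function call leaks into the datamap and is visible the next time the same entry is read.

```go
package main

import "fmt"

// series is a simplified stand-in for models.Series: Tags (and Datapoints)
// are reference types, so struct copies still share the underlying data.
type series struct {
	Target string
	Tags   map[string]string
}

// badAlias pretends to be a processing function that edits its input in place.
func badAlias(in []series, name string) []series {
	in[0].Target = name       // writes through the shared slice into the datamap entry
	in[0].Tags["name"] = name // mutates the shared map as well
	return in
}

func main() {
	// the datamap entry for "a", read twice for target=alias(a,"b")&target=a
	dataMap := []series{{Target: "a", Tags: map[string]string{"name": "a"}}}

	badAlias(dataMap, "b")

	// the second read of the entry now sees the corrupted values
	fmt.Println(dataMap[0].Target, dataMap[0].Tags["name"]) // b b
}
```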

# Considerations when writing a Graphite processing function in metrictank

## constructors should return a pointer
@@ -32,13 +99,26 @@ example: an averageSeries() of 3 series:

## Introduction

`plan.Run()` runs all function processing as well as any needed follow-up, such as runtime consolidation.
To do this, it has a dataMap as its source of data. For more information on how this structure is generated,
see above. The TLDR is that it is the source of truth of series data, keyed by expr.Req,
and that each entry in the datamap may be read more than once (if the same expression is used multiple times in a query).

Many processing functions (or runtime consolidation) will want to return an output series that differs from the input,
e.g. a different target or interval, tags, datapoints, etc.

This requires 2 special considerations:

1. Since in most of the processing chain, series are passed as series lists (`in []models.Series`),
any change such as `in[0].Interval = <newvalue>` will impact the entry in the datamap.
2. The `models.Series` type, even when copied or passed by value, still has a few fields that need special attention, because
the underlying datastructures are shared. These are:
- `Datapoints []schema.Point`
- `Tags map[string]string`
- `Meta SeriesMeta`

Thus, when making changes, we need a place to store the new output. For 1, it suffices to take a copy of the series struct itself;
for 2, we also need to copy the underlying datastructures, as the sketch below shows.
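A minimal sketch of that approach with simplified stand-in types (the real code also deep-copies `Meta` and obtains point slices from the pool): the struct is copied by value, and the datapoints slice and tags map are copied before being modified, so the datamap entry is left untouched.

```go
package main

import "fmt"

type point struct {
	Val float64
	Ts  uint32
}

// series is a simplified stand-in for models.Series.
type series struct {
	Target     string
	Interval   uint32
	Tags       map[string]string
	Datapoints []point
}

// scale builds new output series without touching the shared inputs.
func scale(in []series, factor float64) []series {
	out := make([]series, 0, len(in))
	for _, s := range in { // s is a copy of the struct (consideration 1)
		// copy the underlying datastructures before changing them (consideration 2)
		points := make([]point, len(s.Datapoints))
		copy(points, s.Datapoints)
		for i := range points {
			points[i].Val *= factor
		}
		tags := make(map[string]string, len(s.Tags))
		for k, v := range s.Tags {
			tags[k] = v
		}
		s.Target = fmt.Sprintf("scale(%s,%g)", s.Target, factor)
		tags["name"] = s.Target
		s.Datapoints, s.Tags = points, tags
		out = append(out, s)
	}
	return out
}

func main() {
	in := []series{{
		Target:     "foo",
		Interval:   10,
		Tags:       map[string]string{"name": "foo"},
		Datapoints: []point{{1, 10}, {2, 20}},
	}}
	out := scale(in, 2)
	fmt.Println(in[0].Target, in[0].Datapoints)   // foo [{1 10} {2 20}]  (unchanged)
	fmt.Println(out[0].Target, out[0].Datapoints) // scale(foo,2) [{2 10} {4 20}]
}
```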

## Goals

31 changes: 28 additions & 3 deletions expr/plan.go
```diff
@@ -176,12 +176,30 @@ func NewPlan(exprs []*expr, from, to, mdp uint32, stable bool, optimizations Opt
 
 // newplan adds requests as needed for the given expr, resolving function calls as needed
 func newplan(e *expr, context Context, stable bool, reqs []Req) (GraphiteFunc, []Req, error) {
+
+	// suppress duplicate queries such as target=foo&target=foo
+	// note that unless `pre-normalization = false`,
+	// this cannot suppress duplicate reqs in these cases:
+	// target=foo&target=sum(foo)      // reqs are different, one has a PNGroup set
+	// target=sum(foo)&target=sum(foo) // reqs get different PNGroups
+	// perhaps in the future we can improve on this and
+	// deduplicate the largest common (sub)expressions
+
+	addReqIfNew := func(req Req) {
+		for _, r := range reqs {
+			if r == req {
+				return
+			}
+		}
+		reqs = append(reqs, req)
+	}
+
 	if e.etype != etFunc && e.etype != etName {
 		return nil, nil, errors.NewBadRequest("request must be a function call or metric pattern")
 	}
 	if e.etype == etName {
 		req := NewReqFromContext(e.str, context)
-		reqs = append(reqs, req)
+		addReqIfNew(req)
 		return NewGet(req), reqs, nil
 	} else if e.etype == etFunc && e.str == "seriesByTag" {
 		// `seriesByTag` function requires resolving expressions to series
@@ -191,7 +209,7 @@ func newplan(e *expr, context Context, stable bool, reqs []Req) (GraphiteFunc, [
 	// TODO - find a way to prevent this parse/encode/parse/encode loop
 	expressionStr := "seriesByTag(" + e.argsStr + ")"
 	req := NewReqFromContext(expressionStr, context)
-	reqs = append(reqs, req)
+	addReqIfNew(req)
 	return NewGet(req), reqs, nil
 }
 // here e.type is guaranteed to be etFunc
```
```diff
@@ -312,19 +330,26 @@ func (p *Plan) Run(dataMap DataMap) ([]models.Series, error) {
 		}
 		out = append(out, series...)
 	}
+
+	// while 'out' contains copies of the series, the datapoints, meta and tags properties need COW
+	// see devdocs/expr.md
 	for i, o := range out {
 		if p.MaxDataPoints != 0 && len(o.Datapoints) > int(p.MaxDataPoints) {
 			// series may have been created by a function that didn't know which consolidation function to default to.
 			// in the future maybe we can do more clever things here. e.g. perSecond maybe consolidate by max.
 			if o.Consolidator == 0 {
 				o.Consolidator = consolidation.Avg
 			}
-			out[i].Datapoints, out[i].Interval = consolidation.ConsolidateNudged(o.Datapoints, o.Interval, p.MaxDataPoints, o.Consolidator)
+			pointsCopy := pointSlicePoolGet(len(o.Datapoints))
+			pointsCopy = pointsCopy[:len(o.Datapoints)]
+			copy(pointsCopy, o.Datapoints)
+			out[i].Datapoints, out[i].Interval = consolidation.ConsolidateNudged(pointsCopy, o.Interval, p.MaxDataPoints, o.Consolidator)
 			out[i].Meta = out[i].Meta.CopyWithChange(func(in models.SeriesMetaProperties) models.SeriesMetaProperties {
 				in.AggNumRC = consolidation.AggEvery(uint32(len(o.Datapoints)), p.MaxDataPoints)
 				in.ConsolidatorRC = o.Consolidator
 				return in
 			})
+			dataMap.Add(Req{}, out[i])
 		}
 	}
 	return out, nil
```
17 changes: 16 additions & 1 deletion expr/pool.go
```diff
@@ -1,6 +1,10 @@
 package expr
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/grafana/metrictank/schema"
+)
 
 var pointSlicePool *sync.Pool
 
```
```diff
@@ -12,3 +16,14 @@ var pointSlicePool *sync.Pool
 func Pool(p *sync.Pool) {
 	pointSlicePool = p
 }
+
+// pointSlicePoolGet returns a pointslice of at least minCap capacity.
+// similar code lives also in api.Fix(). at some point we should really clean up our pool code.
```
Review comment from the PR author on these added lines:

note that the pool is created in the api package and passed into the expr package.
the original idea was to make the expr library reusable in different software, hence the need to pass the pool into it.

But it's starting to make more sense to just have one global pool singleton with a couple methods and directly access that everywhere. Out of scope for this PR though

```diff
+func pointSlicePoolGet(minCap int) []schema.Point {
+	candidate := pointSlicePool.Get().([]schema.Point)
+	if cap(candidate) >= minCap {
+		return candidate
+	}
+	pointSlicePool.Put(candidate)
+	return make([]schema.Point, 0, minCap)
+}
```
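As a rough, hypothetical sketch of the singleton idea raised in the review comment above (not part of this PR, and the package and function names are invented), a package-level pool could look roughly like this:

```go
// Package pointpool is a hypothetical global point-slice pool; it does not
// exist in metrictank and only sketches the idea from the review comment.
package pointpool

import (
	"sync"

	"github.com/grafana/metrictank/schema"
)

var pool = sync.Pool{
	New: func() interface{} { return []schema.Point{} },
}

// Get returns a zero-length slice with at least minCap capacity.
func Get(minCap int) []schema.Point {
	candidate := pool.Get().([]schema.Point)
	if cap(candidate) >= minCap {
		return candidate[:0]
	}
	pool.Put(candidate)
	return make([]schema.Point, 0, minCap)
}

// Put hands a slice back to the pool for reuse.
func Put(ps []schema.Point) {
	pool.Put(ps[:0])
}
```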