
Commit f99dfef

revisit copy-on-write series processing
* update docs to include tags and meta section
* fix a bunch of bugs where previously data wasn't being deeply copied
* cleanly copy the meta section where needed
* clean up code with some helper functions
1 parent 1d05ea2 commit f99dfef

20 files changed: +101 -70 lines

api/models/series.go (+32 -8)

@@ -16,9 +16,9 @@ import (
 //go:generate msgp
 
 type Series struct {
-	Target     string // for fetched data, set from models.Req.Target, i.e. the metric graphite key. for function output, whatever should be shown as target string (legend)
-	Datapoints []schema.Point
+	Target     string            // for fetched data, set from models.Req.Target, i.e. the metric graphite key. for function output, whatever should be shown as target string (legend)
 	Tags       map[string]string // Must be set initially via call to `SetTags()`
+	Datapoints []schema.Point
 	Interval   uint32
 	QueryPatt  string // to tie series back to request it came from. e.g. foo.bar.*, or if series outputted by func it would be e.g. scale(foo.bar.*,0.123456)
 	QueryFrom  uint32 // to tie series back to request it came from
@@ -163,26 +163,50 @@ func (s *Series) buildTargetFromTags() {
 	s.Target = buf.String()
 }
 
+// Copy returns a deep copy.
+// The returned value does not link to the same memory space for any of the properties
 func (s Series) Copy(emptyDatapoints []schema.Point) Series {
-	newSeries := Series{
+	return Series{
 		Target:       s.Target,
-		Datapoints:   emptyDatapoints,
-		Tags:         make(map[string]string, len(s.Tags)),
+		Datapoints:   append(emptyDatapoints, s.Datapoints...),
+		Tags:         s.CopyTags(),
 		Interval:     s.Interval,
 		QueryPatt:    s.QueryPatt,
 		QueryFrom:    s.QueryFrom,
 		QueryTo:      s.QueryTo,
 		QueryCons:    s.QueryCons,
 		Consolidator: s.Consolidator,
+		Meta:         s.Meta.Copy(),
 	}
+}
 
-	newSeries.Datapoints = append(newSeries.Datapoints, s.Datapoints...)
+// CopyBare returns a bare copy.
+// The returned value does not link to the same memory space for any of the properties
+// because it resets all reference types
+func (s Series) CopyBare() Series {
+	s.Datapoints = nil
+	s.Tags = nil
+	s.Meta = nil
+	return s
+}
 
+// CopyTags makes a deep copy of the tags
+func (s *Series) CopyTags() map[string]string {
+	out := make(map[string]string, len(s.Tags))
 	for k, v := range s.Tags {
-		newSeries.Tags[k] = v
+		out[k] = v
 	}
+	return out
+}
 
-	return newSeries
+// CopyTagsWith makes a deep copy of the tags and sets the given tag
+func (s *Series) CopyTagsWith(key, val string) map[string]string {
+	out := make(map[string]string, len(s.Tags)+1)
+	for k, v := range s.Tags {
+		out[k] = v
+	}
+	out[key] = val
+	return out
}
 
 type SeriesByTarget []Series
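
To illustrate the intended call pattern of these helpers, here is a minimal sketch of a copy-on-write processing step. This is hypothetical: `doubleSeries` is not part of this commit, and the import paths are assumptions about metrictank's layout at the time.

    package expr

    import (
        "fmt"

        "github.com/grafana/metrictank/api/models" // assumed import path
        "github.com/raintank/schema"               // assumed; wherever schema.Point lives
    )

    // doubleSeries is a made-up processing step: it returns a series whose
    // datapoints are doubled, without mutating the input (copy-on-write).
    func doubleSeries(in models.Series, points []schema.Point) models.Series {
        out := in.CopyBare() // value copy with all reference types reset
        out.Target = fmt.Sprintf("double(%s)", in.Target)
        out.QueryPatt = fmt.Sprintf("double(%s)", in.QueryPatt)
        out.Tags = in.CopyTagsWith("double", "1") // fresh map; input tags untouched
        out.Meta = in.Meta                        // meta is unchanged, so sharing is fine
        out.Datapoints = points                   // typically pointSlicePool.Get().([]schema.Point)
        for _, p := range in.Datapoints {         // p is a value copy of each point
            p.Val *= 2
            out.Datapoints = append(out.Datapoints, p)
        }
        return out
    }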

devdocs/expr.md (+12 -8)

@@ -1,19 +1,23 @@
 ## Management of point slices
 
-The `models.Series` attribute `Datapoints []schema.Point` needs special atention:
+The `models.Series` type, even when passed by value, has a few fields that need special attention:
+* `Datapoints []schema.Point`
+* `Tags map[string]string`
+* `Meta SeriesMeta`
 
-many processing functions will transform some of the points in datapoint slices. logically speaking, some output values are different than their input values,
-while some may remain the same. they need a place to store their output.
+Many processing functions want to return an output series that differs from the input, in that (some of) the datapoints may have changed value, tags or metadata.
+They need a place to store their output, but we cannot simply operate on the input series, or even on a copy of it, as the underlying datastructures are shared.
 
 Goals:
-* processing functions should not modify data in slices if those slices need to remain original (e.g. because they're re-used later)
-* minimize allocations of new slices foremost and data copying (if point in= point out) as a smaller concern
+* processing functions should not modify data if that data needs to remain original (e.g. because of re-use of the same input data elsewhere)
+* minimize allocations of new structures foremost
+* minimize data copying as a smaller concern
 * simple code
 
 there's 2 main choices:
 
 1) copy-on-write:
-  - each function does not modify data in their inputs, they allocate new slices (or better: get from pool) in which they should store their output point values
+  - each function does not modify data in its inputs; it allocates new structures (or possibly gets them from the pool) if there are differences with the input
   - storing output data into new slice can typically be done in same pass as processing the input data
   - if you have lots of processing steps (graphite function calls) in a row, we will be creating more slices and copy data (for unmodified points) than strictly necessary.
   - getting a slice from the pool may cause a stall if it's not large enough and runtime needs to re-allocate and copy
@@ -42,8 +46,8 @@ e.g. an avg of 3 series will create 1 new series (from pool), but won't put the
 another processing step may require the same input data.
 
 function implementations:
-* must not modify existing slices
-* should use the pool to get new slices in which to store their new/modified data.
+* must not modify existing slices or maps or other composite datastructures (at the time of writing, it's only slices/maps)
+* should use the pool to get new slices in which to store their new/modified datapoints.
 * should add said new slices into the cache so it can later be cleaned
 
 ## consolidateBy
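
These rules can be illustrated with a function that only renames a series: it must not mutate the input's tag map (which may be shared with other consumers), while the untouched datapoints and meta can be shared freely. A minimal sketch, assuming the `CopyTagsWith` helper introduced in this commit and the `models` import from the previous sketch; `renameSeries` itself is hypothetical:

    // renameSeries follows the COW rules above: Datapoints and Meta are not
    // modified, so the output may share them with the input; only the tag map,
    // which does change, is deep-copied via CopyTagsWith.
    func renameSeries(in models.Series, name string) models.Series {
        out := in // value copy: Datapoints, Tags, Meta still reference shared memory
        out.Target = name
        out.QueryPatt = name
        out.Tags = in.CopyTagsWith("name", name) // fresh map; input stays untouched
        return out
    }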

expr/func_absolute.go (+4 -8)

@@ -35,18 +35,14 @@ func (s *FuncAbsolute) Exec(cache map[Req][]models.Series) ([]models.Series, err
 	for i, serie := range series {
 		transformed := &out[i]
 		transformed.Target = fmt.Sprintf("absolute(%s)", serie.Target)
-		transformed.QueryPatt = fmt.Sprintf("absolute(%s)", serie.QueryPatt)
-		transformed.Tags = make(map[string]string, len(serie.Tags)+1)
+		transformed.Tags = serie.CopyTagsWith("absolute", "1")
 		transformed.Datapoints = pointSlicePool.Get().([]schema.Point)
 		transformed.Interval = serie.Interval
-		transformed.Consolidator = serie.Consolidator
+		transformed.QueryPatt = fmt.Sprintf("absolute(%s)", serie.QueryPatt)
 		transformed.QueryCons = serie.QueryCons
-		transformed.Meta = serie.Meta.Copy()
+		transformed.Consolidator = serie.Consolidator
+		transformed.Meta = serie.Meta
 
-		for k, v := range serie.Tags {
-			transformed.Tags[k] = v
-		}
-		transformed.Tags["absolute"] = "1"
 		for _, p := range serie.Datapoints {
 			p.Val = math.Abs(p.Val)
 			transformed.Datapoints = append(transformed.Datapoints, p)

expr/func_aggregate.go (+3 -6)

@@ -50,10 +50,7 @@ func (s *FuncAggregate) Exec(cache map[Req][]models.Series) ([]models.Series, er
 
 	// The tags for the aggregated series is only the tags that are
 	// common to all input series
-	commonTags := make(map[string]string, len(series[0].Tags))
-	for k, v := range series[0].Tags {
-		commonTags[k] = v
-	}
+	commonTags := series[0].CopyTags()
 
 	var meta models.SeriesMeta
 
@@ -70,12 +67,12 @@ func (s *FuncAggregate) Exec(cache map[Req][]models.Series) ([]models.Series, er
 	name := s.agg.name + "Series(" + strings.Join(queryPatts, ",") + ")"
 	output := models.Series{
 		Target:       name,
-		QueryPatt:    name,
 		Tags:         commonTags,
 		Datapoints:   out,
 		Interval:     series[0].Interval,
-		Consolidator: cons,
+		QueryPatt:    name,
 		QueryCons:    queryCons,
+		Consolidator: cons,
 		Meta:         meta,
 	}
 	cache[Req{}] = append(cache[Req{}], output)

expr/func_alias.go (+1 -1)

@@ -32,7 +32,7 @@ func (s *FuncAlias) Exec(cache map[Req][]models.Series) ([]models.Series, error)
 	for i := range series {
 		series[i].Target = s.alias
 		series[i].QueryPatt = s.alias
-		series[i].Tags["name"] = s.alias
+		series[i].Tags = series[i].CopyTagsWith("name", s.alias)
 	}
 	return series, nil
 }

expr/func_aliasbynode.go (+1 -1)

@@ -33,7 +33,7 @@ func (s *FuncAliasByNode) Exec(cache map[Req][]models.Series) ([]models.Series,
 		n := aggKey(serie, s.nodes)
 		series[i].Target = n
 		series[i].QueryPatt = n
-		series[i].Tags["name"] = n
+		series[i].Tags = series[i].CopyTagsWith("name", n)
 	}
 	return series, nil
 }

expr/func_aliassub.go (+1 -1)

@@ -48,7 +48,7 @@ func (s *FuncAliasSub) Exec(cache map[Req][]models.Series) ([]models.Series, err
 		name := s.search.ReplaceAllString(metric, replace)
 		series[i].Target = name
 		series[i].QueryPatt = name
-		series[i].Tags["name"] = name
+		series[i].Tags = series[i].CopyTagsWith("name", name)
 	}
 	return series, err
 }

expr/func_aspercent.go (+5)

@@ -119,6 +119,7 @@ func (s *FuncAsPercent) execWithNodes(series, totals []models.Series, cache map[
 			nonesSerie.QueryPatt = fmt.Sprintf("asPercent(%s,MISSING)", serie1.QueryPatt)
 			nonesSerie.Target = fmt.Sprintf("asPercent(%s,MISSING)", serie1.Target)
 			nonesSerie.Tags = map[string]string{"name": nonesSerie.Target}
+			nonesSerie.Meta = serie1.Meta.Copy()
 
 			if nones == nil {
 				nones = pointSlicePool.Get().([]schema.Point)
@@ -195,6 +196,7 @@ func (s *FuncAsPercent) execWithoutNodes(series, totals []models.Series, cache m
 			}
 			serie.Datapoints[i].Val = computeAsPercent(serie.Datapoints[i].Val, totalVal)
 		}
+		serie.Meta = serie.Meta.Merge(totalsSerie.Meta)
 		outSeries = append(outSeries, serie)
 		cache[Req{}] = append(cache[Req{}], serie)
 	}
@@ -247,9 +249,11 @@ func sumSeries(series []models.Series, cache map[Req][]models.Series) models.Ser
 	out := pointSlicePool.Get().([]schema.Point)
 	crossSeriesSum(series, &out)
 	var queryPatts []string
+	var meta models.SeriesMeta
 
 Loop:
 	for _, v := range series {
+		meta = meta.Merge(v.Meta)
 		// avoid duplicates
 		for _, qp := range queryPatts {
 			if qp == v.QueryPatt {
@@ -268,6 +272,7 @@ Loop:
 		Consolidator: cons,
 		QueryCons:    queryCons,
 		Tags:         map[string]string{"name": name},
+		Meta:         meta,
 	}
 	cache[Req{}] = append(cache[Req{}], sum)
 	return sum

expr/func_countseries.go (+6)

@@ -45,6 +45,11 @@ func (s *FuncCountSeries) Exec(cache map[Req][]models.Series) ([]models.Series,
 		out = append(out, p)
 	}
 
+	var meta models.SeriesMeta
+	for _, s := range series {
+		meta = meta.Merge(s.Meta)
+	}
+
 	output := models.Series{
 		Target:    name,
 		QueryPatt: name,
@@ -53,6 +58,7 @@ func (s *FuncCountSeries) Exec(cache map[Req][]models.Series) ([]models.Series,
 		Interval:     series[0].Interval,
 		Consolidator: cons,
 		QueryCons:    queryCons,
+		Meta:         meta,
 	}
 	cache[Req{}] = append(cache[Req{}], output)
 
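
The merge loop added above is the same merge-over-inputs pattern that recurs in sumSeries and the other aggregating functions in this commit. A hypothetical helper capturing it (not part of the commit; assumes only the `SeriesMeta.Merge` signature used throughout these diffs):

    // mergeSeriesMeta folds the meta sections of all input series into one,
    // suitable for the Meta field of an aggregated output series. Starting
    // from the zero value works because Merge returns the combined SeriesMeta.
    func mergeSeriesMeta(series []models.Series) models.SeriesMeta {
        var meta models.SeriesMeta
        for _, s := range series {
            meta = meta.Merge(s.Meta)
        }
        return meta
    }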

expr/func_derivative.go (+1 -7)

@@ -34,16 +34,10 @@ func (s *FuncDerivative) Exec(cache map[Req][]models.Series) ([]models.Series, e
 	outSeries := make([]models.Series, len(series))
 	for i, serie := range series {
 		serie.Target = fmt.Sprintf("derivative(%s)", serie.Target)
+		serie.Tags = serie.CopyTagsWith("derivative", "1")
 		serie.QueryPatt = fmt.Sprintf("derivative(%s)", serie.QueryPatt)
 		out := pointSlicePool.Get().([]schema.Point)
 
-		newTags := make(map[string]string, len(serie.Tags)+1)
-		for k, v := range serie.Tags {
-			newTags[k] = v
-		}
-		newTags["derivative"] = "1"
-		serie.Tags = newTags
-
 		prev := math.NaN()
 		for _, p := range serie.Datapoints {
 			val := p.Val

expr/func_divideseries.go (+1)

@@ -67,6 +67,7 @@ func (s *FuncDivideSeries) Exec(cache map[Req][]models.Series) ([]models.Series,
 			Interval:     divisor.Interval,
 			Consolidator: dividend.Consolidator,
 			QueryCons:    dividend.QueryCons,
+			Meta:         dividend.Meta.Copy().Merge(divisor.Meta),
 		}
 		cache[Req{}] = append(cache[Req{}], output)
 		series = append(series, output)

expr/func_divideserieslists.go (+1)

@@ -67,6 +67,7 @@ func (s *FuncDivideSeriesLists) Exec(cache map[Req][]models.Series) ([]models.Se
 			Interval:     divisor.Interval,
 			Consolidator: dividend.Consolidator,
 			QueryCons:    dividend.QueryCons,
+			Meta:         dividend.Meta.Copy().Merge(divisor.Meta),
 		}
 		cache[Req{}] = append(cache[Req{}], output)
 		series = append(series, output)

expr/func_get.go (+2)

@@ -24,6 +24,8 @@ func (s FuncGet) Context(context Context) Context {
 func (s FuncGet) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
 	series := cache[s.req]
 
+	// this function is the only exception to the COW pattern
+	// it is allowed to modify the series directly to set the needed tags
 	for k := range series {
 		series[k].SetTags()
 	}

expr/func_groupbytags.go (+13 -5)

@@ -45,7 +45,11 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
 		return nil, errors.New("No tags specified")
 	}
 
-	groups := make(map[string][]models.Series)
+	type Group struct {
+		s []models.Series
+		m models.SeriesMeta
+	}
+	groups := make(map[string]Group)
 	useName := false
 
 	groupTags := s.tags
@@ -99,28 +103,32 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
 
 		key := buffer.String()
 
-		groups[key] = append(groups[key], serie)
+		group := groups[key]
+		group.s = append(group.s, serie)
+		group.m = group.m.Merge(serie.Meta)
+		groups[key] = group
 	}
 
 	output := make([]models.Series, 0, len(groups))
 	aggFunc := getCrossSeriesAggFunc(s.aggregator)
 
 	// Now, for each key perform the requested aggregation
-	for name, groupSeries := range groups {
-		cons, queryCons := summarizeCons(groupSeries)
+	for name, group := range groups {
+		cons, queryCons := summarizeCons(group.s)
 
 		newSeries := models.Series{
 			Target:       name,
 			QueryPatt:    name,
 			Interval:     series[0].Interval,
 			Consolidator: cons,
 			QueryCons:    queryCons,
+			Meta:         group.m,
 		}
 		newSeries.SetTags()
 
 		newSeries.Datapoints = pointSlicePool.Get().([]schema.Point)
 
-		aggFunc(groupSeries, &newSeries.Datapoints)
+		aggFunc(group.s, &newSeries.Datapoints)
 		cache[Req{}] = append(cache[Req{}], newSeries)
 
 		output = append(output, newSeries)

expr/func_integral.go (+3 -7)

@@ -35,17 +35,13 @@ func (s *FuncIntegral) Exec(cache map[Req][]models.Series) ([]models.Series, err
 	for i, serie := range series {
 		transformed := &out[i]
 		transformed.Target = fmt.Sprintf("integral(%s)", serie.Target)
-		transformed.QueryPatt = fmt.Sprintf("integral(%s)", serie.QueryPatt)
-		transformed.Tags = make(map[string]string, len(serie.Tags)+1)
+		transformed.Tags = serie.CopyTagsWith("integral", "1")
 		transformed.Datapoints = pointSlicePool.Get().([]schema.Point)
+		transformed.QueryPatt = fmt.Sprintf("integral(%s)", serie.QueryPatt)
 		transformed.Interval = serie.Interval
 		transformed.Consolidator = serie.Consolidator
 		transformed.QueryCons = serie.QueryCons
-
-		for k, v := range serie.Tags {
-			transformed.Tags[k] = v
-		}
-		transformed.Tags["integral"] = "1"
+		transformed.Meta = serie.Meta
 
 		current := 0.0
 		for _, p := range serie.Datapoints {

expr/func_isnonnull.go (+2 -5)

@@ -36,16 +36,13 @@ func (s *FuncIsNonNull) Exec(cache map[Req][]models.Series) ([]models.Series, er
 		transformed := &out[i]
 		transformed.Target = fmt.Sprintf("isNonNull(%s)", serie.Target)
 		transformed.QueryPatt = fmt.Sprintf("isNonNull(%s)", serie.QueryPatt)
-		transformed.Tags = make(map[string]string, len(serie.Tags)+1)
+		transformed.Tags = serie.CopyTagsWith("isNonNull", "1")
 		transformed.Datapoints = pointSlicePool.Get().([]schema.Point)
 		transformed.Interval = serie.Interval
 		transformed.Consolidator = serie.Consolidator
 		transformed.QueryCons = serie.QueryCons
+		transformed.Meta = serie.Meta.Copy()
 
-		for k, v := range serie.Tags {
-			transformed.Tags[k] = v
-		}
-		transformed.Tags["isNonNull"] = "1"
 		for _, p := range serie.Datapoints {
 			if math.IsNaN(p.Val) {
 				p.Val = 0

expr/func_keeplastvalue.go (+5 -3)

@@ -46,10 +46,12 @@ func (s *FuncKeepLastValue) Exec(cache map[Req][]models.Series) ([]models.Series
 	}
 	limit := int(s.limit)
 	outSeries := make([]models.Series, len(series))
-	for i, serie := range series {
-		serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target)
+	for i, in := range series {
+		serie := in.CopyBare()
+		serie.Target = fmt.Sprintf("keepLastValue(%s)", in.Target)
 		serie.QueryPatt = serie.Target
-
+		serie.Tags = in.Tags
+		serie.Meta = in.Meta
 		out := pointSlicePool.Get().([]schema.Point)
 
 		var consecutiveNaNs int
