Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Add support for summarize #837

Merged
merged 8 commits into from
Mar 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ rangeOfSeries(seriesList) series | | Stable
scale(seriesLists, num) series | | Stable
stddevSeries(seriesList) series | | Stable
sumSeries(seriesLists) series | sum | Stable
summarize(seriesList, intervalString, func='sum', alignToFrom=False) seriesList | | Stable
transformNull(seriesList, default=0) seriesList | | Stable
5 changes: 1 addition & 4 deletions expr/func_consolidateby.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ func NewConsolidateBy() GraphiteFunc {
}

// Signature declares the arguments consolidateBy accepts and the type it
// returns: a series list followed by a string naming the consolidation
// function, yielding a series list.
//
// The string is checked by the shared IsConsolFunc validator rather than by a
// closure private to this function, so every function that takes a
// consolidation name (summarize does the same) validates it the same way.
func (s *FuncConsolidateBy) Signature() ([]Arg, []Arg) {
	return []Arg{
		ArgSeriesList{val: &s.in},
		ArgString{val: &s.by, validator: []Validator{IsConsolFunc}},
	}, []Arg{ArgSeriesList{}}
}

Expand Down
109 changes: 109 additions & 0 deletions expr/func_summarize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package expr

import (
"fmt"
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/batch"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/util"
"github.com/raintank/dur"
"gopkg.in/raintank/schema.v1"
)

// FuncSummarize holds the parsed arguments of a summarize() call.
type FuncSummarize struct {
// in is the upstream function whose Exec output is summarized.
in GraphiteFunc
// intervalString is the bucket width exactly as written in the query; it is
// validated with IsIntervalString and parsed with dur.ParseDuration in Exec.
intervalString string
// fn names the aggregation applied to each bucket; NewSummarize sets it to
// "sum" and the optional "func" argument overrides it.
fn string
// alignToFrom, when false, makes Exec round the bucket range outwards to
// multiples of the interval; when true the range is used as-is.
alignToFrom bool
}

// NewSummarize returns a summarize function preloaded with the defaults for
// its optional arguments: aggregation "sum" and alignToFrom disabled.
func NewSummarize() GraphiteFunc {
	f := new(FuncSummarize)
	f.fn = "sum"
	f.alignToFrom = false
	return f
}

// Signature describes summarize's arguments and result.
//
// Accepted, in order: the input series list, the interval string (checked by
// IsIntervalString), an optional "func" string (checked by IsConsolFunc) and
// an optional "alignToFrom" bool. The result is a series list.
func (s *FuncSummarize) Signature() ([]Arg, []Arg) {
	var (
		seriesArg   = ArgSeriesList{val: &s.in}
		intervalArg = ArgString{val: &s.intervalString, validator: []Validator{IsIntervalString}}
		funcArg     = ArgString{key: "func", opt: true, val: &s.fn, validator: []Validator{IsConsolFunc}}
		alignArg    = ArgBool{key: "alignToFrom", opt: true, val: &s.alignToFrom}
	)

	accepted := []Arg{seriesArg, intervalArg, funcArg, alignArg}
	produced := []Arg{ArgSeriesList{}}
	return accepted, produced
}

// Context returns the context to hand to the input function: identical to the
// one received except that consol is reset to 0.
// NOTE(review): 0 is presumably the "no consolidation" value, so that
// summarize receives unconsolidated points — confirm against the
// consolidation package.
func (s *FuncSummarize) Context(context Context) Context {
	upstream := context
	upstream.consol = 0
	return upstream
}

func (s *FuncSummarize) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

interval, _ := dur.ParseDuration(s.intervalString)
aggFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn))

var alignToFromTarget string
if s.alignToFrom {
alignToFromTarget = ", true"
}
newName := func(oldName string) string {
return fmt.Sprintf("summarize(%s, \"%s\", \"%s\"%s)", oldName, s.intervalString, s.fn, alignToFromTarget)
}

var outputs []models.Series
for _, serie := range series {
var newStart, newEnd uint32 = serie.QueryFrom, serie.QueryTo
if len(serie.Datapoints) > 0 {
newStart = serie.Datapoints[0].Ts
newEnd = serie.Datapoints[len(serie.Datapoints)-1].Ts + serie.Interval
}
if !s.alignToFrom {
newStart = newStart - (newStart % interval)
newEnd = newEnd - (newEnd % interval) + interval
}

out := summarizeValues(serie, aggFunc, interval, newStart, newEnd)

output := models.Series{
Target: newName(serie.Target),
QueryPatt: newName(serie.QueryPatt),
Tags: serie.Tags,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the python code also does:

    series.tags['summarize'] = intervalString
    series.tags['summarizeFunction'] = func

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does...which is odd IMO. Tags are something set at ingest and it seems odd to add more tags at query time (non-optionally). I get that this isn't the place to make these sorts of arguments, but it doesn't sit right with me to mess with the tags (especially in the case of name collision).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to make the info about how the series was processed available to functions further down the chain, and the rationale for modifying the tags is that when you run series x through a function f you are creating a new series f(x), and its tags should identify it and describe how it's different from x.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that, but it means a lot of "reserved" tag names that need to be kept track of and new ones for each function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Added in

Datapoints: out,
Interval: interval,
}
output.Tags["summarize"] = s.intervalString
output.Tags["summarizeFunction"] = s.fn

outputs = append(outputs, output)
cache[Req{}] = append(cache[Req{}], output)
}
return outputs, nil
}

func summarizeValues(serie models.Series, aggFunc batch.AggFunc, interval, start, end uint32) []schema.Point {
out := pointSlicePool.Get().([]schema.Point)

numPoints := int(util.Min(uint32(len(serie.Datapoints)), (start-end)/interval))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm confused here. the only reason to get the util.Min of these two, is just in case the input series had a low number of points wrt to the requested interval right?
e.g.
1 day worth of 2-hourly data
requesting summarize of 1h.
so this becomes numPoints = min(12,24) = 12
then why in the loop below do we increment by interval (1h), but only 12 times? that would cover only a 12h timerange?

Copy link
Contributor Author

@Aergonus Aergonus Feb 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If (serie.Datapoints[i].Ts < ts+interval) is violated, it doesn't increment i. So it would append NaN's for the inbetween. I'll push a test case that shows this.


for ts, i := start, 0; i < numPoints && ts < end; ts += interval {
Copy link
Contributor Author

@Aergonus Aergonus Jan 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be thorough, this is different from the python code. To mirror the python code you would remove i < numPoints from this line.
The only change is that graphite-mt would add an extra NaN value at the end of the results. Adding this would not remove an extra datapoint (only the extra NaN).

s := i
for ; i < numPoints && serie.Datapoints[i].Ts < ts+interval; i++ {
if serie.Datapoints[i].Ts <= ts {
s = i
}
}

aggPoint := schema.Point{Val: math.NaN(), Ts: ts}
if s != i {
aggPoint.Val = aggFunc(serie.Datapoints[s:i])
}

out = append(out, aggPoint)
}

return out
}
Loading