diff --git a/docs/graphite.md b/docs/graphite.md
index 9d15aea8db..11d7daa00f 100644
--- a/docs/graphite.md
+++ b/docs/graphite.md
@@ -49,4 +49,5 @@ rangeOfSeries(seriesList) series | | Stable
 scale(seriesLists, num) series | | Stable
 stddevSeries(seriesList) series | | Stable
 sumSeries(seriesLists) series | sum | Stable
+summarize(seriesList, intervalString, func="sum", alignToFrom=false) seriesList | | Stable
 transformNull(seriesList, default=0) seriesList | | Stable
diff --git a/expr/func_consolidateby.go b/expr/func_consolidateby.go
index a972d8d763..ae0e64c845 100644
--- a/expr/func_consolidateby.go
+++ b/expr/func_consolidateby.go
@@ -17,12 +17,9 @@ func NewConsolidateBy() GraphiteFunc {
 }
 
 func (s *FuncConsolidateBy) Signature() ([]Arg, []Arg) {
-	validConsol := func(e *expr) error {
-		return consolidation.Validate(e.str)
-	}
 	return []Arg{
 		ArgSeriesList{val: &s.in},
-		ArgString{val: &s.by, validator: []Validator{validConsol}},
+		ArgString{val: &s.by, validator: []Validator{IsConsolFunc}},
 	}, []Arg{ArgSeriesList{}}
 }
 
diff --git a/expr/func_summarize.go b/expr/func_summarize.go
new file mode 100644
index 0000000000..ec9c0a37bc
--- /dev/null
+++ b/expr/func_summarize.go
@@ -0,0 +1,154 @@
+package expr
+
+import (
+	"fmt"
+	"math"
+
+	"github.com/grafana/metrictank/api/models"
+	"github.com/grafana/metrictank/batch"
+	"github.com/grafana/metrictank/consolidation"
+	"github.com/raintank/dur"
+	"gopkg.in/raintank/schema.v1"
+)
+
+// FuncSummarize implements summarize(): every input series is cut into
+// consecutive windows of a fixed width and each window is reduced to a single
+// point with the configured aggregation function.
+type FuncSummarize struct {
+	in             GraphiteFunc
+	intervalString string // window width as passed by the user, e.g. "10s"
+	fn             string // name of the aggregation function, e.g. "sum"
+	alignToFrom    bool   // when false, windows start on multiples of the interval
+}
+
+// NewSummarize returns a summarize function preset with func="sum" and
+// alignToFrom=false.
+func NewSummarize() GraphiteFunc {
+	return &FuncSummarize{fn: "sum", alignToFrom: false}
+}
+
+// Signature declares the arguments: a series list, a mandatory interval string
+// and the optional "func" and "alignToFrom" keyword arguments. The function
+// returns a series list.
+func (s *FuncSummarize) Signature() ([]Arg, []Arg) {
+	return []Arg{
+		ArgSeriesList{val: &s.in},
+		ArgString{val: &s.intervalString, validator: []Validator{IsIntervalString}},
+		ArgString{key: "func", opt: true, val: &s.fn, validator: []Validator{IsConsolFunc}},
+		ArgBool{key: "alignToFrom", opt: true, val: &s.alignToFrom},
+	}, []Arg{ArgSeriesList{}}
+}
+
+// Context returns the context with its consolidator reset to 0.
+// NOTE(review): presumably so the inputs are not consolidated with a function
+// other than the one summarize applies - confirm.
+func (s *FuncSummarize) Context(context Context) Context {
+	context.consol = 0
+	return context
+}
+
+// Exec runs the input function and summarizes every series it returns. Each
+// output series gets a new target/pattern name, the summarize interval and
+// the "summarize" and "summarizeFunction" tags.
+func (s *FuncSummarize) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
+	series, err := s.in.Exec(cache)
+	if err != nil {
+		return nil, err
+	}
+
+	// Signature validates the interval string, but Exec can be reached without
+	// going through the parser (the tests do so), so don't drop the error.
+	interval, err := dur.ParseDuration(s.intervalString)
+	if err != nil {
+		return nil, err
+	}
+	if interval == 0 {
+		// a zero interval would divide by zero in the alignment below and the
+		// window loop would never advance.
+		return nil, fmt.Errorf("summarize: interval %q must be greater than 0", s.intervalString)
+	}
+
+	aggFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn))
+
+	var alignToFromTarget string
+	if s.alignToFrom {
+		alignToFromTarget = ", true"
+	}
+	newName := func(oldName string) string {
+		return fmt.Sprintf("summarize(%s, \"%s\", \"%s\"%s)", oldName, s.intervalString, s.fn, alignToFromTarget)
+	}
+
+	var outputs []models.Series
+	for _, serie := range series {
+		// default to the query range, narrowed down to the data when there is any
+		var newStart, newEnd uint32 = serie.QueryFrom, serie.QueryTo
+		if len(serie.Datapoints) > 0 {
+			newStart = serie.Datapoints[0].Ts
+			newEnd = serie.Datapoints[len(serie.Datapoints)-1].Ts + serie.Interval
+		}
+		if !s.alignToFrom {
+			// round the start down to a multiple of the interval and move the
+			// end to the next multiple above it
+			newStart = newStart - (newStart % interval)
+			newEnd = newEnd - (newEnd % interval) + interval
+		}
+
+		out := summarizeValues(serie, aggFunc, interval, newStart, newEnd)
+
+		// Copy the tags instead of writing into serie.Tags: that map belongs to
+		// the input series, and writing into it panics when it is nil.
+		tags := make(map[string]string, len(serie.Tags)+2)
+		for k, v := range serie.Tags {
+			tags[k] = v
+		}
+		tags["summarize"] = s.intervalString
+		tags["summarizeFunction"] = s.fn
+
+		output := models.Series{
+			Target:     newName(serie.Target),
+			QueryPatt:  newName(serie.QueryPatt),
+			QueryFrom:  serie.QueryFrom,
+			QueryTo:    serie.QueryTo,
+			Tags:       tags,
+			Datapoints: out,
+			Interval:   interval,
+		}
+
+		outputs = append(outputs, output)
+		cache[Req{}] = append(cache[Req{}], output)
+	}
+	return outputs, nil
+}
+
+// summarizeValues walks the points of serie in windows of [ts, ts+interval),
+// starting at start and stopping before end, and reduces every window with
+// aggFunc. A window that holds no point gets a NaN value. The returned slice
+// is taken from pointSlicePool.
+func summarizeValues(serie models.Series, aggFunc batch.AggFunc, interval, start, end uint32) []schema.Point {
+	out := pointSlicePool.Get().([]schema.Point)
+
+	// numPoints bounds the index into the input points, so it must be their
+	// count. The previous min(len, (start-end)/interval) wrapped around (start
+	// is below end) and, for very large intervals, cut the input short.
+	numPoints := len(serie.Datapoints)
+
+	for ts, i := start, 0; i < numPoints && ts < end; ts += interval {
+		// s ends up at the last point whose timestamp is <= ts; when the window
+		// holds no point at all it stays equal to i
+		s := i
+		for ; i < numPoints && serie.Datapoints[i].Ts < ts+interval; i++ {
+			if serie.Datapoints[i].Ts <= ts {
+				s = i
+			}
+		}
+
+		aggPoint := schema.Point{Val: math.NaN(), Ts: ts}
+		if s != i {
+			aggPoint.Val = aggFunc(serie.Datapoints[s:i])
+		}
+
+		out = append(out, aggPoint)
+	}
+
+	return out
+}
diff --git a/expr/func_summarize_test.go b/expr/func_summarize_test.go
new file mode 100644
index 0000000000..a596c262de
--- /dev/null
+++ b/expr/func_summarize_test.go
@@ -0,0 +1,772 @@
+package expr
+
+import (
+	"math"
+	"strconv"
+	"testing"
+
+	"github.com/grafana/metrictank/api/models"
+	"github.com/grafana/metrictank/test"
+	schema "gopkg.in/raintank/schema.v1"
+)
+
+var abSummarize = []schema.Point{
+	{Val: 0, Ts: 10},
+	{Val: 0, Ts: 15},
+	{Val: 0, Ts: 20},
+	{Val: math.MaxFloat64, Ts: 25},
+	{Val: 5.5, Ts: 30},
+	{Val: math.MaxFloat64 - 20, Ts: 35},
+	{Val: math.NaN(), Ts: 40},
+	{Val: math.NaN(), Ts: 45},
+	{Val: math.NaN(), Ts: 50},
+	{Val: 1234567890, Ts: 55},
+	{Val: 1234567890, Ts: 60},
+	{Val: math.NaN(), Ts: 65},
+}
+
+var abcSummarize = []schema.Point{
+	{Val: 0, Ts: 10},
+	{Val: 0, Ts: 12},
+	{Val: 0, Ts: 14},
+	{Val: math.NaN(), Ts: 16},
+	{Val: math.NaN(), Ts: 18},
+	{Val: 0, Ts: 20},
+	{Val: math.MaxFloat64, Ts: 22},
+	{Val: 0, Ts: 24},
+	{Val: math.NaN(), Ts: 26},
+	{Val: math.NaN(), Ts: 28},
+	{Val: 5.5, Ts: 30},
+	{Val: math.MaxFloat64 - 20, Ts: 32},
+	{Val: 1, Ts: 34},
+	{Val: math.NaN(), Ts: 36},
+	{Val: math.NaN(), Ts: 38},
+	{Val: math.NaN(), Ts: 40},
+	{Val: math.NaN(), Ts: 42},
+	{Val: 2, Ts: 44},
+	{Val: math.NaN(), Ts: 46},
+	{Val: math.NaN(), Ts: 48},
+	{Val: math.NaN(), Ts: 50},
+	{Val: 1234567890, Ts: 52},
+	{Val: 3, Ts: 54},
+	{Val: math.NaN(), Ts: 56},
+	{Val: math.NaN(), Ts: 58},
+	{Val: 1234567890, Ts: 60},
+	{Val: math.NaN(), Ts: 62},
+	{Val: 4, Ts: 64},
+}
+
+func TestSummarizeDefaultInterval(t *testing.T) {
+	input := []models.Series{
+		{
+			Target:     "a",
+			QueryPatt:  "a",
+			QueryFrom:  10,
+			QueryTo:    60,
+			Interval:   10,
+			Datapoints: getCopy(a),
+		},
+	}
+	outputSum := [][]models.Series{
+		{
+			{
+				Target:     "summarize(a, \"10\", \"sum\")",
+				QueryPatt:  "summarize(a, \"10\", \"sum\")",
+				QueryFrom:  10,
+
QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + { + { + Target: "summarize(a, \"10\", \"sum\", true)", + QueryPatt: "summarize(a, \"10\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(a, \"10\", \"max\")", + QueryPatt: "summarize(a, \"10\", \"max\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + { + { + Target: "summarize(a, \"10\", \"max\", true)", + QueryPatt: "summarize(a, \"10\", \"max\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + } + // Note that graphite does not accept 10 as default seconds, but dur lib defaults to seconds without units! + testSummarize("Default Interval", input, outputSum[0], "10", "sum", false, t) + testSummarize("Default Interval", input, outputSum[1], "10", "sum", true, t) + testSummarize("Default Interval", input, outputMax[0], "10", "max", false, t) + testSummarize("Default Interval", input, outputMax[1], "10", "max", true, t) +} + +func TestSummarizeOversampled(t *testing.T) { + + var aOversampled = []schema.Point{ + {Val: 0, Ts: 10}, + {Val: math.NaN(), Ts: 15}, + {Val: 0, Ts: 20}, + {Val: math.NaN(), Ts: 25}, + {Val: 5.5, Ts: 30}, + {Val: math.NaN(), Ts: 35}, + {Val: math.NaN(), Ts: 40}, + {Val: math.NaN(), Ts: 45}, + {Val: math.NaN(), Ts: 50}, + {Val: math.NaN(), Ts: 55}, + {Val: 1234567890, Ts: 60}, + } + + input := []models.Series{ + { + Target: "a", + QueryPatt: "a", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + } + outputSum := [][]models.Series{ + { + { + Target: "summarize(a, \"5\", \"sum\")", + QueryPatt: "summarize(a, \"5\", \"sum\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 5, + Datapoints: getCopy(aOversampled), + }, + }, + { + { + Target: "summarize(a, \"5\", \"sum\", true)", + QueryPatt: "summarize(a, \"5\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 
60, + Interval: 5, + Datapoints: getCopy(aOversampled), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(a, \"5\", \"max\")", + QueryPatt: "summarize(a, \"5\", \"max\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 5, + Datapoints: getCopy(aOversampled), + }, + }, + { + { + Target: "summarize(a, \"5\", \"max\", true)", + QueryPatt: "summarize(a, \"5\", \"max\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 5, + Datapoints: getCopy(aOversampled), + }, + }, + } + // Note that graphite does not accept 10 as default seconds, but dur lib defaults to seconds without units! + testSummarize("Oversampled Identity", input, outputSum[0], "5", "sum", false, t) + testSummarize("Oversampled Identity", input, outputSum[1], "5", "sum", true, t) + testSummarize("Oversampled Identity", input, outputMax[0], "5", "max", false, t) + testSummarize("Oversampled Identity", input, outputMax[1], "5", "max", true, t) +} + +func TestSummarizeNyquistSingleIdentity(t *testing.T) { + input := []models.Series{ + { + Target: "a", + QueryPatt: "a", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + } + outputSum := [][]models.Series{ + { + { + Target: "summarize(a, \"10s\", \"sum\")", + QueryPatt: "summarize(a, \"10s\", \"sum\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + { + { + Target: "summarize(a, \"10s\", \"sum\", true)", + QueryPatt: "summarize(a, \"10s\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(a, \"10s\", \"max\")", + QueryPatt: "summarize(a, \"10s\", \"max\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + { + { + Target: "summarize(a, \"10s\", \"max\", true)", + QueryPatt: "summarize(a, \"10s\", \"max\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + }, + } + 
testSummarize("Nyquist Single Identity", input, outputSum[0], "10s", "sum", false, t) + testSummarize("Nyquist Single Identity", input, outputSum[1], "10s", "sum", true, t) + testSummarize("Nyquist Single Identity", input, outputMax[0], "10s", "max", false, t) + testSummarize("Nyquist Single Identity", input, outputMax[1], "10s", "max", true, t) +} + +func TestSummarizeMultipleIdentity(t *testing.T) { + input := []models.Series{ + { + Target: "a", + QueryPatt: "a", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + { + Target: "b", + QueryPatt: "b", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(b), + }, + } + outputSum := [][]models.Series{ + { + { + Target: "summarize(a, \"10s\", \"sum\")", + QueryPatt: "summarize(a, \"10s\", \"sum\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + { + Target: "summarize(b, \"10s\", \"sum\")", + QueryPatt: "summarize(b, \"10s\", \"sum\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(b), + }, + }, + { + { + Target: "summarize(a, \"10s\", \"sum\", true)", + QueryPatt: "summarize(a, \"10s\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + { + Target: "summarize(b, \"10s\", \"sum\", true)", + QueryPatt: "summarize(b, \"10s\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(b), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(a, \"10s\", \"max\")", + QueryPatt: "summarize(a, \"10s\", \"max\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(a), + }, + { + Target: "summarize(b, \"10s\", \"max\")", + QueryPatt: "summarize(b, \"10s\", \"max\")", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(b), + }, + }, + { + { + Target: "summarize(a, \"10s\", \"max\", true)", + QueryPatt: "summarize(a, \"10s\", \"max\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + 
Datapoints: getCopy(a), + }, + { + Target: "summarize(b, \"10s\", \"max\", true)", + QueryPatt: "summarize(b, \"10s\", \"max\", true)", + QueryFrom: 10, + QueryTo: 60, + Interval: 10, + Datapoints: getCopy(b), + }, + }, + } + + testSummarize("identity Multiple", input, outputSum[0], "10s", "sum", false, t) + testSummarize("identity Multiple", input, outputSum[1], "10s", "sum", true, t) + testSummarize("identity Multiple", input, outputMax[0], "10s", "max", false, t) + testSummarize("identity Multiple", input, outputMax[1], "10s", "max", true, t) +} + +func TestSummarizeConsolidated(t *testing.T) { + input := []models.Series{ + { + Target: "ab", + QueryPatt: "ab", + QueryFrom: 10, + QueryTo: 65, + Interval: 5, + Datapoints: getCopy(abSummarize), + }, + { + Target: "abc", + QueryPatt: "abc", + QueryFrom: 10, + QueryTo: 64, + Interval: 2, + Datapoints: getCopy(abcSummarize), + }, + } + outputSum := [][]models.Series{ + { + { + Target: "summarize(ab, \"10s\", \"sum\")", + QueryPatt: "summarize(ab, \"10s\", \"sum\")", + QueryFrom: 10, + QueryTo: 65, + Interval: 10, + Datapoints: getCopy(sumab), + }, + { + Target: "summarize(abc, \"10s\", \"sum\")", + QueryPatt: "summarize(abc, \"10s\", \"sum\")", + QueryFrom: 10, + QueryTo: 64, + Interval: 10, + Datapoints: getCopy(sumabc), + }, + }, + { + { + Target: "summarize(ab, \"10s\", \"sum\", true)", + QueryPatt: "summarize(ab, \"10s\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 65, + Interval: 10, + Datapoints: getCopy(sumab), + }, + { + Target: "summarize(abc, \"10s\", \"sum\", true)", + QueryPatt: "summarize(abc, \"10s\", \"sum\", true)", + QueryFrom: 10, + QueryTo: 64, + Interval: 10, + Datapoints: getCopy(sumabc), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(ab, \"10s\", \"max\")", + QueryPatt: "summarize(ab, \"10s\", \"max\")", + QueryFrom: 10, + QueryTo: 65, + Interval: 10, + Datapoints: getCopy(maxab), + }, + { + Target: "summarize(abc, \"10s\", \"max\")", + QueryPatt: "summarize(abc, 
\"10s\", \"max\")", + QueryFrom: 10, + QueryTo: 64, + Interval: 10, + Datapoints: getCopy(maxabc), + }, + }, + { + { + Target: "summarize(ab, \"10s\", \"max\", true)", + QueryPatt: "summarize(ab, \"10s\", \"max\", true)", + QueryFrom: 10, + QueryTo: 65, + Interval: 10, + Datapoints: getCopy(maxab), + }, + { + Target: "summarize(abc, \"10s\", \"max\", true)", + QueryPatt: "summarize(abc, \"10s\", \"max\", true)", + QueryFrom: 10, + QueryTo: 64, + Interval: 10, + Datapoints: getCopy(maxabc), + }, + }, + } + testSummarize("Consolidated", input, outputSum[0], "10s", "sum", false, t) + testSummarize("Consolidated", input, outputSum[1], "10s", "sum", true, t) + testSummarize("Consolidated", input, outputMax[0], "10s", "max", false, t) + testSummarize("Consolidated", input, outputMax[1], "10s", "max", true, t) +} + +// Tests misaligned QueryFrom/QueryTo with Interval and IntervalString +func TestSummarizeMisAligned(t *testing.T) { + input := []models.Series{ + { + Target: "ab", + QueryPatt: "ab", + QueryFrom: 7, + QueryTo: 67, + Interval: 5, + Datapoints: getCopy(abSummarize), + }, + { + Target: "abc", + QueryPatt: "abc", + QueryFrom: 1, + QueryTo: 67, + Interval: 2, + Datapoints: getCopy(abcSummarize), + }, + } + outputSum := [][]models.Series{ + { + { + Target: "summarize(ab, \"10s\", \"sum\")", + QueryPatt: "summarize(ab, \"10s\", \"sum\")", + QueryFrom: 7, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(sumab), + }, + { + Target: "summarize(abc, \"10s\", \"sum\")", + QueryPatt: "summarize(abc, \"10s\", \"sum\")", + QueryFrom: 1, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(sumabc), + }, + }, + { + { + Target: "summarize(ab, \"10s\", \"sum\", true)", + QueryPatt: "summarize(ab, \"10s\", \"sum\", true)", + QueryFrom: 7, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(sumab), + }, + { + Target: "summarize(abc, \"10s\", \"sum\", true)", + QueryPatt: "summarize(abc, \"10s\", \"sum\", true)", + QueryFrom: 1, + QueryTo: 67, + Interval: 10, + Datapoints: 
getCopy(sumabc), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(ab, \"10s\", \"max\")", + QueryPatt: "summarize(ab, \"10s\", \"max\")", + QueryFrom: 7, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(maxab), + }, + { + Target: "summarize(abc, \"10s\", \"max\")", + QueryPatt: "summarize(abc, \"10s\", \"max\")", + QueryFrom: 1, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(maxabc), + }, + }, + { + { + Target: "summarize(ab, \"10s\", \"max\", true)", + QueryPatt: "summarize(ab, \"10s\", \"max\", true)", + QueryFrom: 7, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(maxab), + }, + { + Target: "summarize(abc, \"10s\", \"max\", true)", + QueryPatt: "summarize(abc, \"10s\", \"max\", true)", + QueryFrom: 1, + QueryTo: 67, + Interval: 10, + Datapoints: getCopy(maxabc), + }, + }, + } + + testSummarize("MisAligned Multiple", input, outputSum[0], "10s", "sum", false, t) + testSummarize("MisAligned Multiple", input, outputSum[1], "10s", "sum", true, t) + testSummarize("MisAligned Multiple", input, outputMax[0], "10s", "max", false, t) + testSummarize("MisAligned Multiple", input, outputMax[1], "10s", "max", true, t) +} + +func TestSummarizeAlignToFrom(t *testing.T) { + var aligned30 = []schema.Point{ + {Val: 1, Ts: 30}, + {Val: 2, Ts: 60}, + {Val: 3, Ts: 90}, + {Val: 4, Ts: 120}, + {Val: 5, Ts: 150}, + {Val: 6, Ts: 180}, + {Val: 7, Ts: 210}, + {Val: 8, Ts: 240}, + } + var unaligned45sum, unaligned45max = []schema.Point{ + {Val: 1, Ts: 0}, + {Val: 2, Ts: 45}, + {Val: 7, Ts: 90}, + {Val: 5, Ts: 135}, + {Val: 13, Ts: 180}, + {Val: 8, Ts: 225}, + }, + []schema.Point{ + {Val: 1, Ts: 0}, + {Val: 2, Ts: 45}, + {Val: 4, Ts: 90}, + {Val: 5, Ts: 135}, + {Val: 7, Ts: 180}, + {Val: 8, Ts: 225}, + } + var aligned45sum, aligned45max = []schema.Point{ + {Val: 3, Ts: 30}, + {Val: 3, Ts: 75}, + {Val: 9, Ts: 120}, + {Val: 6, Ts: 165}, + {Val: 15, Ts: 210}, + }, + []schema.Point{ + {Val: 2, Ts: 30}, + {Val: 3, Ts: 75}, + {Val: 5, Ts: 120}, + {Val: 
6, Ts: 165}, + {Val: 8, Ts: 210}, + } + + input := []models.Series{ + { + Target: "align", + QueryPatt: "align", + QueryFrom: 30, + QueryTo: 240, + Interval: 30, + Datapoints: getCopy(aligned30), + }, + } + outputSum := [][]models.Series{ + { + { + Target: "summarize(align, \"45s\", \"sum\")", + QueryPatt: "summarize(align, \"45s\", \"sum\")", + QueryFrom: 30, + QueryTo: 240, + Interval: 45, + Datapoints: getCopy(unaligned45sum), + }, + }, + { + { + Target: "summarize(align, \"45s\", \"sum\", true)", + QueryPatt: "summarize(align, \"45s\", \"sum\", true)", + QueryFrom: 30, + QueryTo: 240, + Interval: 45, + Datapoints: getCopy(aligned45sum), + }, + }, + } + outputMax := [][]models.Series{ + { + { + Target: "summarize(align, \"45s\", \"max\")", + QueryPatt: "summarize(align, \"45s\", \"max\")", + QueryFrom: 30, + QueryTo: 240, + Interval: 45, + Datapoints: getCopy(unaligned45max), + }, + }, + { + { + Target: "summarize(align, \"45s\", \"max\", true)", + QueryPatt: "summarize(align, \"45s\", \"max\", true)", + QueryFrom: 30, + QueryTo: 240, + Interval: 45, + Datapoints: getCopy(aligned45max), + }, + }, + } + + testSummarize("AlignToFrom", input, outputSum[0], "45s", "sum", false, t) + testSummarize("AlignToFrom", input, outputSum[1], "45s", "sum", true, t) + testSummarize("AlignToFrom", input, outputMax[0], "45s", "max", false, t) + testSummarize("AlignToFrom", input, outputMax[1], "45s", "max", true, t) +} + +func testSummarize(name string, in []models.Series, out []models.Series, intervalString, fn string, alignToFrom bool, t *testing.T) { + f := NewSummarize() + + summarize := f.(*FuncSummarize) + summarize.in = NewMock(in) + summarize.intervalString = intervalString + summarize.fn = fn + summarize.alignToFrom = alignToFrom + + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q: err should be nil. 
got %q", name, err) + } + + if len(gots) != len(out) { + t.Fatalf("case %q (%q, %q, %t): len output expected %d, got %d", name, intervalString, fn, alignToFrom, len(out), len(gots)) + } + + for i, got := range gots { + exp := out[i] + if got.Target != exp.Target { + t.Fatalf("case %q (%q, %q, %t): expected target %q, got %q", name, intervalString, fn, alignToFrom, exp.Target, got.Target) + } + if len(got.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q (%q, %q, %t): len output expected %#v, got %#v", name, intervalString, fn, alignToFrom, (exp.Datapoints), (got.Datapoints)) + } + for j, p := range exp.Datapoints { + bothNaN := math.IsNaN(p.Val) && math.IsNaN(got.Datapoints[j].Val) + if (bothNaN || p.Val == got.Datapoints[j].Val) && p.Ts == got.Datapoints[j].Ts { + continue + } + + t.Fatalf("case %q (%q, %q, %t): output point %d - expected %v got %v", name, intervalString, fn, alignToFrom, j, p, got.Datapoints[j]) + } + } +} + +func BenchmarkSummarize10k_1NoNulls(b *testing.B) { + benchmarkSummarize(b, 1, test.RandFloats10k, test.RandFloats10k, "1h") +} +func BenchmarkSummarize10k_10NoNulls(b *testing.B) { + benchmarkSummarize(b, 10, test.RandFloats10k, test.RandFloats10k, "1h") +} +func BenchmarkSummarize10k_100NoNulls(b *testing.B) { + benchmarkSummarize(b, 100, test.RandFloats10k, test.RandFloats10k, "1h") +} +func BenchmarkSummarize10k_1000NoNulls(b *testing.B) { + benchmarkSummarize(b, 1000, test.RandFloats10k, test.RandFloats10k, "1h") +} + +func BenchmarkSummarize10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkSummarize(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k, "1h") +} +func BenchmarkSummarize10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkSummarize(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k, "1h") +} +func BenchmarkSummarize10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkSummarize(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k, "1h") +} +func BenchmarkSummarize10k_1000SomeSeriesHalfNulls(b *testing.B) 
{
+	benchmarkSummarize(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k, "1h")
+}
+
+func BenchmarkSummarize10k_1AllSeriesHalfNulls(b *testing.B) {
+	benchmarkSummarize(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k, "1h")
+}
+func BenchmarkSummarize10k_10AllSeriesHalfNulls(b *testing.B) {
+	benchmarkSummarize(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k, "1h")
+}
+func BenchmarkSummarize10k_100AllSeriesHalfNulls(b *testing.B) {
+	benchmarkSummarize(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k, "1h")
+}
+func BenchmarkSummarize10k_1000AllSeriesHalfNulls(b *testing.B) {
+	benchmarkSummarize(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k, "1h")
+}
+
+func benchmarkSummarize(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point, intervalString string) {
+	var input []models.Series
+	for i := 0; i < numSeries; i++ {
+		series := models.Series{
+			Target: strconv.Itoa(i),
+		}
+		if i%2 == 0 { // alternate between both generators; i%1 was always 0, so fn1 was never used
+			series.Datapoints = fn0()
+		} else {
+			series.Datapoints = fn1()
+		}
+		input = append(input, series)
+	}
+	b.ResetTimer()
+	var err error
+	for i := 0; i < b.N; i++ {
+		f := NewSummarize()
+
+		summarize := f.(*FuncSummarize)
+		summarize.in = NewMock(input)
+		summarize.intervalString = intervalString
+
+		results, err = f.Exec(make(map[Req][]models.Series))
+		if err != nil {
+			b.Fatalf("%s", err)
+		}
+	}
+	b.SetBytes(int64(numSeries * len(results[0].Datapoints) * 12))
+}
diff --git a/expr/funcs.go b/expr/funcs.go
index c39831e034..79c52895d4 100644
--- a/expr/funcs.go
+++ b/expr/funcs.go
@@ -73,6 +73,7 @@ func init() {
 		"stddevSeries": {NewAggregateConstructor("stddev", crossSeriesStddev), true},
 		"sum": {NewAggregateConstructor("sum", crossSeriesSum), true},
 		"sumSeries": {NewAggregateConstructor("sum", crossSeriesSum), true},
+		"summarize": {NewSummarize, true},
 		"transformNull": {NewTransformNull, true},
 	}
 }
diff --git a/expr/validator.go b/expr/validator.go
index dddba2ff60..108bc898cb 100644
--- a/expr/validator.go
+++ b/expr/validator.go
@@ -2,6 +2,9 @@ package expr
 
 import (
 	"errors"
+
+	"github.com/grafana/metrictank/consolidation"
+	"github.com/raintank/dur"
 )
 
 var ErrIntPositive = errors.New("integer must be positive")
@@ -23,3 +26,16 @@ func IsAggFunc(e *expr) error {
 	}
 	return nil
 }
+
+// IsConsolFunc validates that the string argument names a consolidation
+// function accepted by consolidation.Validate.
+func IsConsolFunc(e *expr) error {
+	return consolidation.Validate(e.str)
+}
+
+// IsIntervalString validates that the string argument can be parsed by
+// dur.ParseDuration; the parsed value itself is discarded.
+func IsIntervalString(e *expr) error {
+	_, err := dur.ParseDuration(e.str)
+	return err
+}