From 57c5eaced853cc0f4f12ebcb99fac3048f5feb30 Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Thu, 2 Aug 2018 15:02:24 -0400 Subject: [PATCH] working function --- docs/graphite.md | 1 + expr/func_countseries.go | 60 +++++++++++++ expr/func_countseries_test.go | 164 ++++++++++++++++++++++++++++++++++ expr/funcs.go | 1 + 4 files changed, 226 insertions(+) create mode 100644 expr/func_countseries.go create mode 100644 expr/func_countseries_test.go diff --git a/docs/graphite.md b/docs/graphite.md index 5ea5331de2..baa0984ac8 100644 --- a/docs/graphite.md +++ b/docs/graphite.md @@ -34,6 +34,7 @@ Here are the currently included functions: | aliasSub(seriesList, pattern, replacement) seriesList | | Stable | | averageSeries(seriesLists) series | avg | Stable | | consolidateBy(seriesList, func) seriesList | | Stable | +| countSeries(seriesLists) series | | Stable | | diffSeries(seriesLists) series | | Stable | | divideSeries(dividend, divisor) seriesList | | Stable | | divideSeriesLists(dividends, divisors) seriesList | | Stable | diff --git a/expr/func_countseries.go b/expr/func_countseries.go new file mode 100644 index 0000000000..092b3b6620 --- /dev/null +++ b/expr/func_countseries.go @@ -0,0 +1,60 @@ +package expr + +import ( + "fmt" + "strings" + + "github.com/grafana/metrictank/api/models" + schema "gopkg.in/raintank/schema.v1" +) + +type FuncCountSeries struct { + in []GraphiteFunc +} + +func NewCountSeries() GraphiteFunc { + return &FuncCountSeries{} +} + +func (s *FuncCountSeries) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesLists{val: &s.in}, + }, []Arg{ArgSeries{}} +} + +func (s *FuncCountSeries) Context(context Context) Context { + return context +} + +func (s *FuncCountSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, queryPatts, err := consumeFuncs(cache, s.in) + if err != nil { + return nil, err + } + + if len(series) == 0 { + return series, nil + } + + cons, queryCons := summarizeCons(series) + name := fmt.Sprintf("countSeries(%s)", strings.Join(queryPatts, ",")) + out := pointSlicePool.Get().([]schema.Point) + + for _, p := range series[0].Datapoints { + p.Val = float64(len(series)) + out = append(out, p) + } + + output := models.Series{ + Target: name, + QueryPatt: name, + Tags: map[string]string{"name": name}, + Datapoints: out, + Interval: series[0].Interval, + Consolidator: cons, + QueryCons: queryCons, + } + cache[Req{}] = append(cache[Req{}], output) + + return []models.Series{output}, nil +} diff --git a/expr/func_countseries_test.go b/expr/func_countseries_test.go new file mode 100644 index 0000000000..98db864dc1 --- /dev/null +++ b/expr/func_countseries_test.go @@ -0,0 +1,164 @@ +package expr + +import ( + "strconv" + "testing" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/test" + "gopkg.in/raintank/schema.v1" +) + +func TestCountSeriesFive(t *testing.T) { + out := []schema.Point{ + {Val: 5, Ts: 10}, + {Val: 5, Ts: 20}, + {Val: 5, Ts: 30}, + {Val: 5, Ts: 40}, + {Val: 5, Ts: 50}, + {Val: 5, Ts: 60}, + } + testCountSeries( + "five", + [][]models.Series{{ + { + Interval: 10, + QueryPatt: "abc", + Datapoints: getCopy(a), + }, + { + Interval: 10, + QueryPatt: "abc", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "abc", + Datapoints: getCopy(c), + }, + }, + { + { + Interval: 10, + QueryPatt: "ad", + Datapoints: getCopy(d), + }, + { + Interval: 10, + QueryPatt: "ad", + Datapoints: getCopy(a), + }, + }}, + + []models.Series{ + { + Interval: 10, + QueryPatt: "countSeries(abc,ad)", + Datapoints: out, + }, + }, + t, + ) +} +func TestCountSeriesNone(t *testing.T) { + testCountSeries( + "none", + [][]models.Series{}, + + []models.Series{}, + t, + ) +} + +func testCountSeries(name string, in [][]models.Series, out []models.Series, t *testing.T) { + f := NewCountSeries() + for _, i := range in { + f.(*FuncCountSeries).in = append(f.(*FuncCountSeries).in, NewMock(i)) + } + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q: err should be nil. got %q", name, err) + } + if len(gots) != len(out) { + t.Fatalf("case %q: isNonNull len output expected %d, got %d", name, len(out), len(gots)) + } + for i, g := range gots { + exp := out[i] + if g.QueryPatt != exp.QueryPatt { + t.Fatalf("case %q: expected target %q, got %q", name, exp.QueryPatt, g.QueryPatt) + } + if len(g.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q: len output expected %d, got %d", name, len(exp.Datapoints), len(g.Datapoints)) + } + for j, p := range g.Datapoints { + if (p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { + continue + } + t.Fatalf("case %q: output point %d - expected %v got %v", name, j, exp.Datapoints[j], p) + } + } +} + +func BenchmarkCountSeries10k_1NoNulls(b *testing.B) { + benchmarkCountSeries(b, 1, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkCountSeries10k_10NoNulls(b *testing.B) { + benchmarkCountSeries(b, 10, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkCountSeries10k_100NoNulls(b *testing.B) { + benchmarkCountSeries(b, 100, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkCountSeries10k_1000NoNulls(b *testing.B) { + benchmarkCountSeries(b, 1000, test.RandFloats10k, test.RandFloats10k) +} + +func BenchmarkCountSeries10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkCountSeries10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkCountSeries10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkCountSeries10k_1000SomeSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) +} + +func BenchmarkCountSeries10k_1AllSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkCountSeries10k_10AllSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkCountSeries10k_100AllSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkCountSeries10k_1000AllSeriesHalfNulls(b *testing.B) { + benchmarkCountSeries(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} + +func benchmarkCountSeries(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + QueryPatt: strconv.Itoa(i), + } + if i%2 == 0 { + series.Datapoints = fn0() + } else { + series.Datapoints = fn1() + } + input = append(input, series) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := NewCountSeries() + f.(*FuncCountSeries).in = append(f.(*FuncCountSeries).in, NewMock(input)) + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/funcs.go b/expr/funcs.go index 4f12367938..5b0572e92b 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -55,6 +55,7 @@ func init() { "avg": {NewAggregateConstructor("average", crossSeriesAvg), true}, "averageSeries": {NewAggregateConstructor("average", crossSeriesAvg), true}, "consolidateBy": {NewConsolidateBy, true}, + "countSeries": {NewCountSeries, true}, "diffSeries": {NewAggregateConstructor("diff", crossSeriesDiff), true}, "divideSeries": {NewDivideSeries, true}, "divideSeriesLists": {NewDivideSeriesLists, true},