diff --git a/docs/graphite.md b/docs/graphite.md index 5f78126473..c07f3e2186 100644 --- a/docs/graphite.md +++ b/docs/graphite.md @@ -32,7 +32,7 @@ Here are the currently included functions: | alias(seriesList, alias) seriesList | | Stable | | aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable | | aliasSub(seriesList, pattern, replacement) seriesList | | Stable | -| asPercent(seriesList, seriesList, nodeList) seriesList | | Stable | +| asPercent(seriesList, seriesList, nodeList) seriesList | | Stable | | averageSeries(seriesLists) series | avg | Stable | | consolidateBy(seriesList, func) seriesList | | Stable | | countSeries(seriesLists) series | | Stable | @@ -62,7 +62,11 @@ Here are the currently included functions: | removeAboveValue(seriesList, n) seriesList | | Stable | | removeBelowValue(seriesList, n) seriesList | | Stable | | scale(seriesList, num) series | | Stable | -| scaleToSeconds(seriesList, seconds) series | | Stable | +| scaleToSeconds(seriesList, seconds) seriesList | | Stable | +| sortBy(seriesList, func, reverse) seriesList | | Stable | +| sortByMaxima(seriesList) seriesList | | Stable | +| sortByName(seriesList, natural, reverse) seriesList | | Stable | +| sortByTotal(seriesList) seriesList | | Stable | | stddevSeries(seriesList) series | | Stable | | sumSeries(seriesLists) series | sum | Stable | | summarize(seriesList) seriesList | | Stable | diff --git a/expr/func_highestlowest.go b/expr/func_highestlowest.go index bc44f83c40..f50ac585e4 100644 --- a/expr/func_highestlowest.go +++ b/expr/func_highestlowest.go @@ -1,9 +1,6 @@ package expr import ( - "math" - "sort" - "github.com/grafana/metrictank/consolidation" "github.com/grafana/metrictank/api/models" @@ -40,11 +37,6 @@ func (s *FuncHighestLowest) Context(context Context) Context { return context } -type ScoredSeries struct { - score float64 - serie models.Series -} - func (s *FuncHighestLowest) Exec(cache map[Req][]models.Series) ([]models.Series, error) { series, err := s.in.Exec(cache) if err != nil { @@ -55,33 +47,11 @@ func (s *FuncHighestLowest) Exec(cache map[Req][]models.Series) ([]models.Series return series, nil } - consolidationFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn)) - - // score series by their consolidated value - scored := make([]ScoredSeries, len(series)) - for i, serie := range series { - scored[i] = ScoredSeries{ - score: consolidationFunc(serie.Datapoints), - serie: serie, - } - } - - sort.SliceStable(scored, func(i, j int) bool { - iVal := scored[i].score - jVal := scored[j].score - if s.highest { - return math.IsNaN(jVal) && !math.IsNaN(iVal) || iVal > jVal - } - return math.IsNaN(jVal) && !math.IsNaN(iVal) || iVal < jVal - }) + SortSeriesWithConsolidator(series, consolidation.FromConsolidateBy(s.fn), s.highest) if s.n > int64(len(series)) { s.n = int64(len(series)) } - for i := 0; i < int(s.n); i++ { - series[i] = scored[i].serie - } - return series[:s.n], nil } diff --git a/expr/func_sortby.go b/expr/func_sortby.go new file mode 100644 index 0000000000..60bec3af57 --- /dev/null +++ b/expr/func_sortby.go @@ -0,0 +1,80 @@ +package expr + +import ( + "math" + "sort" + + "github.com/grafana/metrictank/consolidation" + + "github.com/grafana/metrictank/api/models" +) + +type FuncSortBy struct { + in GraphiteFunc + fn string + reverse bool +} + +func NewSortByConstructor(fn string, reverse bool) func() GraphiteFunc { + return func() GraphiteFunc { + return &FuncSortBy{fn: fn, reverse: reverse} + } +} + +func (s *FuncSortBy) Signature() ([]Arg, []Arg) { + if s.fn != "" { + return []Arg{ + ArgSeriesList{val: &s.in}, + }, []Arg{ArgSeriesList{}} + } + return []Arg{ + ArgSeriesList{val: &s.in}, + ArgString{key: "func", val: &s.fn, validator: []Validator{IsConsolFunc}}, + ArgBool{key: "reverse", val: &s.reverse, opt: true}, + }, []Arg{ArgSeriesList{}} +} + +func (s *FuncSortBy) Context(context Context) Context { + return context +} + +func (s *FuncSortBy) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + + SortSeriesWithConsolidator(series, consolidation.FromConsolidateBy(s.fn), s.reverse) + + return series, nil +} + +type ScoredSeries struct { + score float64 + serie models.Series +} + +func SortSeriesWithConsolidator(series []models.Series, c consolidation.Consolidator, reverse bool) { + consolidationFunc := consolidation.GetAggFunc(c) + // score series by their consolidated value + scored := make([]ScoredSeries, len(series)) + for i, serie := range series { + scored[i] = ScoredSeries{ + score: consolidationFunc(serie.Datapoints), + serie: serie, + } + } + + sort.SliceStable(scored, func(i, j int) bool { + iVal := scored[i].score + jVal := scored[j].score + if reverse { + return math.IsNaN(jVal) && !math.IsNaN(iVal) || iVal > jVal + } + return math.IsNaN(iVal) && !math.IsNaN(jVal) || iVal < jVal + }) + + for i := range scored { + series[i] = scored[i].serie + } +} diff --git a/expr/func_sortby_test.go b/expr/func_sortby_test.go new file mode 100644 index 0000000000..295321e906 --- /dev/null +++ b/expr/func_sortby_test.go @@ -0,0 +1,392 @@ +package expr + +import ( + "math" + "strconv" + "testing" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/test" + "gopkg.in/raintank/schema.v1" +) + +func TestSortByAverage(t *testing.T) { + testSortBy( + "sortBy(average,false)", + "average", + false, + []models.Series{ + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + }, + t, + ) +} + +func TestSortByAverageReverse(t *testing.T) { + testSortBy( + "sortBy(average,true)", + "average", + true, + []models.Series{ + { + Interval: 10, + QueryPatt: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + QueryPatt: "c", + Datapoints: getCopy(c), + }, + }, + t, + ) +} + +func TestSortByCurrent(t *testing.T) { + testSortBy( + "sortBy(current,false)", + "current", + false, + []models.Series{ + { + Interval: 10, + QueryPatt: "avg4a2b", + Datapoints: getCopy(avg4a2b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "avg4a2b", + Datapoints: getCopy(avg4a2b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + }, + t, + ) +} + +func TestSortByCurrentReverse(t *testing.T) { + testSortBy( + "sortBy(current,true)", + "current", + true, + []models.Series{ + { + Interval: 10, + QueryPatt: "sumab", + Datapoints: getCopy(sumab), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "sumab", + Datapoints: getCopy(sumab), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + }, + t, + ) +} + +func TestSortByMax(t *testing.T) { + testSortBy( + "sortBy(max,false)", + "max", + false, + []models.Series{ + { + Interval: 10, + QueryPatt: "avg4a2b", + Datapoints: getCopy(avg4a2b), + }, + { + Interval: 10, + QueryPatt: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "avg4a2b", + Datapoints: getCopy(avg4a2b), + }, + { + Interval: 10, + QueryPatt: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + }, + t, + ) +} + +func TestSortByMaxReverseLong(t *testing.T) { + testSortBy( + "sortBy(current,true)", + "current", + true, + []models.Series{ + { + Interval: 10, + QueryPatt: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + QueryPatt: "d", + Datapoints: getCopy(d), + }, + { + Interval: 10, + QueryPatt: "sumabc", + Datapoints: getCopy(sumabc), + }, + { + Interval: 10, + QueryPatt: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + { + Interval: 10, + QueryPatt: "sumabc", + Datapoints: getCopy(sumabc), + }, + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + QueryPatt: "d", + Datapoints: getCopy(d), + }, + { + Interval: 10, + QueryPatt: "c", + Datapoints: getCopy(c), + }, + }, + t, + ) +} + +func testSortBy(name string, fn string, reverse bool, in []models.Series, out []models.Series, t *testing.T) { + f := NewSortByConstructor(fn, reverse)() + f.(*FuncSortBy).in = NewMock(in) + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q: err should be nil. got %q", name, err) + } + if len(gots) != len(out) { + t.Fatalf("case %q: len output expected %d, got %d", name, len(out), len(gots)) + } + for i, g := range gots { + exp := out[i] + if g.QueryPatt != exp.QueryPatt { + t.Fatalf("case %q: expected target %q, got %q", name, exp.QueryPatt, g.QueryPatt) + } + if len(g.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q: len output expected %d, got %d", name, len(exp.Datapoints), len(g.Datapoints)) + } + for j, p := range g.Datapoints { + bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val) + if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { + continue + } + t.Fatalf("case %q: output point %d - expected %v got %v", name, j, exp.Datapoints[j], p) + } + } +} + +func BenchmarkSortBy10k_1NoNulls(b *testing.B) { + benchmarkSortBy(b, 1, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkSortBy10k_10NoNulls(b *testing.B) { + benchmarkSortBy(b, 10, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkSortBy10k_100NoNulls(b *testing.B) { + benchmarkSortBy(b, 100, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkSortBy10k_1000NoNulls(b *testing.B) { + benchmarkSortBy(b, 1000, test.RandFloats10k, test.RandFloats10k) +} + +func BenchmarkSortBy10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkSortBy10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkSortBy10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkSortBy10k_1000SomeSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) +} + +func BenchmarkSortBy10k_1AllSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkSortBy10k_10AllSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkSortBy10k_100AllSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkSortBy10k_1000AllSeriesHalfNulls(b *testing.B) { + benchmarkSortBy(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} + +func benchmarkSortBy(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + QueryPatt: strconv.Itoa(i), + } + if i%2 == 0 { + series.Datapoints = fn0() + } else { + series.Datapoints = fn1() + } + input = append(input, series) + } + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + f := NewSortByConstructor("average", true)() + f.(*FuncSortBy).in = NewMock(input) + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/funcs.go b/expr/funcs.go index 49aea2ae0e..c8817dd15e 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -87,7 +87,10 @@ func init() { "scale": {NewScale, true}, "scaleToSeconds": {NewScaleToSeconds, true}, "smartSummarize": {NewSmartSummarize, false}, + "sortBy": {NewSortByConstructor("", false), true}, + "sortByMaxima": {NewSortByConstructor("max", true), true}, "sortByName": {NewSortByName, true}, + "sortByTotal": {NewSortByConstructor("sum", true), true}, "stddevSeries": {NewAggregateConstructor("stddev", crossSeriesStddev), true}, "sum": {NewAggregateConstructor("sum", crossSeriesSum), true}, "sumSeries": {NewAggregateConstructor("sum", crossSeriesSum), true},