diff --git a/docs/graphite.md b/docs/graphite.md index baa0984ac8..a9a5f27434 100644 --- a/docs/graphite.md +++ b/docs/graphite.md @@ -27,30 +27,33 @@ See also: Here are the currently included functions: -| Function name and signature | Alias | Metrictank | -| ----------------------------------------------------- | ----------- | ---------- | -| alias(seriesList, alias) seriesList | | Stable | -| aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable | -| aliasSub(seriesList, pattern, replacement) seriesList | | Stable | -| averageSeries(seriesLists) series | avg | Stable | -| consolidateBy(seriesList, func) seriesList | | Stable | -| countSeries(seriesLists) series | | Stable | -| diffSeries(seriesLists) series | | Stable | -| divideSeries(dividend, divisor) seriesList | | Stable | -| divideSeriesLists(dividends, divisors) seriesList | | Stable | -| exclude(seriesList, pattern) seriesList | | Stable | -| grep(seriesList, pattern) seriesList | | Stable | -| groupByTags(seriesList, func, tagList) seriesList | | Stable | -| isNonNull(seriesList) seriesList | | Stable | -| maxSeries(seriesList) series | max | Stable | -| minSeries(seriesList) series | min | Stable | -| multiplySeries(seriesList) series | | Stable | -| movingAverage(seriesLists, windowSize) seriesList | | Unstable | -| perSecond(seriesLists) seriesList | | Stable | -| rangeOfSeries(seriesList) series | | Stable | -| scale(seriesList, num) series | | Stable | -| scaleToSeconds(seriesList, seconds) series | | Stable | -| stddevSeries(seriesList) series | | Stable | -| sumSeries(seriesLists) series | sum | Stable | -| summarize(seriesList) seriesList | | Stable | -| transformNull(seriesList, default=0) seriesList | | Stable | + +| Function name and signature | Alias | Metrictank | +| -------------------------------------------------------------- | ----------- | ---------- | +| alias(seriesList, alias) seriesList | | Stable | +| aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable | +| aliasSub(seriesList, pattern, replacement) seriesList | | Stable | +| averageSeries(seriesLists) series | avg | Stable | +| consolidateBy(seriesList, func) seriesList | | Stable | +| countSeries(seriesLists) series | | Stable | +| diffSeries(seriesLists) series | | Stable | +| divideSeries(dividend, divisor) seriesList | | Stable | +| divideSeriesLists(dividends, divisors) seriesList | | Stable | +| exclude(seriesList, pattern) seriesList | | Stable | +| filterSeries(seriesList, func, operator, threshold) seriesList | | Stable | +| grep(seriesList, pattern) seriesList | | Stable | +| groupByTags(seriesList, func, tagList) seriesList | | Stable | +| isNonNull(seriesList) seriesList | | Stable | +| maxSeries(seriesList) series | max | Stable | +| minSeries(seriesList) series | min | Stable | +| multiplySeries(seriesList) series | | Stable | +| movingAverage(seriesLists, windowSize) seriesList | | Unstable | +| perSecond(seriesLists) seriesList | | Stable | +| rangeOfSeries(seriesList) series | | Stable | +| scale(seriesList, num) series | | Stable | +| scaleToSeconds(seriesList, seconds) series | | Stable | +| stddevSeries(seriesList) series | | Stable | +| sumSeries(seriesLists) series | sum | Stable | +| summarize(seriesList) seriesList | | Stable | +| transformNull(seriesList, default=0) seriesList | | Stable | + diff --git a/expr/func_filterseries.go b/expr/func_filterseries.go new file mode 100644 index 0000000000..256107d6af --- /dev/null +++ b/expr/func_filterseries.go @@ -0,0 +1,88 @@ +package expr + +import ( + 
"errors" + "math" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/consolidation" +) + +type FuncFilterSeries struct { + in GraphiteFunc + fn string + operator string + threshold float64 +} + +func NewFilterSeries() GraphiteFunc { + return &FuncFilterSeries{} +} + +func (s *FuncFilterSeries) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.in}, + ArgString{key: "func", val: &s.fn, validator: []Validator{IsConsolFunc}}, + ArgString{key: "operator", val: &s.operator, validator: []Validator{IsOperator}}, + ArgFloat{key: "threshold", val: &s.threshold}, + }, []Arg{ArgSeriesList{}} +} + +func (s *FuncFilterSeries) Context(context Context) Context { + return context +} + +func getOperatorFunc(operator string) (func(float64, float64) bool, error) { + switch operator { + case "=": + return func(val, threshold float64) bool { + return val == threshold + }, nil + + case "!=": + return func(val, threshold float64) bool { + return val != threshold + }, nil + + case ">": + return func(val, threshold float64) bool { + return val > threshold + }, nil + case ">=": + return func(val, threshold float64) bool { + return val >= threshold + }, nil + + case "<": + return func(val, threshold float64) bool { + return math.IsNaN(val) || val < threshold + }, nil + case "<=": + return func(val, threshold float64) bool { + return math.IsNaN(val) || val <= threshold + }, nil + } + return func(v1, v2 float64) bool { return false }, errors.New("Unsupported operator: " + operator) +} + +func (s *FuncFilterSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + + consolidationFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn)) + operatorFunc, err := getOperatorFunc(s.operator) + if err != nil { + return nil, err + } + + out := make([]models.Series, 0, len(series)) + for _, serie := range series { + if operatorFunc(consolidationFunc(serie.Datapoints), s.threshold) { + out = append(out, serie) + } + } + + return out, nil +} diff --git a/expr/func_filterseries_test.go b/expr/func_filterseries_test.go new file mode 100644 index 0000000000..ce32b824b1 --- /dev/null +++ b/expr/func_filterseries_test.go @@ -0,0 +1,385 @@ +package expr + +import ( + "math" + "math/rand" + "strconv" + "testing" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/test" + "gopkg.in/raintank/schema.v1" +) + +func TestFilterSeriesEqual(t *testing.T) { + + testFilterSeries( + "abcd_equal", + "max", + "=", + 1234567890, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + }, + t, + ) +} + +func TestFilterSeriesNotEqual(t *testing.T) { + + testFilterSeries( + "abcd_notequal", + "max", + "!=", + 1234567890, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 
10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func TestFilterSeriesLessThan(t *testing.T) { + + testFilterSeries( + "abcd_lessthan", + "max", + "<", + 1234567890, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func TestFilterSeriesLessThanOrEqualTo(t *testing.T) { + + testFilterSeries( + "abcd_lessorequal", + "max", + "<=", + 250, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func TestFilterSeriesMoreThan(t *testing.T) { + + testFilterSeries( + "abcd_more", + "max", + ">", + 250, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + }, + t, + ) +} + +func TestFilterSeriesMoreThanOrEqual(t *testing.T) { + + testFilterSeries( + "abcd_moreorequal", + "max", + ">=", + 250, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func testFilterSeries(name string, fn string, operator string, threshold float64, in []models.Series, out []models.Series, t *testing.T) { + f := NewFilterSeries() + f.(*FuncFilterSeries).in = NewMock(in) + f.(*FuncFilterSeries).fn = fn + f.(*FuncFilterSeries).operator = operator + f.(*FuncFilterSeries).threshold = threshold + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q (%s, %s, %f): err should be nil. 
got %q", name, fn, operator, threshold, err) + } + if len(gots) != len(out) { + t.Fatalf("case %q (%s, %s, %f): isNonNull len output expected %d, got %d", name, fn, operator, threshold, len(out), len(gots)) + } + for i, g := range gots { + exp := out[i] + if g.Target != exp.Target { + t.Fatalf("case %q (%s, %s, %f): expected target %q, got %q", name, fn, operator, threshold, exp.Target, g.Target) + } + if len(g.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q (%s, %s, %f) len output expected %d, got %d", name, fn, operator, threshold, len(exp.Datapoints), len(g.Datapoints)) + } + for j, p := range g.Datapoints { + bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val) + if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { + continue + } + t.Fatalf("case %q (%s, %s, %f): output point %d - expected %v got %v", name, fn, operator, threshold, j, exp.Datapoints[j], p) + } + } +} + +func BenchmarkFilterSeries10k_1NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 1, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkFilterSeries10k_10NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 10, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkFilterSeries10k_100NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 100, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkFilterSeries10k_1000NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 1000, test.RandFloats10k, test.RandFloats10k) +} + +func BenchmarkFilterSeries10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_1000SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) +} + +func BenchmarkFilterSeries10k_1AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_10AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_100AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_1000AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} + +func benchmarkFilterSeries(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + QueryPatt: strconv.Itoa(i), + } + if i%2 == 0 { + series.Datapoints = fn0() + } else { + series.Datapoints = fn1() + } + input = append(input, series) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := NewFilterSeries() + f.(*FuncFilterSeries).in = NewMock(input) + f.(*FuncFilterSeries).fn = "sum" + f.(*FuncFilterSeries).operator = ">" + f.(*FuncFilterSeries).threshold = rand.Float64() + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/funcs.go b/expr/funcs.go index 5b0572e92b..5aef8783f6 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -60,6 
@@ -60,6 +60,7 @@ func init() {
 		"divideSeries":      {NewDivideSeries, true},
 		"divideSeriesLists": {NewDivideSeriesLists, true},
 		"exclude":           {NewExclude, true},
+		"filterSeries":      {NewFilterSeries, true},
 		"grep":              {NewGrep, true},
 		"groupByTags":       {NewGroupByTags, true},
 		"isNonNull":         {NewIsNonNull, true},
diff --git a/expr/validator.go b/expr/validator.go
index 7ec5ba2ede..07f89e13e3 100644
--- a/expr/validator.go
+++ b/expr/validator.go
@@ -36,3 +36,11 @@ func IsIntervalString(e *expr) error {
 	_, err := dur.ParseDuration(e.str)
 	return err
 }
+
+func IsOperator(e *expr) error {
+	switch e.str {
+	case "=", "!=", ">", ">=", "<", "<=":
+		return nil
+	}
+	return errors.New("Unsupported operator: " + e.str)
+}
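
One behavioral detail of the new `getOperatorFunc` is easy to miss: the `<` and `<=` branches explicitly keep a NaN consolidated value (i.e. a series whose points are all null), while `>`, `>=`, `=` and `!=` follow Go's normal comparison rules, where every comparison with NaN except `!=` is false. A minimal sketch that pins this down — a hypothetical extra test alongside the new file in package `expr`, not part of this change set:

```go
package expr

import (
	"math"
	"testing"
)

// Hypothetical test, for illustration only: documents the asymmetric NaN
// handling of the comparators returned by getOperatorFunc.
func TestGetOperatorFuncNaNHandling(t *testing.T) {
	lt, _ := getOperatorFunc("<")
	gt, _ := getOperatorFunc(">")

	// An all-null series consolidates to NaN; "<" keeps it...
	if !lt(math.NaN(), 100) {
		t.Fatal(`"<" should keep an all-null (NaN) series`)
	}
	// ...while ">" drops it, because NaN > x is false in Go.
	if gt(math.NaN(), 100) {
		t.Fatal(`">" should drop an all-null (NaN) series`)
	}
}
```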
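In render-target terms, the new function is used like `filterSeries(app.*.http.request_count, 'max', '>', 1000)` (the metric name here is made up), which keeps only the series whose consolidated `max` value exceeds 1000. Per the signature registered above, `func` is checked by `IsConsolFunc`, so it accepts consolidation function names (presumably the same set used by `consolidateBy`); `operator` must be one of `=`, `!=`, `>`, `>=`, `<`, `<=` (enforced by the new `IsOperator` validator); and `threshold` is a float.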