From 7573c2382d8ff8655ddc4ea95ca41613532d7abd Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Wed, 1 Aug 2018 20:05:00 -0400 Subject: [PATCH 1/4] fully working --- docs/graphite.md | 53 ++--- expr/func_filterseries.go | 71 ++++++ expr/func_filterseries_test.go | 385 +++++++++++++++++++++++++++++++++ expr/funcs.go | 1 + expr/validator.go | 8 + 5 files changed, 492 insertions(+), 26 deletions(-) create mode 100644 expr/func_filterseries.go create mode 100644 expr/func_filterseries_test.go diff --git a/docs/graphite.md b/docs/graphite.md index 5ea5331de2..9dd46fc622 100644 --- a/docs/graphite.md +++ b/docs/graphite.md @@ -27,29 +27,30 @@ See also: Here are the currently included functions: -| Function name and signature | Alias | Metrictank | -| ----------------------------------------------------- | ----------- | ---------- | -| alias(seriesList, alias) seriesList | | Stable | -| aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable | -| aliasSub(seriesList, pattern, replacement) seriesList | | Stable | -| averageSeries(seriesLists) series | avg | Stable | -| consolidateBy(seriesList, func) seriesList | | Stable | -| diffSeries(seriesLists) series | | Stable | -| divideSeries(dividend, divisor) seriesList | | Stable | -| divideSeriesLists(dividends, divisors) seriesList | | Stable | -| exclude(seriesList, pattern) seriesList | | Stable | -| grep(seriesList, pattern) seriesList | | Stable | -| groupByTags(seriesList, func, tagList) seriesList | | Stable | -| isNonNull(seriesList) seriesList | | Stable | -| maxSeries(seriesList) series | max | Stable | -| minSeries(seriesList) series | min | Stable | -| multiplySeries(seriesList) series | | Stable | -| movingAverage(seriesLists, windowSize) seriesList | | Unstable | -| perSecond(seriesLists) seriesList | | Stable | -| rangeOfSeries(seriesList) series | | Stable | -| scale(seriesList, num) series | | Stable | -| scaleToSeconds(seriesList, seconds) series | | Stable | -| stddevSeries(seriesList) series | | Stable | -| sumSeries(seriesLists) series | sum | Stable | -| summarize(seriesList) seriesList | | Stable | -| transformNull(seriesList, default=0) seriesList | | Stable | +| Function name and signature | Alias | Metrictank | +| -------------------------------------------------------------- | ----------- | ---------- | +| alias(seriesList, alias) seriesList | | Stable | +| aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable | +| aliasSub(seriesList, pattern, replacement) seriesList | | Stable | +| averageSeries(seriesLists) series | avg | Stable | +| consolidateBy(seriesList, func) seriesList | | Stable | +| diffSeries(seriesLists) series | | Stable | +| divideSeries(dividend, divisor) seriesList | | Stable | +| divideSeriesLists(dividends, divisors) seriesList | | Stable | +| exclude(seriesList, pattern) seriesList | | Stable | +| filterSeries(seriesList, func, operator, threshold) seriesList | | Stable | +| grep(seriesList, pattern) seriesList | | Stable | +| groupByTags(seriesList, func, tagList) seriesList | | Stable | +| isNonNull(seriesList) seriesList | | Stable | +| maxSeries(seriesList) series | max | Stable | +| minSeries(seriesList) series | min | Stable | +| multiplySeries(seriesList) series | | Stable | +| movingAverage(seriesLists, windowSize) seriesList | | Unstable | +| perSecond(seriesLists) seriesList | | Stable | +| rangeOfSeries(seriesList) series | | Stable | +| scale(seriesList, num) series | | Stable | +| scaleToSeconds(seriesList, seconds) series | | Stable | +| stddevSeries(seriesList) series | | Stable | +| sumSeries(seriesLists) series | sum | Stable | +| summarize(seriesList) seriesList | | Stable | +| transformNull(seriesList, default=0) seriesList | | Stable | diff --git a/expr/func_filterseries.go b/expr/func_filterseries.go new file mode 100644 index 0000000000..a88328fba4 --- /dev/null +++ b/expr/func_filterseries.go @@ -0,0 +1,71 @@ +package expr + +import ( + "math" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/consolidation" +) + +type FuncFilterSeries struct { + in GraphiteFunc + fn string + operator string + threshhold float64 +} + +func NewFilterSeries() GraphiteFunc { + return &FuncFilterSeries{} +} + +func (s *FuncFilterSeries) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.in}, + ArgString{key: "func", val: &s.fn, validator: []Validator{IsConsolFunc}}, + ArgString{key: "operator", val: &s.operator, validator: []Validator{IsOperator}}, + ArgFloat{key: "threshhold", val: &s.threshhold}, + }, []Arg{ArgSeriesList{}} +} + +func (s *FuncFilterSeries) Context(context Context) Context { + return context +} + +func (s *FuncFilterSeries) operatorFunc(val float64) bool { + if math.IsNaN(val) { + return true + } + switch s.operator { + case "=": + return val == s.threshhold + case "!=": + return val != s.threshhold + case ">": + return val > s.threshhold + case ">=": + return val >= s.threshhold + case "<": + return val < s.threshhold + case "<=": + return val <= s.threshhold + } + return false // should never happen +} + +func (s *FuncFilterSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + + consolidationFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn)) + + out := make([]models.Series, 0, len(series)) + for _, serie := range series { + if s.operatorFunc(consolidationFunc(serie.Datapoints)) { + out = append(out, serie) + } + } + + return out, nil +} diff --git a/expr/func_filterseries_test.go b/expr/func_filterseries_test.go new file mode 100644 index 0000000000..764a0808f9 --- /dev/null +++ b/expr/func_filterseries_test.go @@ -0,0 +1,385 @@ +package expr + +import ( + "math" + "math/rand" + "strconv" + "testing" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/test" + "gopkg.in/raintank/schema.v1" +) + +func TestFilterSeriesEqual(t *testing.T) { + + testFilterSeries( + "abcd_equal", + "max", + "=", + 1234567890, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + }, + t, + ) +} + +func TestFilterSeriesNotEqual(t *testing.T) { + + testFilterSeries( + "abcd_notequal", + "max", + "!=", + 1234567890, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func TestFilterSeriesLessThan(t *testing.T) { + + testFilterSeries( + "abcd_lessthan", + "max", + "<", + 1234567890, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func TestFilterSeriesLessThanOrEqualTo(t *testing.T) { + + testFilterSeries( + "abcd_lessorequal", + "max", + "<=", + 250, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func TestFilterSeriesMoreThan(t *testing.T) { + + testFilterSeries( + "abcd_more", + "max", + ">", + 250, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + }, + t, + ) +} + +func TestFilterSeriesMoreThanOrEqual(t *testing.T) { + + testFilterSeries( + "abcd_moreorequal", + "max", + ">=", + 250, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "c", + Datapoints: getCopy(c), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "d", + Datapoints: getCopy(d), + }, + }, + t, + ) +} + +func testFilterSeries(name string, fn string, operator string, threshhold float64, in []models.Series, out []models.Series, t *testing.T) { + f := NewFilterSeries() + f.(*FuncFilterSeries).in = NewMock(in) + f.(*FuncFilterSeries).fn = fn + f.(*FuncFilterSeries).operator = operator + f.(*FuncFilterSeries).threshhold = threshhold + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q (%s, %s, %f): err should be nil. got %q", name, fn, operator, threshhold, err) + } + if len(gots) != len(out) { + t.Fatalf("case %q (%s, %s, %f): isNonNull len output expected %d, got %d", name, fn, operator, threshhold, len(out), len(gots)) + } + for i, g := range gots { + exp := out[i] + if g.Target != exp.Target { + t.Fatalf("case %q (%s, %s, %f): expected target %q, got %q", name, fn, operator, threshhold, exp.Target, g.Target) + } + if len(g.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q (%s, %s, %f) len output expected %d, got %d", name, fn, operator, threshhold, len(exp.Datapoints), len(g.Datapoints)) + } + for j, p := range g.Datapoints { + bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val) + if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { + continue + } + t.Fatalf("case %q (%s, %s, %f): output point %d - expected %v got %v", name, fn, operator, threshhold, j, exp.Datapoints[j], p) + } + } +} + +func BenchmarkFilterSeries10k_1NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 1, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkFilterSeries10k_10NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 10, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkFilterSeries10k_100NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 100, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkFilterSeries10k_1000NoNulls(b *testing.B) { + benchmarkFilterSeries(b, 1000, test.RandFloats10k, test.RandFloats10k) +} + +func BenchmarkFilterSeries10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_1000SomeSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) +} + +func BenchmarkFilterSeries10k_1AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_10AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_100AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkFilterSeries10k_1000AllSeriesHalfNulls(b *testing.B) { + benchmarkFilterSeries(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} + +func benchmarkFilterSeries(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + QueryPatt: strconv.Itoa(i), + } + if i%2 == 0 { + series.Datapoints = fn0() + } else { + series.Datapoints = fn1() + } + input = append(input, series) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := NewFilterSeries() + f.(*FuncFilterSeries).in = NewMock(input) + f.(*FuncFilterSeries).fn = "sum" + f.(*FuncFilterSeries).operator = ">" + f.(*FuncFilterSeries).threshhold = rand.Float64() + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/funcs.go b/expr/funcs.go index 4f12367938..348008691a 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -61,6 +61,7 @@ func init() { "exclude": {NewExclude, true}, "grep": {NewGrep, true}, "groupByTags": {NewGroupByTags, true}, + "filterSeries": {NewFilterSeries, true}, "isNonNull": {NewIsNonNull, true}, "max": {NewAggregateConstructor("max", crossSeriesMax), true}, "maxSeries": {NewAggregateConstructor("max", crossSeriesMax), true}, diff --git a/expr/validator.go b/expr/validator.go index 7ec5ba2ede..07f89e13e3 100644 --- a/expr/validator.go +++ b/expr/validator.go @@ -36,3 +36,11 @@ func IsIntervalString(e *expr) error { _, err := dur.ParseDuration(e.str) return err } + +func IsOperator(e *expr) error { + switch e.str { + case "=", "!=", ">", ">=", "<", "<=": + return nil + } + return errors.New("Unsupported operator: " + e.str) +} From 5cf5a0e3e3b85fbf59a2afde1f40635658f3fd7c Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Thu, 2 Aug 2018 15:34:55 -0400 Subject: [PATCH 2/4] pr changes --- expr/func_filterseries.go | 47 ++++++++++++++++++++++------------ expr/func_filterseries_test.go | 16 ++++++------ 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/expr/func_filterseries.go b/expr/func_filterseries.go index a88328fba4..9781d03392 100644 --- a/expr/func_filterseries.go +++ b/expr/func_filterseries.go @@ -8,10 +8,10 @@ import ( ) type FuncFilterSeries struct { - in GraphiteFunc - fn string - operator string - threshhold float64 + in GraphiteFunc + fn string + operator string + threshold float64 } func NewFilterSeries() GraphiteFunc { @@ -23,7 +23,7 @@ func (s *FuncFilterSeries) Signature() ([]Arg, []Arg) { ArgSeriesList{val: &s.in}, ArgString{key: "func", val: &s.fn, validator: []Validator{IsConsolFunc}}, ArgString{key: "operator", val: &s.operator, validator: []Validator{IsOperator}}, - ArgFloat{key: "threshhold", val: &s.threshhold}, + ArgFloat{key: "threshold", val: &s.threshold}, }, []Arg{ArgSeriesList{}} } @@ -31,25 +31,37 @@ func (s *FuncFilterSeries) Context(context Context) Context { return context } -func (s *FuncFilterSeries) operatorFunc(val float64) bool { - if math.IsNaN(val) { - return true - } +func (s *FuncFilterSeries) getOperatorFunc() func(float64) bool { switch s.operator { case "=": - return val == s.threshhold + return func(val float64) bool { + return val == s.threshold + } + case "!=": - return val != s.threshhold + return func(val float64) bool { + return val != s.threshold + } + case ">": - return val > s.threshhold + return func(val float64) bool { + return val > s.threshold + } case ">=": - return val >= s.threshhold + return func(val float64) bool { + return val >= s.threshold + } + case "<": - return val < s.threshhold + return func(val float64) bool { + return math.IsNaN(val) || val < s.threshold + } case "<=": - return val <= s.threshhold + return func(val float64) bool { + return math.IsNaN(val) || val <= s.threshold + } } - return false // should never happen + return func(val float64) bool { return false } // should never happen } func (s *FuncFilterSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) { @@ -59,10 +71,11 @@ func (s *FuncFilterSeries) Exec(cache map[Req][]models.Series) ([]models.Series, } consolidationFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn)) + operatorFunc := s.getOperatorFunc() out := make([]models.Series, 0, len(series)) for _, serie := range series { - if s.operatorFunc(consolidationFunc(serie.Datapoints)) { + if operatorFunc(consolidationFunc(serie.Datapoints)) { out = append(out, serie) } } diff --git a/expr/func_filterseries_test.go b/expr/func_filterseries_test.go index 764a0808f9..ce32b824b1 100644 --- a/expr/func_filterseries_test.go +++ b/expr/func_filterseries_test.go @@ -286,33 +286,33 @@ func TestFilterSeriesMoreThanOrEqual(t *testing.T) { ) } -func testFilterSeries(name string, fn string, operator string, threshhold float64, in []models.Series, out []models.Series, t *testing.T) { +func testFilterSeries(name string, fn string, operator string, threshold float64, in []models.Series, out []models.Series, t *testing.T) { f := NewFilterSeries() f.(*FuncFilterSeries).in = NewMock(in) f.(*FuncFilterSeries).fn = fn f.(*FuncFilterSeries).operator = operator - f.(*FuncFilterSeries).threshhold = threshhold + f.(*FuncFilterSeries).threshold = threshold gots, err := f.Exec(make(map[Req][]models.Series)) if err != nil { - t.Fatalf("case %q (%s, %s, %f): err should be nil. got %q", name, fn, operator, threshhold, err) + t.Fatalf("case %q (%s, %s, %f): err should be nil. got %q", name, fn, operator, threshold, err) } if len(gots) != len(out) { - t.Fatalf("case %q (%s, %s, %f): isNonNull len output expected %d, got %d", name, fn, operator, threshhold, len(out), len(gots)) + t.Fatalf("case %q (%s, %s, %f): isNonNull len output expected %d, got %d", name, fn, operator, threshold, len(out), len(gots)) } for i, g := range gots { exp := out[i] if g.Target != exp.Target { - t.Fatalf("case %q (%s, %s, %f): expected target %q, got %q", name, fn, operator, threshhold, exp.Target, g.Target) + t.Fatalf("case %q (%s, %s, %f): expected target %q, got %q", name, fn, operator, threshold, exp.Target, g.Target) } if len(g.Datapoints) != len(exp.Datapoints) { - t.Fatalf("case %q (%s, %s, %f) len output expected %d, got %d", name, fn, operator, threshhold, len(exp.Datapoints), len(g.Datapoints)) + t.Fatalf("case %q (%s, %s, %f) len output expected %d, got %d", name, fn, operator, threshold, len(exp.Datapoints), len(g.Datapoints)) } for j, p := range g.Datapoints { bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val) if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { continue } - t.Fatalf("case %q (%s, %s, %f): output point %d - expected %v got %v", name, fn, operator, threshhold, j, exp.Datapoints[j], p) + t.Fatalf("case %q (%s, %s, %f): output point %d - expected %v got %v", name, fn, operator, threshold, j, exp.Datapoints[j], p) } } } @@ -375,7 +375,7 @@ func benchmarkFilterSeries(b *testing.B, numSeries int, fn0, fn1 func() []schema f.(*FuncFilterSeries).in = NewMock(input) f.(*FuncFilterSeries).fn = "sum" f.(*FuncFilterSeries).operator = ">" - f.(*FuncFilterSeries).threshhold = rand.Float64() + f.(*FuncFilterSeries).threshold = rand.Float64() got, err := f.Exec(make(map[Req][]models.Series)) if err != nil { b.Fatalf("%s", err) From 9ce727879cf3322c64f9b2c0fdbe13c94f74945d Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Tue, 7 Aug 2018 11:13:55 -0400 Subject: [PATCH 3/4] added error check and free function --- expr/func_filterseries.go | 50 +++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/expr/func_filterseries.go b/expr/func_filterseries.go index 9781d03392..256107d6af 100644 --- a/expr/func_filterseries.go +++ b/expr/func_filterseries.go @@ -1,6 +1,7 @@ package expr import ( + "errors" "math" "github.com/grafana/metrictank/api/models" @@ -31,37 +32,37 @@ func (s *FuncFilterSeries) Context(context Context) Context { return context } -func (s *FuncFilterSeries) getOperatorFunc() func(float64) bool { - switch s.operator { +func getOperatorFunc(operator string) (func(float64, float64) bool, error) { + switch operator { case "=": - return func(val float64) bool { - return val == s.threshold - } + return func(val, threshold float64) bool { + return val == threshold + }, nil case "!=": - return func(val float64) bool { - return val != s.threshold - } + return func(val, threshold float64) bool { + return val != threshold + }, nil case ">": - return func(val float64) bool { - return val > s.threshold - } + return func(val, threshold float64) bool { + return val > threshold + }, nil case ">=": - return func(val float64) bool { - return val >= s.threshold - } + return func(val, threshold float64) bool { + return val >= threshold + }, nil case "<": - return func(val float64) bool { - return math.IsNaN(val) || val < s.threshold - } + return func(val, threshold float64) bool { + return math.IsNaN(val) || val < threshold + }, nil case "<=": - return func(val float64) bool { - return math.IsNaN(val) || val <= s.threshold - } + return func(val, threshold float64) bool { + return math.IsNaN(val) || val <= threshold + }, nil } - return func(val float64) bool { return false } // should never happen + return func(v1, v2 float64) bool { return false }, errors.New("Unsupported operator: " + operator) } func (s *FuncFilterSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) { @@ -71,11 +72,14 @@ func (s *FuncFilterSeries) Exec(cache map[Req][]models.Series) ([]models.Series, } consolidationFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn)) - operatorFunc := s.getOperatorFunc() + operatorFunc, err := getOperatorFunc(s.operator) + if err != nil { + return nil, err + } out := make([]models.Series, 0, len(series)) for _, serie := range series { - if operatorFunc(consolidationFunc(serie.Datapoints)) { + if operatorFunc(consolidationFunc(serie.Datapoints), s.threshold) { out = append(out, serie) } } From 80168191272488996be62ca43a0fbb277fdd45df Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Thu, 9 Aug 2018 13:00:00 -0400 Subject: [PATCH 4/4] Update funcs.go --- expr/funcs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/expr/funcs.go b/expr/funcs.go index 348008691a..957c801e32 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -59,9 +59,9 @@ func init() { "divideSeries": {NewDivideSeries, true}, "divideSeriesLists": {NewDivideSeriesLists, true}, "exclude": {NewExclude, true}, + "filterSeries": {NewFilterSeries, true}, "grep": {NewGrep, true}, "groupByTags": {NewGroupByTags, true}, - "filterSeries": {NewFilterSeries, true}, "isNonNull": {NewIsNonNull, true}, "max": {NewAggregateConstructor("max", crossSeriesMax), true}, "maxSeries": {NewAggregateConstructor("max", crossSeriesMax), true},