diff --git a/docs/graphite.md b/docs/graphite.md index 11d7daa00f..4da43d0b0c 100644 --- a/docs/graphite.md +++ b/docs/graphite.md @@ -40,6 +40,7 @@ divideSeriesLists(dividends, divisors) seriesList | | Stable exclude(seriesList, pattern) seriesList | | Stable grep(seriesList, pattern) seriesList | | Stable groupByTags(seriesList, func, tagList) seriesList | | Stable +isNonNull(seriesList) seriesList | | Stable maxSeries(seriesList) series | max | Stable minSeries(seriesList) series | min | Stable multiplySeries(seriesList) series | | Stable diff --git a/expr/func_isnonnull.go b/expr/func_isnonnull.go new file mode 100644 index 0000000000..164a875385 --- /dev/null +++ b/expr/func_isnonnull.go @@ -0,0 +1,61 @@ +package expr + +import ( + "fmt" + "math" + + "github.com/grafana/metrictank/api/models" + schema "gopkg.in/raintank/schema.v1" +) + +type FuncIsNonNull struct { + in GraphiteFunc +} + +func NewIsNonNull() GraphiteFunc { + return &FuncIsNonNull{} +} + +func (s *FuncIsNonNull) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.in}}, []Arg{ArgSeriesList{}} +} + +func (s *FuncIsNonNull) Context(context Context) Context { + return context +} + +func (s *FuncIsNonNull) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + + out := make([]models.Series, len(series)) + for i, serie := range series { + transformed := &out[i] + transformed.Target = fmt.Sprintf("isNonNull(%s)", serie.Target) + transformed.QueryPatt = fmt.Sprintf("isNonNull(%s)", serie.QueryPatt) + transformed.Tags = make(map[string]string, len(serie.Tags)+1) + transformed.Datapoints = pointSlicePool.Get().([]schema.Point) + transformed.Interval = serie.Interval + transformed.Consolidator = serie.Consolidator + transformed.QueryCons = serie.QueryCons + + for k, v := range serie.Tags { + transformed.Tags[k] = v + } + transformed.Tags["isNonNull"] = "1" + for _, p := range serie.Datapoints { + if math.IsNaN(p.Val) { + p.Val = 0 + } else { + p.Val = 1 + } + transformed.Datapoints = append(transformed.Datapoints, p) + } + cache[Req{}] = append(cache[Req{}], *transformed) + } + + return out, nil +} diff --git a/expr/func_isnonnull_test.go b/expr/func_isnonnull_test.go new file mode 100644 index 0000000000..276518857d --- /dev/null +++ b/expr/func_isnonnull_test.go @@ -0,0 +1,217 @@ +package expr + +import ( + "strconv" + "testing" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/test" + "gopkg.in/raintank/schema.v1" +) + +var aIsNonNull = []schema.Point{ + {Val: 1, Ts: 10}, + {Val: 1, Ts: 20}, + {Val: 1, Ts: 30}, + {Val: 0, Ts: 40}, + {Val: 0, Ts: 50}, + {Val: 1, Ts: 60}, +} + +var bIsNonNull = []schema.Point{ + {Val: 1, Ts: 10}, + {Val: 1, Ts: 20}, + {Val: 1, Ts: 30}, + {Val: 0, Ts: 40}, + {Val: 1, Ts: 50}, + {Val: 0, Ts: 60}, +} + +var cdIsNonNull = []schema.Point{ + {Val: 1, Ts: 10}, + {Val: 1, Ts: 20}, + {Val: 1, Ts: 30}, + {Val: 1, Ts: 40}, + {Val: 1, Ts: 50}, + {Val: 1, Ts: 60}, +} + +func TestIsNonNullSingle(t *testing.T) { + testIsNonNull( + "identity", + []models.Series{ + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "isNonNull(a)", + Datapoints: getCopy(aIsNonNull), + }, + }, + t, + ) +} + +func TestIsNonNullSingleAllNonNull(t *testing.T) { + testIsNonNull( + "identity-counter8bit", + []models.Series{ + { + Interval: 10, + QueryPatt: "counter8bit", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + Interval: 10, + QueryPatt: "isNonNull(counter8bit)", + Datapoints: getCopy(cdIsNonNull), + }, + }, + t, + ) +} + +func TestIsNonNullMulti(t *testing.T) { + testIsNonNull( + "multiple-series", + []models.Series{ + { + Interval: 10, + QueryPatt: "a", + Datapoints: getCopy(a), + }, + { + Interval: 10, + QueryPatt: "b.*", + Datapoints: getCopy(b), + }, + { + Interval: 10, + QueryPatt: "c.foo{bar,baz}", + Datapoints: getCopy(c), + }, + { + Interval: 10, + QueryPatt: "movingAverage(bar, '1min')", + Datapoints: getCopy(d), + }, + }, + []models.Series{ + { + QueryPatt: "isNonNull(a)", + Datapoints: getCopy(aIsNonNull), + }, + { + QueryPatt: "isNonNull(b.*)", + Datapoints: getCopy(bIsNonNull), + }, + { + QueryPatt: "isNonNull(c.foo{bar,baz})", + Datapoints: getCopy(cdIsNonNull), + }, + { + QueryPatt: "isNonNull(movingAverage(bar, '1min'))", + Datapoints: getCopy(cdIsNonNull), + }, + }, + t, + ) +} + +func testIsNonNull(name string, in []models.Series, out []models.Series, t *testing.T) { + f := NewIsNonNull() + f.(*FuncIsNonNull).in = NewMock(in) + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q: err should be nil. got %q", name, err) + } + if len(gots) != len(out) { + t.Fatalf("case %q: isNonNull len output expected %d, got %d", name, len(out), len(gots)) + } + for i, g := range gots { + exp := out[i] + if g.QueryPatt != exp.QueryPatt { + t.Fatalf("case %q: expected target %q, got %q", name, exp.QueryPatt, g.QueryPatt) + } + if len(g.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q: len output expected %d, got %d", name, len(exp.Datapoints), len(g.Datapoints)) + } + for j, p := range g.Datapoints { + if (p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { + continue + } + t.Fatalf("case %q: output point %d - expected %v got %v", name, j, exp.Datapoints[j], p) + } + } +} + +func BenchmarkIsNonNull10k_1NoNulls(b *testing.B) { + benchmarkIsNonNull(b, 1, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkIsNonNull10k_10NoNulls(b *testing.B) { + benchmarkIsNonNull(b, 10, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkIsNonNull10k_100NoNulls(b *testing.B) { + benchmarkIsNonNull(b, 100, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkIsNonNull10k_1000NoNulls(b *testing.B) { + benchmarkIsNonNull(b, 1000, test.RandFloats10k, test.RandFloats10k) +} + +func BenchmarkIsNonNull10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkIsNonNull10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkIsNonNull10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkIsNonNull10k_1000SomeSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) +} + +func BenchmarkIsNonNull10k_1AllSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkIsNonNull10k_10AllSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkIsNonNull10k_100AllSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkIsNonNull10k_1000AllSeriesHalfNulls(b *testing.B) { + benchmarkIsNonNull(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} + +func benchmarkIsNonNull(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + QueryPatt: strconv.Itoa(i), + } + if i%2 == 0 { + series.Datapoints = fn0() + } else { + series.Datapoints = fn1() + } + input = append(input, series) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := NewIsNonNull() + f.(*FuncIsNonNull).in = NewMock(input) + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/funcs.go b/expr/funcs.go index 3ff9090c92..6b51733048 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -61,6 +61,7 @@ func init() { "exclude": {NewExclude, true}, "grep": {NewGrep, true}, "groupByTags": {NewGroupByTags, true}, + "isNonNull": {NewIsNonNull, true}, "max": {NewAggregateConstructor("max", crossSeriesMax), true}, "maxSeries": {NewAggregateConstructor("max", crossSeriesMax), true}, "min": {NewAggregateConstructor("min", crossSeriesMin), true},