Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit edbf319

Browse files
authored
Merge pull request #996 from bloomberg/nonNegativeDerivative
Add derivative and nonNegativeDerivative functions
2 parents 347a051 + 1b947a3 commit edbf319

6 files changed

+644
-43
lines changed

docs/graphite.md

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Here are the currently included functions:
3636
| averageSeries(seriesLists) series | avg | Stable |
3737
| consolidateBy(seriesList, func) seriesList | | Stable |
3838
| countSeries(seriesLists) series | | Stable |
39+
| derivative(seriesLists) series | | Stable |
3940
| diffSeries(seriesLists) series | | Stable |
4041
| divideSeries(dividend, divisor) seriesList | | Stable |
4142
| divideSeriesLists(dividends, divisors) seriesList | | Stable |
@@ -55,6 +56,7 @@ Here are the currently included functions:
5556
| minSeries(seriesList) series | min | Stable |
5657
| multiplySeries(seriesList) series | | Stable |
5758
| movingAverage(seriesLists, windowSize) seriesList | | Unstable |
59+
| nonNegatievDerivative(seriesList, maxValue) seriesList | | Stable |
5860
| perSecond(seriesLists) seriesList | | Stable |
5961
| rangeOfSeries(seriesList) series | | Stable |
6062
| removeAboveValue(seriesList, n) seriesList | | Stable |

expr/func_derivative.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package expr
2+
3+
import (
4+
"fmt"
5+
"math"
6+
7+
"github.com/grafana/metrictank/api/models"
8+
schema "gopkg.in/raintank/schema.v1"
9+
)
10+
11+
type FuncDerivative struct {
12+
in GraphiteFunc
13+
}
14+
15+
func NewDerivative() GraphiteFunc {
16+
return &FuncDerivative{}
17+
}
18+
19+
func (s *FuncDerivative) Signature() ([]Arg, []Arg) {
20+
return []Arg{
21+
ArgSeriesList{val: &s.in}}, []Arg{ArgSeriesList{}}
22+
}
23+
24+
func (s *FuncDerivative) Context(context Context) Context {
25+
return context
26+
}
27+
28+
func (s *FuncDerivative) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
29+
series, err := s.in.Exec(cache)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
outSeries := make([]models.Series, len(series))
35+
for i, serie := range series {
36+
serie.Target = fmt.Sprintf("derivative(%s)", serie.Target)
37+
serie.QueryPatt = fmt.Sprintf("derivative(%s)", serie.QueryPatt)
38+
out := pointSlicePool.Get().([]schema.Point)
39+
40+
newTags := make(map[string]string, len(serie.Tags)+1)
41+
for k, v := range serie.Tags {
42+
newTags[k] = v
43+
}
44+
newTags["derivative"] = "1"
45+
serie.Tags = newTags
46+
47+
prev := math.NaN()
48+
for _, p := range serie.Datapoints {
49+
val := p.Val
50+
if math.IsNaN(prev) || math.IsNaN(val) {
51+
p.Val = math.NaN()
52+
} else {
53+
p.Val -= prev
54+
}
55+
prev = val
56+
out = append(out, p)
57+
}
58+
serie.Datapoints = out
59+
outSeries[i] = serie
60+
}
61+
cache[Req{}] = append(cache[Req{}], outSeries...)
62+
return outSeries, nil
63+
}

expr/func_derivative_test.go

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package expr
2+
3+
import (
4+
"math"
5+
"strconv"
6+
"testing"
7+
8+
"github.com/grafana/metrictank/api/models"
9+
"github.com/grafana/metrictank/test"
10+
"gopkg.in/raintank/schema.v1"
11+
)
12+
13+
func TestDerivativeNoMax(t *testing.T) {
14+
testDerivative(
15+
"derivative",
16+
[]models.Series{
17+
{
18+
Interval: 10,
19+
QueryPatt: "abcd",
20+
Target: "a",
21+
Datapoints: getCopy(a),
22+
},
23+
{
24+
Interval: 10,
25+
QueryPatt: "abcd",
26+
Target: "b",
27+
Datapoints: getCopy(b),
28+
},
29+
{
30+
Interval: 10,
31+
QueryPatt: "abcd",
32+
Target: "c",
33+
Datapoints: getCopy(c),
34+
},
35+
{
36+
Interval: 10,
37+
QueryPatt: "abcd",
38+
Target: "d",
39+
Datapoints: getCopy(d),
40+
},
41+
},
42+
[]models.Series{
43+
{
44+
Interval: 10,
45+
QueryPatt: "derivative(abcd)",
46+
Datapoints: []schema.Point{
47+
{Val: math.NaN(), Ts: 10},
48+
{Val: 0, Ts: 20},
49+
{Val: 5.5, Ts: 30},
50+
{Val: math.NaN(), Ts: 40},
51+
{Val: math.NaN(), Ts: 50},
52+
{Val: math.NaN(), Ts: 60},
53+
},
54+
},
55+
{
56+
Interval: 10,
57+
QueryPatt: "derivative(abcd)",
58+
Datapoints: []schema.Point{
59+
{Val: math.NaN(), Ts: 10},
60+
{Val: math.MaxFloat64, Ts: 20},
61+
{Val: 0, Ts: 30},
62+
{Val: math.NaN(), Ts: 40},
63+
{Val: math.NaN(), Ts: 50},
64+
{Val: math.NaN(), Ts: 60},
65+
},
66+
},
67+
{
68+
Interval: 10,
69+
QueryPatt: "derivative(abcd)",
70+
Datapoints: []schema.Point{
71+
{Val: math.NaN(), Ts: 10},
72+
{Val: 0, Ts: 20},
73+
{Val: 1, Ts: 30},
74+
{Val: 1, Ts: 40},
75+
{Val: 1, Ts: 50},
76+
{Val: 1, Ts: 60},
77+
},
78+
},
79+
{
80+
Interval: 10,
81+
QueryPatt: "derivative(abcd)",
82+
Datapoints: []schema.Point{
83+
{Val: math.NaN(), Ts: 10},
84+
{Val: 33, Ts: 20},
85+
{Val: 166, Ts: 30},
86+
{Val: -170, Ts: 40},
87+
{Val: 51, Ts: 50},
88+
{Val: 170, Ts: 60},
89+
},
90+
},
91+
},
92+
t,
93+
)
94+
}
95+
96+
func testDerivative(name string, in []models.Series, out []models.Series, t *testing.T) {
97+
f := NewDerivative()
98+
f.(*FuncDerivative).in = NewMock(in)
99+
gots, err := f.Exec(make(map[Req][]models.Series))
100+
if err != nil {
101+
t.Fatalf("case %q: err should be nil. got %q", name, err)
102+
}
103+
if len(gots) != len(out) {
104+
t.Fatalf("case %q: isNonNull len output expected %d, got %d", name, len(out), len(gots))
105+
}
106+
for i, g := range gots {
107+
exp := out[i]
108+
if g.QueryPatt != exp.QueryPatt {
109+
t.Fatalf("case %q: expected target %q, got %q", name, exp.QueryPatt, g.QueryPatt)
110+
}
111+
if len(g.Datapoints) != len(exp.Datapoints) {
112+
t.Fatalf("case %q len output expected %d, got %d", name, len(exp.Datapoints), len(g.Datapoints))
113+
}
114+
for j, p := range g.Datapoints {
115+
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
116+
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
117+
continue
118+
}
119+
t.Fatalf("case %q: output point %d - expected %v got %v", name, j, exp.Datapoints[j], p)
120+
}
121+
}
122+
}
123+
func BenchmarkDerivative10k_1NoNulls(b *testing.B) {
124+
benchmarkDerivative(b, 1, test.RandFloats10k, test.RandFloats10k)
125+
}
126+
func BenchmarkDerivative10k_10NoNulls(b *testing.B) {
127+
benchmarkDerivative(b, 10, test.RandFloats10k, test.RandFloats10k)
128+
}
129+
func BenchmarkDerivative10k_100NoNulls(b *testing.B) {
130+
benchmarkDerivative(b, 100, test.RandFloats10k, test.RandFloats10k)
131+
}
132+
func BenchmarkDerivative10k_1000NoNulls(b *testing.B) {
133+
benchmarkDerivative(b, 1000, test.RandFloats10k, test.RandFloats10k)
134+
}
135+
func BenchmarkDerivative10k_1SomeSeriesHalfNulls(b *testing.B) {
136+
benchmarkDerivative(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
137+
}
138+
func BenchmarkDerivative10k_10SomeSeriesHalfNulls(b *testing.B) {
139+
benchmarkDerivative(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
140+
}
141+
func BenchmarkDerivative10k_100SomeSeriesHalfNulls(b *testing.B) {
142+
benchmarkDerivative(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
143+
}
144+
func BenchmarkDerivative10k_1000SomeSeriesHalfNulls(b *testing.B) {
145+
benchmarkDerivative(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
146+
}
147+
func BenchmarkDerivative10k_1AllSeriesHalfNulls(b *testing.B) {
148+
benchmarkDerivative(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
149+
}
150+
func BenchmarkDerivative10k_10AllSeriesHalfNulls(b *testing.B) {
151+
benchmarkDerivative(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
152+
}
153+
func BenchmarkDerivative10k_100AllSeriesHalfNulls(b *testing.B) {
154+
benchmarkDerivative(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
155+
}
156+
func BenchmarkDerivative10k_1000AllSeriesHalfNulls(b *testing.B) {
157+
benchmarkDerivative(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
158+
}
159+
func benchmarkDerivative(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
160+
var input []models.Series
161+
for i := 0; i < numSeries; i++ {
162+
series := models.Series{
163+
QueryPatt: strconv.Itoa(i),
164+
}
165+
if i%2 == 0 {
166+
series.Datapoints = fn0()
167+
} else {
168+
series.Datapoints = fn1()
169+
}
170+
input = append(input, series)
171+
}
172+
b.ResetTimer()
173+
for i := 0; i < b.N; i++ {
174+
f := NewDerivative()
175+
f.(*FuncDerivative).in = NewMock(input)
176+
got, err := f.Exec(make(map[Req][]models.Series))
177+
if err != nil {
178+
b.Fatalf("%s", err)
179+
}
180+
results = got
181+
}
182+
}

expr/func_nonnegativederivative.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package expr
2+
3+
import (
4+
"fmt"
5+
"math"
6+
7+
"github.com/grafana/metrictank/api/models"
8+
schema "gopkg.in/raintank/schema.v1"
9+
)
10+
11+
type FuncNonNegativeDerivative struct {
12+
in GraphiteFunc
13+
maxValue float64
14+
}
15+
16+
func NewNonNegativeDerivative() GraphiteFunc {
17+
return &FuncNonNegativeDerivative{maxValue: math.NaN()}
18+
}
19+
20+
func (s *FuncNonNegativeDerivative) Signature() ([]Arg, []Arg) {
21+
return []Arg{
22+
ArgSeriesList{val: &s.in},
23+
ArgFloat{
24+
key: "maxValue",
25+
opt: true,
26+
val: &s.maxValue}}, []Arg{ArgSeriesList{}}
27+
}
28+
29+
func (s *FuncNonNegativeDerivative) Context(context Context) Context {
30+
return context
31+
}
32+
33+
func (s *FuncNonNegativeDerivative) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
34+
series, err := s.in.Exec(cache)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
outSeries := make([]models.Series, len(series))
40+
for i, serie := range series {
41+
serie.Target = fmt.Sprintf("nonNegativeDerivative(%s)", serie.Target)
42+
serie.QueryPatt = fmt.Sprintf("nonNegativeDerivative(%s)", serie.QueryPatt)
43+
out := pointSlicePool.Get().([]schema.Point)
44+
45+
newTags := make(map[string]string, len(serie.Tags)+1)
46+
for k, v := range serie.Tags {
47+
newTags[k] = v
48+
}
49+
newTags["nonNegativeDerivative"] = "1"
50+
serie.Tags = newTags
51+
52+
prev := math.NaN()
53+
for _, p := range serie.Datapoints {
54+
var delta float64
55+
delta, prev = nonNegativeDelta(p.Val, prev, s.maxValue)
56+
p.Val = delta
57+
out = append(out, p)
58+
}
59+
serie.Datapoints = out
60+
outSeries[i] = serie
61+
}
62+
cache[Req{}] = append(cache[Req{}], outSeries...)
63+
return outSeries, nil
64+
}
65+
66+
func nonNegativeDelta(val, prev, maxValue float64) (float64, float64) {
67+
if val > maxValue {
68+
return math.NaN(), math.NaN()
69+
}
70+
71+
if math.IsNaN(prev) || math.IsNaN(val) {
72+
return math.NaN(), val
73+
}
74+
75+
if val >= prev {
76+
return val - prev, val
77+
}
78+
79+
if !math.IsNaN(maxValue) {
80+
return maxValue + 1 + val - prev, val
81+
}
82+
83+
return math.NaN(), val
84+
}

0 commit comments

Comments
 (0)