Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Add derivative and nonNegativeDerivative functions #996

Merged
merged 3 commits into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Here are the currently included functions:
| averageSeries(seriesLists) series | avg | Stable |
| consolidateBy(seriesList, func) seriesList | | Stable |
| countSeries(seriesLists) series | | Stable |
| derivative(seriesLists) series | | Stable |
| diffSeries(seriesLists) series | | Stable |
| divideSeries(dividend, divisor) seriesList | | Stable |
| divideSeriesLists(dividends, divisors) seriesList | | Stable |
Expand All @@ -55,6 +56,7 @@ Here are the currently included functions:
| minSeries(seriesList) series | min | Stable |
| multiplySeries(seriesList) series | | Stable |
| movingAverage(seriesLists, windowSize) seriesList | | Unstable |
| nonNegatievDerivative(seriesList, maxValue) seriesList | | Stable |
| perSecond(seriesLists) seriesList | | Stable |
| rangeOfSeries(seriesList) series | | Stable |
| removeAboveValue(seriesList, n) seriesList | | Stable |
Expand Down
63 changes: 63 additions & 0 deletions expr/func_derivative.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package expr

import (
"fmt"
"math"

"github.com/grafana/metrictank/api/models"
schema "gopkg.in/raintank/schema.v1"
)

type FuncDerivative struct {
in GraphiteFunc
}

func NewDerivative() GraphiteFunc {
return &FuncDerivative{}
}

func (s *FuncDerivative) Signature() ([]Arg, []Arg) {
return []Arg{
ArgSeriesList{val: &s.in}}, []Arg{ArgSeriesList{}}
}

func (s *FuncDerivative) Context(context Context) Context {
return context
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, i had originally envisioned the context mechanism to allow things like getting an earlier point by adjusting the from query time, so that we can start off our first point with a valid one instead of NaN. but that infrastructure isn't quite place yet, and graphite doesn't try to be this fancy anyway, so we can leave this as is. and maybe someday revisit.

}

func (s *FuncDerivative) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

outSeries := make([]models.Series, len(series))
for i, serie := range series {
serie.Target = fmt.Sprintf("derivative(%s)", serie.Target)
serie.QueryPatt = fmt.Sprintf("derivative(%s)", serie.QueryPatt)
out := pointSlicePool.Get().([]schema.Point)

newTags := make(map[string]string, len(serie.Tags)+1)
for k, v := range serie.Tags {
newTags[k] = v
}
newTags["derivative"] = "1"
serie.Tags = newTags

prev := math.NaN()
for _, p := range serie.Datapoints {
val := p.Val
if math.IsNaN(prev) || math.IsNaN(val) {
p.Val = math.NaN()
} else {
p.Val -= prev
}
prev = val
out = append(out, p)
}
serie.Datapoints = out
outSeries[i] = serie
}
cache[Req{}] = append(cache[Req{}], outSeries...)
return outSeries, nil
}
182 changes: 182 additions & 0 deletions expr/func_derivative_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package expr

import (
"math"
"strconv"
"testing"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/test"
"gopkg.in/raintank/schema.v1"
)

func TestDerivativeNoMax(t *testing.T) {
testDerivative(
"derivative",
[]models.Series{
{
Interval: 10,
QueryPatt: "abcd",
Target: "a",
Datapoints: getCopy(a),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "b",
Datapoints: getCopy(b),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "c",
Datapoints: getCopy(c),
},
{
Interval: 10,
QueryPatt: "abcd",
Target: "d",
Datapoints: getCopy(d),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "derivative(abcd)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: 0, Ts: 20},
{Val: 5.5, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "derivative(abcd)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: math.MaxFloat64, Ts: 20},
{Val: 0, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: math.NaN(), Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "derivative(abcd)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: 0, Ts: 20},
{Val: 1, Ts: 30},
{Val: 1, Ts: 40},
{Val: 1, Ts: 50},
{Val: 1, Ts: 60},
},
},
{
Interval: 10,
QueryPatt: "derivative(abcd)",
Datapoints: []schema.Point{
{Val: math.NaN(), Ts: 10},
{Val: 33, Ts: 20},
{Val: 166, Ts: 30},
{Val: -170, Ts: 40},
{Val: 51, Ts: 50},
{Val: 170, Ts: 60},
},
},
},
t,
)
}

func testDerivative(name string, in []models.Series, out []models.Series, t *testing.T) {
f := NewDerivative()
f.(*FuncDerivative).in = NewMock(in)
gots, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
t.Fatalf("case %q: err should be nil. got %q", name, err)
}
if len(gots) != len(out) {
t.Fatalf("case %q: isNonNull len output expected %d, got %d", name, len(out), len(gots))
}
for i, g := range gots {
exp := out[i]
if g.QueryPatt != exp.QueryPatt {
t.Fatalf("case %q: expected target %q, got %q", name, exp.QueryPatt, g.QueryPatt)
}
if len(g.Datapoints) != len(exp.Datapoints) {
t.Fatalf("case %q len output expected %d, got %d", name, len(exp.Datapoints), len(g.Datapoints))
}
for j, p := range g.Datapoints {
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
continue
}
t.Fatalf("case %q: output point %d - expected %v got %v", name, j, exp.Datapoints[j], p)
}
}
}
func BenchmarkDerivative10k_1NoNulls(b *testing.B) {
benchmarkDerivative(b, 1, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkDerivative10k_10NoNulls(b *testing.B) {
benchmarkDerivative(b, 10, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkDerivative10k_100NoNulls(b *testing.B) {
benchmarkDerivative(b, 100, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkDerivative10k_1000NoNulls(b *testing.B) {
benchmarkDerivative(b, 1000, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkDerivative10k_1SomeSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_10SomeSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_100SomeSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_1000SomeSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_1AllSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_10AllSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_100AllSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkDerivative10k_1000AllSeriesHalfNulls(b *testing.B) {
benchmarkDerivative(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func benchmarkDerivative(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
var input []models.Series
for i := 0; i < numSeries; i++ {
series := models.Series{
QueryPatt: strconv.Itoa(i),
}
if i%2 == 0 {
series.Datapoints = fn0()
} else {
series.Datapoints = fn1()
}
input = append(input, series)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
f := NewDerivative()
f.(*FuncDerivative).in = NewMock(input)
got, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
b.Fatalf("%s", err)
}
results = got
}
}
84 changes: 84 additions & 0 deletions expr/func_nonnegativederivative.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package expr

import (
"fmt"
"math"

"github.com/grafana/metrictank/api/models"
schema "gopkg.in/raintank/schema.v1"
)

type FuncNonNegativeDerivative struct {
in GraphiteFunc
maxValue float64
}

func NewNonNegativeDerivative() GraphiteFunc {
return &FuncNonNegativeDerivative{maxValue: math.NaN()}
}

func (s *FuncNonNegativeDerivative) Signature() ([]Arg, []Arg) {
return []Arg{
ArgSeriesList{val: &s.in},
ArgFloat{
key: "maxValue",
opt: true,
val: &s.maxValue}}, []Arg{ArgSeriesList{}}
}

func (s *FuncNonNegativeDerivative) Context(context Context) Context {
return context
}

func (s *FuncNonNegativeDerivative) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

outSeries := make([]models.Series, len(series))
for i, serie := range series {
serie.Target = fmt.Sprintf("nonNegativeDerivative(%s)", serie.Target)
serie.QueryPatt = fmt.Sprintf("nonNegativeDerivative(%s)", serie.QueryPatt)
out := pointSlicePool.Get().([]schema.Point)

newTags := make(map[string]string, len(serie.Tags)+1)
for k, v := range serie.Tags {
newTags[k] = v
}
newTags["nonNegativeDerivative"] = "1"
serie.Tags = newTags

prev := math.NaN()
for _, p := range serie.Datapoints {
var delta float64
delta, prev = nonNegativeDelta(p.Val, prev, s.maxValue)
p.Val = delta
out = append(out, p)
}
serie.Datapoints = out
outSeries[i] = serie
}
cache[Req{}] = append(cache[Req{}], outSeries...)
return outSeries, nil
}

func nonNegativeDelta(val, prev, maxValue float64) (float64, float64) {
if val > maxValue {
return math.NaN(), math.NaN()
}

if math.IsNaN(prev) || math.IsNaN(val) {
return math.NaN(), val
}

if val >= prev {
return val - prev, val
}

if !math.IsNaN(maxValue) {
return maxValue + 1 + val - prev, val
}

return math.NaN(), val
}
Loading