-
Notifications
You must be signed in to change notification settings - Fork 107
Add keepLastValue function #995
Changes from all commits
cb03df8
106ee1f
940e62d
1a46459
c969661
3ce3c8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package expr | ||
|
||
import ( | ||
"fmt" | ||
"math" | ||
|
||
"github.com/grafana/metrictank/api/models" | ||
schema "gopkg.in/raintank/schema.v1" | ||
) | ||
|
||
type FuncKeepLastValue struct { | ||
in GraphiteFunc | ||
limit int64 | ||
} | ||
|
||
func NewKeepLastValue() GraphiteFunc { | ||
return &FuncKeepLastValue{limit: math.MaxInt64} | ||
} | ||
|
||
func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { | ||
var stub string | ||
return []Arg{ | ||
ArgSeriesList{val: &s.in}, | ||
ArgIn{key: "limit", | ||
opt: true, | ||
args: []Arg{ | ||
ArgInt{val: &s.limit}, | ||
// Treats any string as infinity. This matches Graphite's behavior | ||
// (although intended bevahior is to let user specify "INF" as the limit) | ||
ArgString{val: &stub}, | ||
}, | ||
}, | ||
}, | ||
[]Arg{ArgSeriesList{}} | ||
} | ||
|
||
func (s *FuncKeepLastValue) Context(context Context) Context { | ||
return context | ||
} | ||
|
||
func (s *FuncKeepLastValue) Exec(cache map[Req][]models.Series) ([]models.Series, error) { | ||
series, err := s.in.Exec(cache) | ||
if err != nil { | ||
return nil, err | ||
} | ||
limit := int(s.limit) | ||
outSeries := make([]models.Series, len(series)) | ||
for i, serie := range series { | ||
serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target) | ||
serie.QueryPatt = serie.Target | ||
|
||
out := pointSlicePool.Get().([]schema.Point) | ||
|
||
var consecutiveNaNs int | ||
lastVal := math.NaN() | ||
|
||
for i, p := range serie.Datapoints { | ||
out = append(out, p) | ||
if math.IsNaN(p.Val) { | ||
consecutiveNaNs++ | ||
continue | ||
} | ||
if 0 < consecutiveNaNs && consecutiveNaNs <= limit && !math.IsNaN(lastVal) { | ||
for j := i - consecutiveNaNs; j < i; j++ { | ||
out[j].Val = lastVal | ||
} | ||
} | ||
consecutiveNaNs = 0 | ||
lastVal = p.Val | ||
} | ||
|
||
if 0 < consecutiveNaNs && consecutiveNaNs <= limit && !math.IsNaN(lastVal) { | ||
for i := len(out) - consecutiveNaNs; i < len(out); i++ { | ||
out[i].Val = lastVal | ||
} | ||
} | ||
|
||
serie.Datapoints = out | ||
outSeries[i] = serie | ||
} | ||
cache[Req{}] = append(cache[Req{}], outSeries...) | ||
return outSeries, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
package expr | ||
|
||
import ( | ||
"math" | ||
"strconv" | ||
"testing" | ||
|
||
"github.com/grafana/metrictank/api/models" | ||
"github.com/grafana/metrictank/test" | ||
"gopkg.in/raintank/schema.v1" | ||
) | ||
|
||
func TestKeepLastValueAll(t *testing.T) { | ||
out := []schema.Point{ | ||
{Val: 0, Ts: 10}, | ||
{Val: 0, Ts: 20}, | ||
{Val: 5.5, Ts: 30}, | ||
{Val: 5.5, Ts: 40}, | ||
{Val: 5.5, Ts: 50}, | ||
{Val: 1234567890, Ts: 60}, | ||
} | ||
|
||
testKeepLastValue( | ||
"keepAll", | ||
math.MaxInt64, | ||
[]models.Series{ | ||
{ | ||
Interval: 10, | ||
Target: "a", | ||
Datapoints: getCopy(a), | ||
}, | ||
}, | ||
[]models.Series{ | ||
{ | ||
Interval: 10, | ||
Target: "keepLastValue(a)", | ||
Datapoints: out, | ||
}, | ||
}, | ||
t, | ||
) | ||
} | ||
|
||
func TestKeepLastValueNone(t *testing.T) { | ||
|
||
testKeepLastValue( | ||
"keepNone", | ||
0, | ||
[]models.Series{ | ||
{ | ||
Interval: 10, | ||
Target: "sum4a2b", | ||
Datapoints: getCopy(sum4a2b), | ||
}, | ||
}, | ||
[]models.Series{ | ||
{ | ||
Interval: 10, | ||
Target: "keepLastValue(sum4a2b)", | ||
Datapoints: getCopy(sum4a2b), | ||
}, | ||
}, | ||
t, | ||
) | ||
} | ||
|
||
func TestKeepLastValueOne(t *testing.T) { | ||
out := []schema.Point{ | ||
{Val: 0, Ts: 10}, | ||
{Val: math.MaxFloat64, Ts: 20}, | ||
{Val: math.MaxFloat64 - 20, Ts: 30}, | ||
{Val: math.MaxFloat64 - 20, Ts: 40}, | ||
{Val: 1234567890, Ts: 50}, | ||
{Val: 1234567890, Ts: 60}, | ||
} | ||
|
||
testKeepLastValue( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similar to feedback in previous PR, using utility functions is good, but if they call t.Fatal internally, it's harder to trace back to the calling function so if a test fails it's hard to tell where it failed. not a huge deal though, if you have more important stuff to do, then don't worry about this. but something to keep in mind for the future at least. i don't want you to refactor this if you have other functions or whatever to work on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I didn't have the utility functions when I wrote this (I just rebased), so didn't do that. Will use them in the future though. |
||
"keepOne", | ||
1, | ||
[]models.Series{ | ||
{ | ||
Interval: 10, | ||
Target: "b", | ||
Datapoints: getCopy(b), | ||
}, | ||
{ | ||
Interval: 10, | ||
Target: "a", | ||
Datapoints: getCopy(a), | ||
}, | ||
}, | ||
[]models.Series{ | ||
{ | ||
Interval: 10, | ||
Target: "keepLastValue(b)", | ||
Datapoints: out, | ||
}, | ||
{ | ||
Interval: 10, | ||
Target: "keepLastValue(a)", | ||
Datapoints: getCopy(a), | ||
}, | ||
}, | ||
t, | ||
) | ||
} | ||
|
||
func testKeepLastValue(name string, limit int64, in []models.Series, out []models.Series, t *testing.T) { | ||
f := NewKeepLastValue() | ||
f.(*FuncKeepLastValue).in = NewMock(in) | ||
f.(*FuncKeepLastValue).limit = limit | ||
gots, err := f.Exec(make(map[Req][]models.Series)) | ||
if err != nil { | ||
t.Fatalf("case %q (%d): err should be nil. got %q", name, limit, err) | ||
} | ||
if len(gots) != len(out) { | ||
t.Fatalf("case %q (%d): isNonNull len output expected %d, got %d", name, limit, len(out), len(gots)) | ||
} | ||
for i, g := range gots { | ||
exp := out[i] | ||
if g.Target != exp.Target { | ||
t.Fatalf("case %q (%d): expected target %q, got %q", name, limit, exp.Target, g.Target) | ||
} | ||
if len(g.Datapoints) != len(exp.Datapoints) { | ||
t.Fatalf("case %q (%d) len output expected %d, got %d", name, limit, len(exp.Datapoints), len(g.Datapoints)) | ||
} | ||
for j, p := range g.Datapoints { | ||
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val) | ||
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { | ||
continue | ||
} | ||
t.Fatalf("case %q (%d): output point %d - expected %v got %v", name, limit, j, exp.Datapoints[j], p) | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkKeepLastValue10k_1NoNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 1, test.RandFloats10k, test.RandFloats10k) | ||
} | ||
func BenchmarkKeepLastValue10k_10NoNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 10, test.RandFloats10k, test.RandFloats10k) | ||
} | ||
func BenchmarkKeepLastValue10k_100NoNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 100, test.RandFloats10k, test.RandFloats10k) | ||
} | ||
func BenchmarkKeepLastValue10k_1000NoNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 1000, test.RandFloats10k, test.RandFloats10k) | ||
} | ||
|
||
func BenchmarkKeepLastValue10k_1SomeSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) | ||
} | ||
func BenchmarkKeepLastValue10k_10SomeSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) | ||
} | ||
func BenchmarkKeepLastValue10k_100SomeSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) | ||
} | ||
func BenchmarkKeepLastValue10k_1000SomeSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) | ||
} | ||
|
||
func BenchmarkKeepLastValue10k_1AllSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) | ||
} | ||
func BenchmarkKeepLastValue10k_10AllSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) | ||
} | ||
func BenchmarkKeepLastValue10k_100AllSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) | ||
} | ||
func BenchmarkKeepLastValue10k_1000AllSeriesHalfNulls(b *testing.B) { | ||
benchmarkKeepLastValue(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) | ||
} | ||
|
||
func benchmarkKeepLastValue(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { | ||
var input []models.Series | ||
for i := 0; i < numSeries; i++ { | ||
series := models.Series{ | ||
QueryPatt: strconv.Itoa(i), | ||
} | ||
if i%2 == 0 { | ||
series.Datapoints = fn0() | ||
} else { | ||
series.Datapoints = fn1() | ||
} | ||
input = append(input, series) | ||
} | ||
b.ResetTimer() | ||
for i := 0; i < b.N; i++ { | ||
f := NewKeepLastValue() | ||
f.(*FuncKeepLastValue).in = NewMock(input) | ||
got, err := f.Exec(make(map[Req][]models.Series)) | ||
if err != nil { | ||
b.Fatalf("%s", err) | ||
} | ||
results = got | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting. i always thought graphite would just replace nones upto the limit, instead of only replacing nones if an entire batch falls under the limit.
but i checked functions.py and it's the same.
TIL ...