Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 906a922

Browse files
committed
Implemented scaleToSeconds
1 parent 3ce976f commit 906a922

File tree

4 files changed

+299
-25
lines changed

4 files changed

+299
-25
lines changed

docs/graphite.md

+26-25
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,29 @@ See also:
2727

2828
Here are the currently included functions:
2929

30-
Function name and signature | Alias | Metrictank
31-
----------------------------------------------------- | ------------ | ----------
32-
alias(seriesList, alias) seriesList | | Stable
33-
aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable
34-
aliasSub(seriesList, pattern, replacement) seriesList | | Stable
35-
averageSeries(seriesLists) series | avg | Stable
36-
consolidateBy(seriesList, func) seriesList | | Stable
37-
diffSeries(seriesLists) series | | Stable
38-
divideSeries(dividend, divisor) seriesList | | Stable
39-
divideSeriesLists(dividends, divisors) seriesList | | Stable
40-
exclude(seriesList, pattern) seriesList | | Stable
41-
grep(seriesList, pattern) seriesList | | Stable
42-
groupByTags(seriesList, func, tagList) seriesList | | Stable
43-
isNonNull(seriesList) seriesList | | Stable
44-
maxSeries(seriesList) series | max | Stable
45-
minSeries(seriesList) series | min | Stable
46-
multiplySeries(seriesList) series | | Stable
47-
movingAverage(seriesLists, windowSize) seriesList | | Unstable
48-
perSecond(seriesLists) seriesList | | Stable
49-
rangeOfSeries(seriesList) series | | Stable
50-
scale(seriesLists, num) series | | Stable
51-
stddevSeries(seriesList) series | | Stable
52-
sumSeries(seriesLists) series | sum | Stable
53-
summarize(seriesList) seriesList | | Stable
54-
transformNull(seriesList, default=0) seriesList | | Stable
30+
| Function name and signature | Alias | Metrictank |
31+
| ----------------------------------------------------- | ----------- | ---------- |
32+
| alias(seriesList, alias) seriesList | | Stable |
33+
| aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable |
34+
| aliasSub(seriesList, pattern, replacement) seriesList | | Stable |
35+
| averageSeries(seriesLists) series | avg | Stable |
36+
| consolidateBy(seriesList, func) seriesList | | Stable |
37+
| diffSeries(seriesLists) series | | Stable |
38+
| divideSeries(dividend, divisor) seriesList | | Stable |
39+
| divideSeriesLists(dividends, divisors) seriesList | | Stable |
40+
| exclude(seriesList, pattern) seriesList | | Stable |
41+
| grep(seriesList, pattern) seriesList | | Stable |
42+
| groupByTags(seriesList, func, tagList) seriesList | | Stable |
43+
| isNonNull(seriesList) seriesList | | Stable |
44+
| maxSeries(seriesList) series | max | Stable |
45+
| minSeries(seriesList) series | min | Stable |
46+
| multiplySeries(seriesList) series | | Stable |
47+
| movingAverage(seriesLists, windowSize) seriesList | | Unstable |
48+
| perSecond(seriesLists) seriesList | | Stable |
49+
| rangeOfSeries(seriesList) series | | Stable |
50+
| scale(seriesList, num) series | | Stable |
51+
| scaleToSeconds(seriesList, seconds) series | | Stable |
52+
| stddevSeries(seriesList) series | | Stable |
53+
| sumSeries(seriesLists) series | sum | Stable |
54+
| summarize(seriesList) seriesList | | Stable |
55+
| transformNull(seriesList, default=0) seriesList | | Stable |

expr/func_scaletoseconds.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package expr
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"strconv"
7+
8+
"github.com/grafana/metrictank/api/models"
9+
schema "gopkg.in/raintank/schema.v1"
10+
)
11+
12+
type FuncScaleToSeconds struct {
13+
in GraphiteFunc
14+
seconds float64
15+
}
16+
17+
func NewScaleToSeconds() GraphiteFunc {
18+
return &FuncScaleToSeconds{}
19+
}
20+
21+
func (s *FuncScaleToSeconds) Signature() ([]Arg, []Arg) {
22+
return []Arg{
23+
ArgSeriesList{val: &s.in},
24+
ArgFloat{key: "seconds", val: &s.seconds},
25+
}, []Arg{ArgSeriesList{}}
26+
}
27+
28+
func (s *FuncScaleToSeconds) Context(context Context) Context {
29+
return context
30+
}
31+
32+
func (s *FuncScaleToSeconds) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
33+
series, err := s.in.Exec(cache)
34+
if err != nil {
35+
return nil, err
36+
}
37+
38+
out := make([]models.Series, len(series))
39+
for i, serie := range series {
40+
transformed := &out[i]
41+
transformed.Target = fmt.Sprintf("scaleToSeconds(%s,%d)", serie.Target, int64(s.seconds))
42+
transformed.QueryPatt = transformed.Target
43+
transformed.Tags = make(map[string]string, len(serie.Tags)+1)
44+
transformed.Datapoints = pointSlicePool.Get().([]schema.Point)
45+
transformed.Interval = serie.Interval
46+
transformed.Consolidator = serie.Consolidator
47+
transformed.QueryCons = serie.QueryCons
48+
49+
for k, v := range serie.Tags {
50+
transformed.Tags[k] = v
51+
}
52+
transformed.Tags["scaleToSeconds"] = strconv.FormatFloat(s.seconds, 'g', -1, 64)
53+
54+
factor := float64(s.seconds) / float64(serie.Interval)
55+
for _, p := range serie.Datapoints {
56+
if !math.IsNaN(p.Val) {
57+
p.Val = math.Round(p.Val*factor*1000000) / 1000000 // round to 6 decimal places
58+
}
59+
transformed.Datapoints = append(transformed.Datapoints, p)
60+
}
61+
cache[Req{}] = append(cache[Req{}], *transformed)
62+
}
63+
64+
return out, nil
65+
}

expr/func_scaletoseconds_test.go

+207
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package expr
2+
3+
import (
4+
"math"
5+
"strconv"
6+
"testing"
7+
8+
"github.com/grafana/metrictank/api/models"
9+
"github.com/grafana/metrictank/test"
10+
"gopkg.in/raintank/schema.v1"
11+
)
12+
13+
func TestScaleToSecondsSingle(t *testing.T) {
14+
testScaleToSeconds(
15+
"identity",
16+
[]models.Series{
17+
{
18+
Interval: 10,
19+
QueryPatt: "a",
20+
Target: "a",
21+
Datapoints: getCopy(a),
22+
},
23+
},
24+
[]models.Series{
25+
{
26+
Interval: 10,
27+
QueryPatt: "scaleToSeconds(a,10)",
28+
Datapoints: getCopy(a),
29+
},
30+
},
31+
t,
32+
10,
33+
)
34+
}
35+
36+
func TestScaleToSecondsSingleAllNonNull(t *testing.T) {
37+
out := []schema.Point{
38+
{Val: 0, Ts: 10},
39+
{Val: 3.0437127721620759e+19, Ts: 20},
40+
{Val: 1.8354510353341003e+20, Ts: 30},
41+
{Val: 2.674777890687885e+19, Ts: 40},
42+
{Val: 7.3786976294838198e+19, Ts: 50},
43+
{Val: 2.3058430092136936e+20, Ts: 60},
44+
}
45+
46+
testScaleToSeconds(
47+
"identity-largeseconds",
48+
[]models.Series{
49+
{
50+
Interval: 10,
51+
QueryPatt: "d",
52+
Target: "d",
53+
Datapoints: getCopy(d),
54+
},
55+
},
56+
[]models.Series{
57+
{
58+
Interval: 10,
59+
QueryPatt: "scaleToSeconds(d,9223372036854774784)",
60+
Datapoints: out,
61+
},
62+
},
63+
t,
64+
9223372036854774784,
65+
)
66+
}
67+
68+
func TestScaleToSecondsMulti(t *testing.T) {
69+
out1 := []schema.Point{
70+
{Val: 0, Ts: 10},
71+
{Val: math.Inf(0), Ts: 20},
72+
{Val: math.Inf(0), Ts: 30},
73+
{Val: math.NaN(), Ts: 40},
74+
{Val: 123456.7890, Ts: 50},
75+
{Val: math.NaN(), Ts: 60},
76+
}
77+
out2 := []schema.Point{
78+
{Val: 0, Ts: 10},
79+
{Val: 0, Ts: 20},
80+
{Val: 0.0001, Ts: 30},
81+
{Val: 0.0002, Ts: 40},
82+
{Val: 0.0003, Ts: 50},
83+
{Val: 0.0004, Ts: 60},
84+
}
85+
testScaleToSeconds(
86+
"multiple-series-subseconds",
87+
[]models.Series{
88+
{
89+
Interval: 10,
90+
QueryPatt: "b.*",
91+
Target: "b.*",
92+
Datapoints: getCopy(b),
93+
},
94+
{
95+
Interval: 10,
96+
QueryPatt: "c.foo{bar,baz}",
97+
Target: "c.foo{bar,baz}",
98+
Datapoints: getCopy(c),
99+
},
100+
},
101+
[]models.Series{
102+
{
103+
QueryPatt: "scaleToSeconds(b.*,0)",
104+
Datapoints: out1,
105+
},
106+
{
107+
QueryPatt: "scaleToSeconds(c.foo{bar,baz},0)",
108+
Datapoints: out2,
109+
},
110+
},
111+
t,
112+
0.001,
113+
)
114+
}
115+
116+
func testScaleToSeconds(name string, in []models.Series, out []models.Series, t *testing.T, seconds float64) {
117+
f := NewScaleToSeconds()
118+
f.(*FuncScaleToSeconds).in = NewMock(in)
119+
f.(*FuncScaleToSeconds).seconds = seconds
120+
gots, err := f.Exec(make(map[Req][]models.Series))
121+
if err != nil {
122+
t.Fatalf("case %q (%f): err should be nil. got %q", name, seconds, err)
123+
}
124+
if len(gots) != len(out) {
125+
t.Fatalf("case %q (%f): isNonNull len output expected %d, got %d", name, seconds, len(out), len(gots))
126+
}
127+
for i, g := range gots {
128+
exp := out[i]
129+
if g.QueryPatt != exp.QueryPatt {
130+
t.Fatalf("case %q (%f): expected target %q, got %q", name, seconds, exp.QueryPatt, g.QueryPatt)
131+
}
132+
if len(g.Datapoints) != len(exp.Datapoints) {
133+
t.Fatalf("case %q (%f) len output expected %d, got %d", name, seconds, len(exp.Datapoints), len(g.Datapoints))
134+
}
135+
for j, p := range g.Datapoints {
136+
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
137+
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
138+
continue
139+
}
140+
t.Fatalf("case %q (%f): output point %d - expected %v got %v", name, seconds, j, exp.Datapoints[j], p)
141+
}
142+
}
143+
}
144+
145+
func BenchmarkScaleToSeconds10k_1NoNulls(b *testing.B) {
146+
benchmarkScaleToSeconds(b, 1, test.RandFloats10k, test.RandFloats10k)
147+
}
148+
func BenchmarkScaleToSeconds10k_10NoNulls(b *testing.B) {
149+
benchmarkScaleToSeconds(b, 10, test.RandFloats10k, test.RandFloats10k)
150+
}
151+
func BenchmarkScaleToSeconds10k_100NoNulls(b *testing.B) {
152+
benchmarkScaleToSeconds(b, 100, test.RandFloats10k, test.RandFloats10k)
153+
}
154+
func BenchmarkScaleToSeconds10k_1000NoNulls(b *testing.B) {
155+
benchmarkScaleToSeconds(b, 1000, test.RandFloats10k, test.RandFloats10k)
156+
}
157+
158+
func BenchmarkScaleToSeconds10k_1SomeSeriesHalfNulls(b *testing.B) {
159+
benchmarkScaleToSeconds(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
160+
}
161+
func BenchmarkScaleToSeconds10k_10SomeSeriesHalfNulls(b *testing.B) {
162+
benchmarkScaleToSeconds(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
163+
}
164+
func BenchmarkScaleToSeconds10k_100SomeSeriesHalfNulls(b *testing.B) {
165+
benchmarkScaleToSeconds(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
166+
}
167+
func BenchmarkScaleToSeconds10k_1000SomeSeriesHalfNulls(b *testing.B) {
168+
benchmarkScaleToSeconds(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
169+
}
170+
171+
func BenchmarkScaleToSeconds10k_1AllSeriesHalfNulls(b *testing.B) {
172+
benchmarkScaleToSeconds(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
173+
}
174+
func BenchmarkScaleToSeconds10k_10AllSeriesHalfNulls(b *testing.B) {
175+
benchmarkScaleToSeconds(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
176+
}
177+
func BenchmarkScaleToSeconds10k_100AllSeriesHalfNulls(b *testing.B) {
178+
benchmarkScaleToSeconds(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
179+
}
180+
func BenchmarkScaleToSeconds10k_1000AllSeriesHalfNulls(b *testing.B) {
181+
benchmarkScaleToSeconds(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
182+
}
183+
184+
func benchmarkScaleToSeconds(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
185+
var input []models.Series
186+
for i := 0; i < numSeries; i++ {
187+
series := models.Series{
188+
QueryPatt: strconv.Itoa(i),
189+
}
190+
if i%2 == 0 {
191+
series.Datapoints = fn0()
192+
} else {
193+
series.Datapoints = fn1()
194+
}
195+
input = append(input, series)
196+
}
197+
b.ResetTimer()
198+
for i := 0; i < b.N; i++ {
199+
f := NewScaleToSeconds()
200+
f.(*FuncScaleToSeconds).in = NewMock(input)
201+
got, err := f.Exec(make(map[Req][]models.Series))
202+
if err != nil {
203+
b.Fatalf("%s", err)
204+
}
205+
results = got
206+
}
207+
}

expr/funcs.go

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func init() {
7171
"perSecond": {NewPerSecond, true},
7272
"rangeOfSeries": {NewAggregateConstructor("rangeOf", crossSeriesRange), true},
7373
"scale": {NewScale, true},
74+
"scaleToSeconds": {NewScaleToSeconds, true},
7475
"smartSummarize": {NewSmartSummarize, false},
7576
"sortByName": {NewSortByName, true},
7677
"stddevSeries": {NewAggregateConstructor("stddev", crossSeriesStddev), true},

0 commit comments

Comments
 (0)