Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit c3ae72d

Browse files
authored
Merge pull request #1745 from bloomberg/unique
Add unique processing function
2 parents 31e90cc + ad1028f commit c3ae72d

File tree

4 files changed

+207
-1
lines changed

4 files changed

+207
-1
lines changed

docs/graphite.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ See also:
169169
| timeSlice | | No |
170170
| timeStack | | No |
171171
| transformNull(seriesList, default=0) seriesList | | Stable |
172-
| unique | | No |
172+
| unique | | Stable |
173173
| useSeriesAbove | | No |
174174
| verticalLine | | No |
175175
| weightedAverage | | No |

expr/func_unique.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package expr
2+
3+
import (
4+
"github.com/grafana/metrictank/api/models"
5+
)
6+
7+
// FuncUnique implements the Graphite "unique" processing function:
// it filters its inputs down to the first series seen per target name.
type FuncUnique struct {
	// in holds the input series expressions to consume and deduplicate.
	in []GraphiteFunc
}
10+
11+
func NewUnique() GraphiteFunc {
12+
return &FuncUnique{}
13+
}
14+
15+
func (s *FuncUnique) Signature() ([]Arg, []Arg) {
16+
return []Arg{
17+
ArgSeriesLists{val: &s.in}}, []Arg{ArgSeriesList{}}
18+
}
19+
20+
// Context returns the given context unchanged: unique does not alter the
// request settings (e.g. time range) of its inputs.
func (s *FuncUnique) Context(context Context) Context {
	return context
}
23+
24+
func (s *FuncUnique) Exec(dataMap DataMap) ([]models.Series, error) {
25+
series, _, err := consumeFuncs(dataMap, s.in)
26+
if err != nil {
27+
return nil, err
28+
}
29+
seenNames := make(map[string]bool)
30+
var uniqueSeries []models.Series
31+
for _, serie := range series {
32+
if _, ok := seenNames[serie.Target]; !ok {
33+
seenNames[serie.Target] = true
34+
uniqueSeries = append(uniqueSeries, serie)
35+
}
36+
}
37+
return uniqueSeries, nil
38+
}

expr/func_unique_test.go

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package expr
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
7+
"github.com/grafana/metrictank/api/models"
8+
"github.com/grafana/metrictank/schema"
9+
"github.com/grafana/metrictank/test"
10+
)
11+
12+
func getNewUnique(in [][]models.Series) *FuncUnique {
13+
f := NewUnique()
14+
s := f.(*FuncUnique)
15+
for i := range in {
16+
s.in = append(s.in, NewMock(in[i]))
17+
}
18+
return s
19+
}
20+
21+
// TestUnique feeds three series lists (containing duplicate targets both
// within and across lists) through unique and expects only the first
// occurrence of each target to survive, in input order.
func TestUnique(t *testing.T) {
	f := getNewUnique([][]models.Series{
		{
			{
				Interval:   10,
				QueryPatt:  "foo.a",
				Target:     "foo.a",
				Datapoints: getCopy(a),
			},
			{
				Interval:   100,
				QueryPatt:  "foo.b",
				Target:     "foo.b",
				Datapoints: getCopy(b),
			},
		},
		{
			{
				Interval:   10,
				QueryPatt:  "bar.b",
				Target:     "bar.b",
				Datapoints: getCopy(b),
			},
		},
		{
			// duplicate of foo.a from the first list — must be dropped
			{
				Interval:   10,
				QueryPatt:  "foo.a",
				Target:     "foo.a",
				Datapoints: getCopy(a),
			},
			// duplicate target bar.b (different QueryPatt) — must be dropped
			{
				Interval:   10,
				QueryPatt:  "bar.*",
				Target:     "bar.b",
				Datapoints: getCopy(b),
			},
			{
				Interval:   100,
				QueryPatt:  "bar.*",
				Target:     "bar.d",
				Datapoints: getCopy(d),
			},
			// another duplicate of foo.a within the same list — dropped too
			{
				Interval:   10,
				QueryPatt:  "foo.a",
				Target:     "foo.a",
				Datapoints: getCopy(a),
			},
		},
	},
	)

	// Expected: first occurrence of each target, in input order.
	out := []models.Series{
		{
			Interval:   10,
			QueryPatt:  "foo.a",
			Target:     "foo.a",
			Datapoints: getCopy(a),
		},
		{
			Interval:   100,
			QueryPatt:  "foo.b",
			Target:     "foo.b",
			Datapoints: getCopy(b),
		},
		{
			Interval:   10,
			QueryPatt:  "bar.b",
			Target:     "bar.b",
			Datapoints: getCopy(b),
		},
		{
			Interval:   100,
			QueryPatt:  "bar.*",
			Target:     "bar.d",
			Datapoints: getCopy(d),
		},
	}

	got, err := f.Exec(make(map[Req][]models.Series))
	if err := equalOutput(out, got, nil, err); err != nil {
		t.Fatal(err)
	}
}
106+
107+
// Benchmarks for unique over 10k-point series, varying the series count
// (1/10/100/1000) and the fraction of series containing null datapoints.
// fn0 generates datapoints for even-indexed series, fn1 for odd-indexed.
func BenchmarkUnique10k_1NoNulls(b *testing.B) {
	benchmarkUnique(b, 1, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkUnique10k_10NoNulls(b *testing.B) {
	benchmarkUnique(b, 10, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkUnique10k_100NoNulls(b *testing.B) {
	benchmarkUnique(b, 100, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkUnique10k_1000NoNulls(b *testing.B) {
	benchmarkUnique(b, 1000, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkUnique10k_1SomeSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_10SomeSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_100SomeSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_1000SomeSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_1AllSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_10AllSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_100AllSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkUnique10k_1000AllSeriesHalfNulls(b *testing.B) {
	benchmarkUnique(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
143+
144+
func benchmarkUnique(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
145+
var input []models.Series
146+
for i := 0; i < numSeries; i++ {
147+
series := models.Series{
148+
QueryPatt: strconv.Itoa(i),
149+
}
150+
if i%2 == 0 {
151+
series.Datapoints = fn0()
152+
} else {
153+
series.Datapoints = fn1()
154+
}
155+
input = append(input, series)
156+
}
157+
b.ResetTimer()
158+
for i := 0; i < b.N; i++ {
159+
f := NewUnique()
160+
f.(*FuncUnique).in = append(f.(*FuncUnique).in, NewMock(input))
161+
got, err := f.Exec(make(map[Req][]models.Series))
162+
if err != nil {
163+
b.Fatalf("%s", err)
164+
}
165+
results = got
166+
}
167+
}

expr/funcs.go

+1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func init() {
119119
"sumSeries": {NewAggregateConstructor("sum", crossSeriesSum), true},
120120
"summarize": {NewSummarize, true},
121121
"transformNull": {NewTransformNull, true},
122+
"unique": {NewUnique, true},
122123
}
123124
}
124125

0 commit comments

Comments
 (0)