forked from grafana/metrictank
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathfunc_summarize.go
100 lines (83 loc) · 2.69 KB
/
func_summarize.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package expr
import (
"fmt"
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/batch"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/util"
"github.com/raintank/dur"
"gopkg.in/raintank/schema.v1"
)
// FuncSummarize holds the arguments of the summarize() function: an input
// series list, a bucket interval, an aggregation function name and an
// alignment flag.
type FuncSummarize struct {
// in is the upstream function whose output series get summarized.
in GraphiteFunc
// intervalString is the bucket width as passed by the caller; Exec parses it
// with dur.ParseDuration.
intervalString string
// fn names the aggregation applied to each bucket; NewSummarize sets it to "sum".
fn string
// alignToFrom, when false, makes Exec round the bucket boundaries to multiples
// of the interval; when true the boundaries are used as they are.
alignToFrom bool
}
// NewSummarize returns a FuncSummarize preset to aggregate with "sum" and
// with alignToFrom disabled.
func NewSummarize() GraphiteFunc {
	summarize := new(FuncSummarize)
	summarize.fn = "sum"
	// alignToFrom keeps its zero value (false).
	return summarize
}
// Signature describes the arguments summarize() accepts and what it returns.
//
// Inputs, in order: a series list, an interval string checked by
// IsIntervalString, an optional "func" string checked by IsConsolFunc, and an
// optional "alignToFrom" bool. The output is a series list.
func (s *FuncSummarize) Signature() ([]Arg, []Arg) {
	inputs := []Arg{
		ArgSeriesList{val: &s.in},
		ArgString{val: &s.intervalString, validator: []Validator{IsIntervalString}},
		ArgString{key: "func", opt: true, val: &s.fn, validator: []Validator{IsConsolFunc}},
		ArgBool{key: "alignToFrom", opt: true, val: &s.alignToFrom},
	}
	outputs := []Arg{ArgSeriesList{}}
	return inputs, outputs
}
// Context returns the given context with its consol field reset to 0; every
// other field is passed through unchanged.
func (s *FuncSummarize) Context(context Context) Context {
	adjusted := context
	adjusted.consol = 0
	return adjusted
}
// Exec runs the input function and summarizes every series it returns into
// buckets of the configured interval.
//
// For each input series the bucket range starts at the first datapoint's
// timestamp and ends one series interval after the last datapoint; a series
// without datapoints falls back to its QueryFrom/QueryTo. Unless alignToFrom
// is set, the start is rounded down to a multiple of the interval and the end
// is rounded down and then extended by one interval.
//
// Each resulting series is returned and also appended to cache[Req{}].
//
// An error is returned when the input function fails, when the interval
// string cannot be parsed, or when it parses to zero (which would otherwise
// cause a division by zero below).
func (s *FuncSummarize) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
	series, err := s.in.Exec(cache)
	if err != nil {
		return nil, err
	}

	interval, err := dur.ParseDuration(s.intervalString)
	if err != nil {
		return nil, fmt.Errorf("summarize: parsing interval %q: %v", s.intervalString, err)
	}
	if interval == 0 {
		// interval is used as a modulus and a divisor further down.
		return nil, fmt.Errorf("summarize: interval %q must be greater than zero", s.intervalString)
	}

	aggFunc := consolidation.GetAggFunc(consolidation.FromConsolidateBy(s.fn))

	// The third argument only shows up in the generated name when it is set.
	var alignToFromTarget string
	if s.alignToFrom {
		alignToFromTarget = ", true"
	}
	newName := func(oldName string) string {
		return fmt.Sprintf("summarize(%s, \"%s\", \"%s\"%s)", oldName, s.intervalString, s.fn, alignToFromTarget)
	}

	var outputs []models.Series
	for _, serie := range series {
		var newStart, newEnd uint32 = serie.QueryFrom, serie.QueryTo
		if len(serie.Datapoints) > 0 {
			newStart = serie.Datapoints[0].Ts
			newEnd = serie.Datapoints[len(serie.Datapoints)-1].Ts + serie.Interval
		}
		if !s.alignToFrom {
			// Snap both ends onto interval boundaries; the end gets one extra
			// interval so the last bucket is not cut off.
			newStart = newStart - (newStart % interval)
			newEnd = newEnd - (newEnd % interval) + interval
		}

		out := summarizeValues(serie, aggFunc, interval, newStart, newEnd)
		output := models.Series{
			Target:     newName(serie.Target),
			QueryPatt:  newName(serie.QueryPatt),
			Tags:       serie.Tags,
			Datapoints: out,
			Interval:   interval,
		}
		outputs = append(outputs, output)
		cache[Req{}] = append(cache[Req{}], output)
	}
	return outputs, nil
}
// summarizeValues walks serie.Datapoints once and emits one point per
// interval-wide bucket, beginning at start, for as long as the bucket start is
// below end and unread input points remain. Every emitted point carries the
// bucket start as its timestamp and aggFunc applied to the input points
// consumed for that bucket. The output slice is obtained from pointSlicePool.
func summarizeValues(serie models.Series, aggFunc batch.AggFunc, interval, start, end uint32) []schema.Point {
	points := serie.Datapoints
	result := pointSlicePool.Get().([]schema.Point)

	// NOTE(review): start-end is a uint32 subtraction, so whenever start < end
	// it wraps around to a very large value. For small intervals the cap is
	// therefore just len(points), but for large intervals the quotient can drop
	// below len(points) and the remaining points are ignored. This looks
	// unintended; using len(points) directly would avoid it, but that also
	// removes the only use of the util import in this file - confirm and
	// change both together.
	limit := int(util.Min(uint32(len(points)), (start-end)/interval))

	idx := 0
	for bucket := start; idx < limit && bucket < end; bucket += interval {
		first := idx
		bucketEnd := bucket + interval
		for idx < limit && points[idx].Ts < bucketEnd {
			// A point at or before the bucket start moves the beginning of the
			// aggregated window forward to it, leaving earlier points out.
			if points[idx].Ts <= bucket {
				first = idx
			}
			idx++
		}
		result = append(result, schema.Point{Val: aggFunc(points[first:idx]), Ts: bucket})
	}
	return result
}