Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit b3d3831

Browse files
committed
graphite expression parser
mostly just parsing of expressions
1 parent 9626065 commit b3d3831

13 files changed

+1121
-118
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
build/
44
/metrictank
55
/cmd/mt-aggs-explain/mt-aggs-explain
6+
/cmd/mt-explain/mt-explain
67
/cmd/mt-index-cat/mt-index-cat
78
/cmd/mt-index-migrate/mt-index-migrate
89
/cmd/mt-index-migrate-050-to-054/mt-index-migrate-050-to-054

api/graphite.go

+91-113
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package api
22

33
import (
44
"errors"
5+
"math"
56
"net/http"
67
"sort"
78
"strings"
@@ -14,9 +15,11 @@ import (
1415
"github.com/raintank/metrictank/api/response"
1516
"github.com/raintank/metrictank/cluster"
1617
"github.com/raintank/metrictank/consolidation"
18+
"github.com/raintank/metrictank/expr"
1719
"github.com/raintank/metrictank/idx"
1820
"github.com/raintank/metrictank/mdata"
1921
"github.com/raintank/metrictank/stats"
22+
"github.com/raintank/metrictank/util"
2023
"github.com/raintank/worldping-api/pkg/log"
2124
)
2225

@@ -40,34 +43,6 @@ type Series struct {
4043
Node cluster.Node
4144
}
4245

43-
func parseTarget(target string) (string, string, error) {
44-
var consolidateBy string
45-
// yes, i am aware of the arguably grossness of the below.
46-
// however, it is solid based on the documented allowed input format.
47-
// once we need to support several functions, we can implement
48-
// a proper expression parser
49-
if strings.HasPrefix(target, "consolidateBy(") {
50-
var q1, q2 int
51-
t := target
52-
if t[len(t)-2:] == "')" && (strings.Contains(t, ",'") || strings.Contains(t, ", '")) && strings.Count(t, "'") == 2 {
53-
q1 = strings.Index(t, "'")
54-
q2 = strings.LastIndex(t, "'")
55-
} else if t[len(t)-2:] == "\")" && (strings.Contains(t, ",\"") || strings.Contains(t, ", \"")) && strings.Count(t, "\"") == 2 {
56-
q1 = strings.Index(t, "\"")
57-
q2 = strings.LastIndex(t, "\"")
58-
} else {
59-
return "", "", response.NewError(http.StatusBadRequest, "target parse error")
60-
}
61-
consolidateBy = t[q1+1 : q2]
62-
err := consolidation.Validate(consolidateBy)
63-
if err != nil {
64-
return "", "", err
65-
}
66-
target = t[strings.Index(t, "(")+1 : strings.LastIndex(t, ",")]
67-
}
68-
return target, consolidateBy, nil
69-
}
70-
7146
func (s *Server) findSeries(orgId int, patterns []string, seenAfter int64) ([]Series, error) {
7247
peers := cluster.MembersForQuery()
7348
log.Debug("HTTP findSeries for %v across %d instances", patterns, len(peers))
@@ -189,112 +164,37 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
189164
return
190165
}
191166

192-
reqs := make([]models.Req, 0)
193-
194-
patterns := make([]string, 0)
195-
type locatedDef struct {
196-
def idx.Archive
197-
node cluster.Node
198-
}
199-
200-
//locatedDefs[<pattern>][<def.id>]locatedDef
201-
locatedDefs := make(map[string]map[string]locatedDef)
202-
//targetForPattern[<pattern>]<target>
203-
targetForPattern := make(map[string]string)
204-
for _, target := range targets {
205-
pattern, _, err := parseTarget(target)
206-
if err != nil {
207-
ctx.Error(http.StatusBadRequest, err.Error())
208-
return
209-
}
210-
patterns = append(patterns, pattern)
211-
targetForPattern[pattern] = target
212-
locatedDefs[pattern] = make(map[string]locatedDef)
213-
214-
}
215-
216-
series, err := s.findSeries(ctx.OrgId, patterns, int64(fromUnix))
167+
exprs, err := expr.ParseMany(targets)
217168
if err != nil {
218-
response.Write(ctx, response.WrapError(err))
219-
}
220-
221-
for _, s := range series {
222-
for _, metric := range s.Series {
223-
if !metric.Leaf {
224-
continue
225-
}
226-
for _, def := range metric.Defs {
227-
locatedDefs[s.Pattern][def.Id] = locatedDef{def, s.Node}
228-
}
229-
}
230-
}
231-
232-
for pattern, ldefs := range locatedDefs {
233-
for _, locdef := range ldefs {
234-
archive := locdef.def
235-
// set consolidator that will be used to normalize raw data before feeding into processing functions
236-
// not to be confused with runtime consolidation which happens in the graphite api, after all processing.
237-
fn := mdata.Aggregations.Get(archive.AggId).AggregationMethod[0]
238-
consolidator := consolidation.Consolidator(fn) // we use the same number assignments so we can cast them
239-
// target is like foo.bar or foo.* or consolidateBy(foo.*,'sum')
240-
// pattern is like foo.bar or foo.*
241-
// def.Name is like foo.concretebar
242-
// so we want target to contain the concrete graphite name, potentially wrapped with consolidateBy().
243-
target := strings.Replace(targetForPattern[pattern], pattern, archive.Name, -1)
244-
reqs = append(reqs, models.NewReq(archive.Id, target, fromUnix, toUnix, request.MaxDataPoints, uint32(archive.Interval), consolidator, locdef.node, archive.SchemaId, archive.AggId))
245-
}
246-
}
247-
248-
reqRenderSeriesCount.Value(len(reqs))
249-
reqRenderTargetCount.Value(len(request.Targets))
250-
251-
if len(reqs) == 0 {
252-
if request.Format == "msgp" {
253-
var series models.SeriesByTarget
254-
response.Write(ctx, response.NewMsgp(200, series))
255-
} else {
256-
response.Write(ctx, response.NewJson(200, []string{}, ""))
257-
}
169+
ctx.Error(http.StatusBadRequest, err.Error())
258170
return
259171
}
260172

261-
if (toUnix - fromUnix) >= logMinDur {
262-
log.Info("HTTP Render: INCOMING REQ %q from: %q, to: %q target cnt: %d, maxDataPoints: %d",
263-
ctx.Req.Method, from, to, len(request.Targets), request.MaxDataPoints)
264-
}
173+
reqRenderTargetCount.Value(len(targets))
265174

266-
reqs, err = alignRequests(uint32(time.Now().Unix()), reqs)
175+
plan, err := expr.NewPlan(exprs, fromUnix, toUnix, request.MaxDataPoints, nil)
267176
if err != nil {
268-
log.Error(3, "HTTP Render alignReq error: %s", err)
269-
response.Write(ctx, response.WrapError(err))
177+
ctx.Error(http.StatusBadRequest, err.Error())
270178
return
271179
}
272180

273-
if LogLevel < 2 {
274-
for _, req := range reqs {
275-
log.Debug("HTTP Render %s - arch:%d archI:%d outI:%d aggN: %d from %s", req, req.Archive, req.ArchInterval, req.OutInterval, req.AggNum, req.Node.Name)
276-
}
277-
}
278-
279-
out, err := s.getTargets(reqs)
181+
out, err := s.executePlan(ctx.OrgId, plan)
280182
if err != nil {
281-
log.Error(3, "HTTP Render %s", err.Error())
282-
response.Write(ctx, response.WrapError(err))
183+
ctx.Error(http.StatusBadRequest, err.Error())
283184
return
284185
}
186+
sort.Sort(models.SeriesByTarget(out))
285187

286-
merged := mergeSeries(out)
287-
sort.Sort(models.SeriesByTarget(merged))
288188
defer func() {
289189
for _, serie := range out {
290190
pointSlicePool.Put(serie.Datapoints[:0])
291191
}
292192
}()
293193

294194
if request.Format == "msgp" {
295-
response.Write(ctx, response.NewMsgp(200, models.SeriesByTarget(merged)))
195+
response.Write(ctx, response.NewMsgp(200, models.SeriesByTarget(out)))
296196
} else {
297-
response.Write(ctx, response.NewFastJson(200, models.SeriesByTarget(merged)))
197+
response.Write(ctx, response.NewFastJson(200, models.SeriesByTarget(out)))
298198
}
299199
}
300200

@@ -557,3 +457,81 @@ func (s *Server) metricsDeleteRemote(orgId int, query string, peer cluster.Node)
557457

558458
return resp.DeletedDefs, nil
559459
}
460+
461+
// note if you do something like sum(foo.*) and all of those metrics happen to be on another node,
462+
we will collect all the individual series from the peer, and then sum here. that could be optimized
463+
func (s *Server) executePlan(orgId int, plan expr.Plan) ([]models.Series, error) {
464+
465+
type locatedDef struct {
466+
def idx.Archive
467+
node cluster.Node
468+
}
469+
minFrom := uint32(math.MaxUint32)
470+
var maxTo uint32
471+
//locatedDefs[request][<def.id>]locatedDef
472+
locatedDefs := make(map[expr.Req]map[string]locatedDef)
473+
474+
// note that different patterns to query can have different from / to, so they require different index lookups
475+
// e.g. target=movingAvg(foo.*, "1h")&target=foo.*
476+
// note that in this case we fetch foo.* twice. can be optimized later
477+
for _, r := range plan.Reqs {
478+
series, err := s.findSeries(orgId, []string{r.Query}, int64(r.From))
479+
if err != nil {
480+
return nil, err
481+
}
482+
483+
locatedDefs[r] = make(map[string]locatedDef)
484+
485+
minFrom = util.Min(minFrom, r.From)
486+
maxTo = util.Max(maxTo, r.To)
487+
488+
for _, s := range series {
489+
for _, metric := range s.Series {
490+
if !metric.Leaf {
491+
continue
492+
}
493+
for _, def := range metric.Defs {
494+
locatedDefs[r][def.Id] = locatedDef{def, s.Node}
495+
}
496+
}
497+
}
498+
}
499+
500+
var reqs []models.Req
501+
for r, ldefs := range locatedDefs {
502+
for _, locdef := range ldefs {
503+
archive := locdef.def
504+
// set consolidator that will be used to normalize raw data before feeding into processing functions
505+
// not to be confused with runtime consolidation which happens after all processing.
506+
fn := mdata.Aggregations.Get(archive.AggId).AggregationMethod[0]
507+
consolidator := consolidation.Consolidator(fn) // we use the same number assignments so we can cast them
508+
reqs = append(reqs, models.NewReq(
509+
archive.Id, archive.Name, r.From, r.To, plan.MaxDataPoints, uint32(archive.Interval), consolidator, locdef.node, archive.SchemaId, archive.AggId))
510+
}
511+
}
512+
reqRenderSeriesCount.Value(len(reqs))
513+
if len(reqs) == 0 {
514+
return nil, nil
515+
}
516+
// note: if 1 series has a movingAvg that requires a long time range extension, it may push other reqs into another archive. can be optimized later
517+
reqs, err := alignRequests(uint32(time.Now().Unix()), minFrom, maxTo, reqs)
518+
if err != nil {
519+
log.Error(3, "HTTP Render alignReq error: %s", err)
520+
return nil, err
521+
}
522+
523+
if LogLevel < 2 {
524+
for _, req := range reqs {
525+
log.Debug("HTTP Render %s - arch:%d archI:%d outI:%d aggN: %d from %s", req, req.Archive, req.ArchInterval, req.OutInterval, req.AggNum, req.Node.Name)
526+
}
527+
}
528+
529+
out, err := s.getTargets(reqs)
530+
if err != nil {
531+
log.Error(3, "HTTP Render %s", err.Error())
532+
return nil, err
533+
}
534+
535+
merged := mergeSeries(out)
536+
return merged, nil
537+
}

api/query_engine.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ var (
2626
// alignRequests updates the requests with all details for fetching, making sure all metrics are in the same, optimal interval
2727
// note: it is assumed that all requests have the same from & to.
2828
// also takes a "now" value which we compare the TTL against
29-
func alignRequests(now uint32, reqs []models.Req) ([]models.Req, error) {
30-
tsRange := (reqs[0].To - reqs[0].From)
29+
func alignRequests(now, from, to uint32, reqs []models.Req) ([]models.Req, error) {
30+
tsRange := to - from
3131

3232
var listIntervals []uint32
3333
var seenIntervals = make(map[uint32]struct{})

api/query_engine_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func testAlign(reqs []models.Req, retentions [][]conf.Retention, outReqs []model
2828
}
2929

3030
mdata.Schemas = conf.NewSchemas(schemas)
31-
out, err := alignRequests(now, reqs)
31+
out, err := alignRequests(now, reqs[0].From, reqs[0].To, reqs)
3232
if err != outErr {
3333
t.Errorf("different err value expected: %v, got: %v", outErr, err)
3434
}
@@ -430,7 +430,7 @@ func testMaxPointsPerReq(maxPointsSoft, maxPointsHard int, reqs []models.Req, t
430430
}),
431431
}})
432432

433-
out, err := alignRequests(30*day, reqs)
433+
out, err := alignRequests(30*day, reqs[0].From, reqs[0].To, reqs)
434434
maxPointsPerReqSoft = origMaxPointsPerReqSoft
435435
maxPointsPerReqHard = origMaxPointsPerReqHard
436436
return out, err
@@ -523,7 +523,7 @@ func BenchmarkAlignRequests(b *testing.B) {
523523
})
524524

525525
for n := 0; n < b.N; n++ {
526-
res, _ = alignRequests(14*24*3600, reqs)
526+
res, _ = alignRequests(14*24*3600, 0, 3600*24*7, reqs)
527527
}
528528
result = res
529529
}

cmd/mt-explain/main.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/davecgh/go-spew/spew"
7+
"github.com/raintank/metrictank/expr"
8+
)
9+
10+
func main() {
11+
target := "movingAverage(sumSeries(foo.bar), '2min')"
12+
exps, err := expr.ParseMany([]string{target})
13+
if err != nil {
14+
fmt.Println("Error while parsing:", err)
15+
return
16+
}
17+
spew.Dump(exps)
18+
19+
plan, err := expr.NewPlan(exps, 1000, 1200, nil)
20+
if err != nil {
21+
fmt.Println("Error while planning", err)
22+
return
23+
}
24+
spew.Dump(plan)
25+
}

expr/expr.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package expr
2+
3+
type exprType int
4+
5+
const (
6+
etName exprType = iota // e.g. a metric query pattern like foo.bar or foo.*.baz
7+
etFunc
8+
etConst
9+
etString
10+
)
11+
12+
type expr struct {
13+
target string // the name of a etName, or func name for etFunc. not used for etConst
14+
etype exprType
15+
val float64 // for etConst
16+
valStr string // for etString, and to store original input for etConst
17+
args []*expr // positional
18+
namedArgs map[string]*expr
19+
argString string // for etFunc: literal string of how all the args were specified
20+
}

expr/func_consolidateby.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package expr
2+
3+
import "github.com/raintank/metrictank/consolidation"
4+
5+
type FuncConsolidateBy struct {
6+
}
7+
8+
func NewConsolidateBy() Func {
9+
return FuncConsolidateBy{}
10+
}
11+
12+
func (s FuncConsolidateBy) Signature() ([]argType, []argType) {
13+
return []argType{seriesList, str}, []argType{seriesList}
14+
}
15+
16+
func (s FuncConsolidateBy) Init(args []*expr) error {
17+
return consolidation.Validate(args[1].valStr)
18+
}
19+
20+
func (s FuncConsolidateBy) Depends(from, to uint32) (uint32, uint32) {
21+
return from, to
22+
}

0 commit comments

Comments
 (0)