-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
analyzer.go
156 lines (129 loc) · 4 KB
/
analyzer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package querysharding
import (
"fmt"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/prometheus/promql/parser"
)
// Analyzer is implemented by types that analyze a PromQL query string and
// determine whether the query is shardable and using which labels.
type Analyzer interface {
// Analyze takes a query string and returns its QueryAnalysis, or an error.
Analyze(string) (QueryAnalysis, error)
}
// QueryAnalyzer is the uncached analyzer. It has no fields; its Analyze
// method parses and inspects the query on every call.
type QueryAnalyzer struct{}
// CachedQueryAnalyzer wraps a QueryAnalyzer and stores its results in an LRU
// cache keyed by the query string.
type CachedQueryAnalyzer struct {
// analyzer performs the actual analysis when the cache has no entry.
analyzer *QueryAnalyzer
// cache maps a query string to a cachedValue (analysis plus error).
cache *lru.Cache
}
// nonShardableFuncs lists the PromQL function names whose presence in a query
// makes QueryAnalyzer.Analyze treat the whole query as non-shardable.
var nonShardableFuncs = []string{
"label_join",
"label_replace",
}
// NewQueryAnalyzer builds a CachedQueryAnalyzer: a fresh QueryAnalyzer backed
// by an LRU cache holding up to 256 entries.
//
// It returns a nil analyzer together with the error from lru.New when the
// cache cannot be created.
func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) {
	// Maximum number of entries the LRU cache is created with.
	const cacheSize = 256

	analysisCache, err := lru.New(cacheSize)
	if err != nil {
		return nil, err
	}

	cached := &CachedQueryAnalyzer{
		cache:    analysisCache,
		analyzer: &QueryAnalyzer{},
	}
	return cached, nil
}
// cachedValue is the entry stored in the CachedQueryAnalyzer cache. It keeps
// both results of one QueryAnalyzer.Analyze call, including the error.
type cachedValue struct {
// QueryAnalysis is the analysis returned for the query.
QueryAnalysis QueryAnalysis
// err is the error returned alongside the analysis; nil on success.
err error
}
// Analyze returns the analysis for query, serving it from the cache when the
// same query string has been analyzed before and is still cached.
//
// On a cache miss the query is handed to the wrapped QueryAnalyzer and the
// outcome is stored. Both the analysis and the error are cached, so a query
// that failed to analyze returns the same error on later calls without being
// analyzed again.
func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
	// A single Get is enough: it reports presence through its second result,
	// so a preceding Contains only adds a second lookup and a window in which
	// the entry could disappear between the check and the read.
	if value, found := a.cache.Get(query); found {
		// Only this method adds entries, so the assertion is expected to hold.
		// If it ever does not, fall through and analyze again rather than panic.
		if cached, ok := value.(cachedValue); ok {
			return cached.QueryAnalysis, cached.err
		}
	}

	analysis, err := a.analyzer.Analyze(query)

	// The result of Add is deliberately ignored; it does not affect the
	// analysis returned to the caller.
	_ = a.cache.Add(query, cachedValue{QueryAnalysis: analysis, err: err})

	return analysis, err
}
// Analyze parses query as PromQL and reports whether, and by which labels,
// it can be sharded.
//
// The decision is made as follows:
//   - a query that fails to parse is returned as non-shardable together
//     with the parse error.
//   - a query that contains a subquery, or calls one of the functions listed
//     in nonShardableFuncs (label_join, label_replace), is non-shardable.
//   - if the root expression is shardable by an explicit set of labels (see
//     analyzeRootExpression), the query is shardable by those labels.
//   - otherwise the result is the analysis accumulated while walking the
//     expression tree, scoped by the labels of every binary expression with
//     vector matching and of every aggregation that was visited.
//
// The le label is excluded from sharding.
func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
	root, parseErr := parser.ParseExpr(query)
	if parseErr != nil {
		return nonShardableQuery(), parseErr
	}

	var (
		accumulated QueryAnalysis
		blocked     bool
	)

	// visit is called for each node of the tree. Returning an error stops the
	// walk early; the flag records why, since Inspect's outcome is not read.
	visit := func(node parser.Node, _ []parser.Node) error {
		switch n := node.(type) {
		case *parser.SubqueryExpr:
			blocked = true
			return fmt.Errorf("expressions with subqueries are not shardable")

		case *parser.Call:
			if n.Func != nil && contains(n.Func.Name, nonShardableFuncs) {
				blocked = true
				return fmt.Errorf("expressions with %s are not shardable", n.Func.Name)
			}

		case *parser.BinaryExpr:
			if matching := n.VectorMatching; matching != nil {
				labels := without(matching.MatchingLabels, []string{"le"})
				accumulated = accumulated.scopeToLabels(labels, matching.On)
			}

		case *parser.AggregateExpr:
			// An aggregation without grouping scopes to an empty label set.
			labels := []string{}
			if len(n.Grouping) != 0 {
				labels = without(n.Grouping, []string{"le"})
			}
			accumulated = accumulated.scopeToLabels(labels, !n.Without)
		}
		return nil
	}
	parser.Inspect(root, visit)

	if blocked {
		return nonShardableQuery(), nil
	}

	// A root expression with explicit sharding labels takes precedence over
	// the analysis accumulated from the whole tree.
	if fromRoot := analyzeRootExpression(root); fromRoot.IsShardable() && fromRoot.shardBy {
		return fromRoot, nil
	}
	return accumulated, nil
}
// analyzeRootExpression looks only at the given node, not at its children,
// and returns the analysis derived from it:
//   - an aggregation with at least one grouping label is shardable by those
//     labels, with the by/without sense taken from the expression.
//   - a binary expression whose vector matching uses on(...) is shardable by
//     its matching labels.
//   - everything else is non-shardable.
//
// Label sets are passed through without to drop the le label.
func analyzeRootExpression(node parser.Node) QueryAnalysis {
	switch root := node.(type) {
	case *parser.AggregateExpr:
		if len(root.Grouping) > 0 {
			labels := without(root.Grouping, []string{"le"})
			return newShardableByLabels(labels, !root.Without)
		}

	case *parser.BinaryExpr:
		matching := root.VectorMatching
		if matching != nil && matching.On {
			labels := without(matching.MatchingLabels, []string{"le"})
			return newShardableByLabels(labels, matching.On)
		}
	}

	// Reached for unsupported node types and for the rejected cases above.
	return nonShardableQuery()
}
// contains reports whether needle is equal to any element of haystack.
// It returns false for a nil or empty haystack.
func contains(needle string, haystack []string) bool {
	for i := 0; i < len(haystack); i++ {
		if haystack[i] == needle {
			return true
		}
	}
	return false
}