Skip to content

Commit

Permalink
Fix querysharding labels analysis (#5880)
Browse files Browse the repository at this point in the history
* fix querysharding labels analysis

Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* update test case

Signed-off-by: Ben Ye <benye@amazon.com>

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Nov 11, 2022
1 parent cd2cfe8 commit afc3bba
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.
- [#5854](https://github.com/thanos-io/thanos/pull/5854) Query Frontend: Handles `lookback_delta` param in query frontend.
- [#5230](https://github.com/thanos-io/thanos/pull/5230) Rule: Stateless ruler supports restoring `for` state from query API servers. The query API servers should be able to access the remote write storage.
- [#5880](https://github.com/thanos-io/thanos/pull/5880) Query Frontend: Fixes some edge cases of query sharding analysis.

### Added

Expand Down
19 changes: 5 additions & 14 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,13 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger)
if err != nil {
return nil, err
}
queryInstantTripperware, err := newInstantQueryTripperware(
queryInstantTripperware := newInstantQueryTripperware(
config.NumShards,
queryRangeLimits,
queryInstantCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_instant"}, reg),
config.ForwardHeaders,
)
if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), queryInstantTripperware(next), reg)
}, nil
Expand Down Expand Up @@ -191,10 +188,7 @@ func newQueryRangeTripperware(
}

if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
analyzer := querysharding.NewQueryAnalyzer()
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg),
Expand Down Expand Up @@ -332,14 +326,11 @@ func newInstantQueryTripperware(
codec queryrange.Codec,
reg prometheus.Registerer,
forwardHeaders []string,
) (queryrange.Tripperware, error) {
) queryrange.Tripperware {
instantQueryMiddlewares := []queryrange.Middleware{}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
analyzer := querysharding.NewQueryAnalyzer()
instantQueryMiddlewares = append(
instantQueryMiddlewares,
queryrange.InstrumentMiddleware("sharding", m),
Expand All @@ -352,7 +343,7 @@ func newInstantQueryTripperware(
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
}, nil
}
}

// shouldCache controls what kind of Thanos request should be cached.
Expand Down
28 changes: 16 additions & 12 deletions pkg/querysharding/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ func nonShardableQuery() QueryAnalysis {
}
}

// newShardableByLabels builds a QueryAnalysis for the given labels.
//
// The labels are first passed through without(labels, excludedLabels)
// (presumably dropping every label listed in excludedLabels; confirm against
// the helper). The by flag is stored as-is in shardBy and records whether the
// resulting label set is a "by" set (true) or a "without" set (false).
func newShardableByLabels(labels []string, by bool) QueryAnalysis {
	var analysis QueryAnalysis
	analysis.shardBy = by
	analysis.shardingLabels = without(labels, excludedLabels)
	return analysis
}

func (q *QueryAnalysis) scopeToLabels(labels []string, by bool) QueryAnalysis {
labels = without(labels, excludedLabels)

Expand All @@ -39,16 +30,29 @@ func (q *QueryAnalysis) scopeToLabels(labels []string, by bool) QueryAnalysis {
}
}

if by {
if q.shardBy && by {
return QueryAnalysis{
shardBy: true,
shardingLabels: intersect(q.shardingLabels, labels),
}
}

if !q.shardBy && !by {
return QueryAnalysis{
shardBy: false,
shardingLabels: union(q.shardingLabels, labels),
}
}

// If we are sharding by and without at the same time,
// keep the sharding by labels that are not in the without labels set.
labelsBy, labelsWithout := q.shardingLabels, labels
if !q.shardBy {
labelsBy, labelsWithout = labelsWithout, labelsBy
}
return QueryAnalysis{
shardBy: false,
shardingLabels: union(q.shardingLabels, labels),
shardBy: true,
shardingLabels: without(labelsBy, labelsWithout),
}
}

Expand Down
43 changes: 6 additions & 37 deletions pkg/querysharding/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ var nonShardableFuncs = []string{
}

// NewQueryAnalyzer creates a new QueryAnalyzer.
func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) {
cache, err := lru.New(256)
if err != nil {
return nil, err
}

func NewQueryAnalyzer() *CachedQueryAnalyzer {
// Ignore the returned error: lru.New only fails
// when the requested size is <= 0.
cache, _ := lru.New(256)
return &CachedQueryAnalyzer{
analyzer: &QueryAnalyzer{},
cache: cache,
}, nil
}
}

type cachedValue struct {
Expand Down Expand Up @@ -70,10 +68,7 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
// Analyze uses the following algorithm:
// - if a query has subqueries, such as label_join or label_replace,
// or has functions which cannot be sharded, then treat the query as non shardable.
// - if the query's root expression has grouping labels,
// then treat the query as shardable by those labels.
// - if the query's root expression has no grouping labels,
// then walk the query and find the least common labelset
// - Walk the query and find the least common labelset
// used in grouping expressions. If non-empty, treat the query
// as shardable by those labels.
// - otherwise, treat the query as non-shardable.
Expand Down Expand Up @@ -117,35 +112,9 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nonShardableQuery(), nil
}

rootAnalysis := analyzeRootExpression(expr)
if rootAnalysis.IsShardable() && rootAnalysis.shardBy {
return rootAnalysis, nil
}

return analysis, nil
}

// analyzeRootExpression inspects only the root node of a parsed query and
// derives a sharding analysis from it.
//
//   - For a binary expression that uses `on (...)` vector matching, the
//     matching labels are passed through without(..., []string{"le"}) and the
//     result is reported as a shard-by label set.
//   - For an aggregation with a non-empty grouping clause, the grouping labels
//     are passed through the same "le" filter and reported as a shard-by set
//     for `by (...)` or a shard-without set for `without (...)`.
//   - Every other root node, including binary expressions without `on` and
//     aggregations without grouping, yields nonShardableQuery().
func analyzeRootExpression(node parser.Node) QueryAnalysis {
	switch n := node.(type) {
	case *parser.BinaryExpr:
		// Guard clause: only explicit `on (...)` matching is handled here.
		if n.VectorMatching == nil || !n.VectorMatching.On {
			return nonShardableQuery()
		}

		shardingLabels := without(n.VectorMatching.MatchingLabels, []string{"le"})
		// VectorMatching.On is known to be true past the guard above.
		return newShardableByLabels(shardingLabels, true)
	case *parser.AggregateExpr:
		if len(n.Grouping) == 0 {
			return nonShardableQuery()
		}

		shardingLabels := without(n.Grouping, []string{"le"})
		return newShardableByLabels(shardingLabels, !n.Without)
	}

	return nonShardableQuery()
}

func contains(needle string, haystack []string) bool {
for _, item := range haystack {
if needle == item {
Expand Down
16 changes: 9 additions & 7 deletions pkg/querysharding/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ sum by (container) (
{
name: "multiple binary expressions with grouping",
expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`,
shardingLabels: []string{"cluster", "pod"},
shardingLabels: []string{"pod"},
},
{
name: "histogram quantile",
Expand All @@ -137,6 +137,11 @@ sum by (container) (
expression: "increase(sum(http_requests_total) by (pod, cluster) [1h:1m])",
shardingLabels: []string{"cluster", "pod"},
},
{
name: "ignore vector matching with 2 aggregations",
expression: `sum(rate(node_cpu_seconds_total[3h])) by (cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[3h])) by (cluster_id)`,
shardingLabels: []string{"cluster_id"},
},
}

shardableWithoutLabels := []testCase{
Expand Down Expand Up @@ -177,8 +182,7 @@ http_requests_total`,

for _, test := range nonShardable {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.False(t, analysis.IsShardable())
Expand All @@ -187,8 +191,7 @@ http_requests_total`,

for _, test := range shardableByLabels {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand All @@ -202,8 +205,7 @@ http_requests_total`,

for _, test := range shardableWithoutLabels {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand Down

0 comments on commit afc3bba

Please sign in to comment.