From 747ef3a2b2e4956dd24eb02b0a275ac5bd41ffae Mon Sep 17 00:00:00 2001 From: Jeanette Tan Date: Sat, 3 Aug 2024 01:02:48 +0800 Subject: [PATCH 1/3] Modify astmapper to handle the query format for cross-cluster federation --- .../querymiddleware/astmapper/sharding.go | 34 ++++++++++++++--- .../astmapper/sharding_test.go | 37 ++++++++++++++++++- pkg/storage/sharding/label.go | 5 ++- 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/pkg/frontend/querymiddleware/astmapper/sharding.go b/pkg/frontend/querymiddleware/astmapper/sharding.go index b6ab1e0d3cb..bff3852a4c4 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding.go @@ -19,7 +19,7 @@ import ( // NewSharding creates a new query sharding mapper. func NewSharding(ctx context.Context, shards int, logger log.Logger, stats *MapperStats) (ASTMapper, error) { - shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats) + shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats, nil) if err != nil { return nil, err } @@ -41,15 +41,28 @@ type shardSummer struct { logger log.Logger stats *MapperStats + splitLabel string + clusters []string + canShardAllVectorSelectorsCache map[string]bool } -// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard -func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) { +// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard. If clusters is defined, +// the mapper will be used to rewrite the queries for cross-cluster query federation. Otherwise, it will +// rewrite the queries for query sharding. +func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats, clusters []string) (ASTMapper, error) { if squasher == nil { return nil, errors.Errorf("squasher required and not passed") } + var splitLabel string + if len(clusters) > 0 { + shards = len(clusters) + splitLabel = sharding.ClusterLabel + } else { + splitLabel = sharding.ShardLabel + } + return NewASTExprMapper(&shardSummer{ ctx: ctx, @@ -59,6 +72,9 @@ func newShardSummer(ctx context.Context, shards int, squasher squasher, logger l logger: logger, stats: stats, + splitLabel: splitLabel, + clusters: clusters, + canShardAllVectorSelectorsCache: make(map[string]bool), }), nil } @@ -98,7 +114,7 @@ func (summer *shardSummer) MapExpr(expr parser.Expr) (mapped parser.Expr, finish case *parser.VectorSelector: if summer.currentShard != nil { - mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, e) + mapped, err := summer.shardVectorSelector(e) return mapped, true, err } return e, true, nil @@ -522,8 +538,14 @@ func (summer *shardSummer) shardAndSquashBinOp(expr *parser.BinaryExpr) (parser. return summer.squash(children...) } -func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Expr, error) { - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, sharding.ShardLabel, sharding.ShardSelector{ShardIndex: uint64(curshard), ShardCount: uint64(shards)}.LabelValue()) +func (summer *shardSummer) shardVectorSelector(selector *parser.VectorSelector) (parser.Expr, error) { + var splitLabelValue string + if len(summer.clusters) > 0 { + splitLabelValue = summer.clusters[*summer.currentShard] + } else { + splitLabelValue = sharding.ShardSelector{ShardIndex: uint64(*summer.currentShard), ShardCount: uint64(summer.shards)}.LabelValue() + } + shardMatcher, err := labels.NewMatcher(labels.MatchEqual, summer.splitLabel, splitLabelValue) if err != nil { return nil, err } diff --git a/pkg/frontend/querymiddleware/astmapper/sharding_test.go b/pkg/frontend/querymiddleware/astmapper/sharding_test.go index 2ae900ff963..c0305598186 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding_test.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding_test.go @@ -577,7 +577,7 @@ func TestShardSummerWithEncoding(t *testing.T) { } { t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats) + summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats, nil) require.Nil(t, err) expr, err := parser.ParseExpr(c.input) require.Nil(t, err) @@ -593,6 +593,41 @@ func TestShardSummerWithEncoding(t *testing.T) { } } +func TestShardSummerWithClusters(t *testing.T) { + clusters := []string{ + "cluster-a", + "cluster-b", + "cluster-c", + "cluster-d", + } + clustersSize := len(clusters) + for i, c := range []struct { + input string + expected string + }{ + { + input: `sum(rate(bar1{baz="blip"}[1m]))`, + expected: `sum(__embedded_queries__{__queries__="{\"Concat\":[\"sum(rate(bar1{__cluster__=\\\"cluster-a\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-b\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-c\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-d\\\",baz=\\\"blip\\\"}[1m]))\"]}"})`, + }, + } { + t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { + stats := NewMapperStats() + summer, err := newShardSummer(context.Background(), 0, vectorSquasher, log.NewNopLogger(), stats, clusters) + require.Nil(t, err) + expr, err := parser.ParseExpr(c.input) + require.Nil(t, err) + + res, err := summer.Map(expr) + require.Nil(t, err) + assert.Equal(t, clustersSize, stats.GetShardedQueries()) + expected, err := parser.ParseExpr(c.expected) + require.Nil(t, err) + + require.Equal(t, expected.String(), res.String()) + }) + } +} + func TestIsSubqueryCall(t *testing.T) { tests := []struct { query string diff --git a/pkg/storage/sharding/label.go b/pkg/storage/sharding/label.go index b21d8c59670..3c37cdf929c 100644 --- a/pkg/storage/sharding/label.go +++ b/pkg/storage/sharding/label.go @@ -12,8 +12,11 @@ import ( ) const ( - // ShardLabel is a reserved label referencing a shard on read path. + // ShardLabel is a reserved label referencing a shard on read path used for query sharding. ShardLabel = "__query_shard__" + + // ClusterLabel is a reserved label referencing a cluster for cross-cluster query federation. + ClusterLabel = "__cluster__" ) // ShardSelector holds information about the configured query shard. From 2a077623599a4dbab20672482484112f238fffe0 Mon Sep 17 00:00:00 2001 From: Jeanette Tan Date: Sat, 3 Aug 2024 01:02:48 +0800 Subject: [PATCH 2/3] Revert "Modify astmapper to handle the query format for cross-cluster federation" This reverts commit fc2ffad6a59579305e3b4ff4562fa8096e6cc926. --- .../querymiddleware/astmapper/sharding.go | 34 +++-------------- .../astmapper/sharding_test.go | 37 +------------------ pkg/storage/sharding/label.go | 5 +-- 3 files changed, 8 insertions(+), 68 deletions(-) diff --git a/pkg/frontend/querymiddleware/astmapper/sharding.go b/pkg/frontend/querymiddleware/astmapper/sharding.go index bff3852a4c4..b6ab1e0d3cb 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding.go @@ -19,7 +19,7 @@ import ( // NewSharding creates a new query sharding mapper. func NewSharding(ctx context.Context, shards int, logger log.Logger, stats *MapperStats) (ASTMapper, error) { - shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats, nil) + shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats) if err != nil { return nil, err } @@ -41,28 +41,15 @@ type shardSummer struct { logger log.Logger stats *MapperStats - splitLabel string - clusters []string - canShardAllVectorSelectorsCache map[string]bool } -// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard. If clusters is defined, -// the mapper will be used to rewrite the queries for cross-cluster query federation. Otherwise, it will -// rewrite the queries for query sharding. -func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats, clusters []string) (ASTMapper, error) { +// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard +func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) { if squasher == nil { return nil, errors.Errorf("squasher required and not passed") } - var splitLabel string - if len(clusters) > 0 { - shards = len(clusters) - splitLabel = sharding.ClusterLabel - } else { - splitLabel = sharding.ShardLabel - } - return NewASTExprMapper(&shardSummer{ ctx: ctx, @@ -72,9 +59,6 @@ func newShardSummer(ctx context.Context, shards int, squasher squasher, logger l logger: logger, stats: stats, - splitLabel: splitLabel, - clusters: clusters, - canShardAllVectorSelectorsCache: make(map[string]bool), }), nil } @@ -114,7 +98,7 @@ func (summer *shardSummer) MapExpr(expr parser.Expr) (mapped parser.Expr, finish case *parser.VectorSelector: if summer.currentShard != nil { - mapped, err := summer.shardVectorSelector(e) + mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, e) return mapped, true, err } return e, true, nil @@ -538,14 +522,8 @@ func (summer *shardSummer) shardAndSquashBinOp(expr *parser.BinaryExpr) (parser. return summer.squash(children...) } -func (summer *shardSummer) shardVectorSelector(selector *parser.VectorSelector) (parser.Expr, error) { - var splitLabelValue string - if len(summer.clusters) > 0 { - splitLabelValue = summer.clusters[*summer.currentShard] - } else { - splitLabelValue = sharding.ShardSelector{ShardIndex: uint64(*summer.currentShard), ShardCount: uint64(summer.shards)}.LabelValue() - } - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, summer.splitLabel, splitLabelValue) +func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Expr, error) { + shardMatcher, err := labels.NewMatcher(labels.MatchEqual, sharding.ShardLabel, sharding.ShardSelector{ShardIndex: uint64(curshard), ShardCount: uint64(shards)}.LabelValue()) if err != nil { return nil, err } diff --git a/pkg/frontend/querymiddleware/astmapper/sharding_test.go b/pkg/frontend/querymiddleware/astmapper/sharding_test.go index c0305598186..2ae900ff963 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding_test.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding_test.go @@ -577,7 +577,7 @@ func TestShardSummerWithEncoding(t *testing.T) { } { t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats, nil) + summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats) require.Nil(t, err) expr, err := parser.ParseExpr(c.input) require.Nil(t, err) @@ -593,41 +593,6 @@ func TestShardSummerWithEncoding(t *testing.T) { } } -func TestShardSummerWithClusters(t *testing.T) { - clusters := []string{ - "cluster-a", - "cluster-b", - "cluster-c", - "cluster-d", - } - clustersSize := len(clusters) - for i, c := range []struct { - input string - expected string - }{ - { - input: `sum(rate(bar1{baz="blip"}[1m]))`, - expected: `sum(__embedded_queries__{__queries__="{\"Concat\":[\"sum(rate(bar1{__cluster__=\\\"cluster-a\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-b\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-c\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-d\\\",baz=\\\"blip\\\"}[1m]))\"]}"})`, - }, - } { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), 0, vectorSquasher, log.NewNopLogger(), stats, clusters) - require.Nil(t, err) - expr, err := parser.ParseExpr(c.input) - require.Nil(t, err) - - res, err := summer.Map(expr) - require.Nil(t, err) - assert.Equal(t, clustersSize, stats.GetShardedQueries()) - expected, err := parser.ParseExpr(c.expected) - require.Nil(t, err) - - require.Equal(t, expected.String(), res.String()) - }) - } -} - func TestIsSubqueryCall(t *testing.T) { tests := []struct { query string diff --git a/pkg/storage/sharding/label.go b/pkg/storage/sharding/label.go index 3c37cdf929c..b21d8c59670 100644 --- a/pkg/storage/sharding/label.go +++ b/pkg/storage/sharding/label.go @@ -12,11 +12,8 @@ import ( ) const ( - // ShardLabel is a reserved label referencing a shard on read path used for query sharding. + // ShardLabel is a reserved label referencing a shard on read path. ShardLabel = "__query_shard__" - - // ClusterLabel is a reserved label referencing a cluster for cross-cluster query federation. - ClusterLabel = "__cluster__" ) // ShardSelector holds information about the configured query shard. From e74546f514631d5af8ea4f41040fe0574459178b Mon Sep 17 00:00:00 2001 From: Jeanette Tan Date: Sat, 3 Aug 2024 01:02:48 +0800 Subject: [PATCH 3/3] Generalise astmapper's shardSummer for more than query sharding --- .../astmapper/astmapper_test.go | 4 +- .../querymiddleware/astmapper/embedded.go | 2 +- .../astmapper/instant_splitting.go | 2 +- .../querymiddleware/astmapper/sharding.go | 64 ++++++++++++++----- .../astmapper/sharding_test.go | 7 +- .../astmapper/subtree_folder.go | 2 +- pkg/frontend/querymiddleware/querysharding.go | 3 +- 7 files changed, 61 insertions(+), 23 deletions(-) diff --git a/pkg/frontend/querymiddleware/astmapper/astmapper_test.go b/pkg/frontend/querymiddleware/astmapper/astmapper_test.go index f6ff1c41c79..06b6b6948e1 100644 --- a/pkg/frontend/querymiddleware/astmapper/astmapper_test.go +++ b/pkg/frontend/querymiddleware/astmapper/astmapper_test.go @@ -138,8 +138,10 @@ func TestSharding_BinaryExpressionsDontTakeExponentialTime(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - mapper, err := NewSharding(ctx, 2, log.NewNopLogger(), NewMapperStats()) + + summer, err := NewQueryShardSummer(ctx, 2, VectorSquasher, log.NewNopLogger(), NewMapperStats()) require.NoError(t, err) + mapper := NewSharding(summer) _, err = mapper.Map(expr) require.NoError(t, err) diff --git a/pkg/frontend/querymiddleware/astmapper/embedded.go b/pkg/frontend/querymiddleware/astmapper/embedded.go index 66770c30b0b..22a51b11272 100644 --- a/pkg/frontend/querymiddleware/astmapper/embedded.go +++ b/pkg/frontend/querymiddleware/astmapper/embedded.go @@ -62,7 +62,7 @@ func (c jsonCodec) Decode(encoded string) (queries []string, err error) { // VectorSquash reduces an AST into a single vector query which can be hijacked by a Queryable impl. // It always uses a VectorSelector as the substitution expr. // This is important because logical/set binops can only be applied against vectors and not matrices. -func vectorSquasher(exprs ...parser.Expr) (parser.Expr, error) { +func VectorSquasher(exprs ...parser.Expr) (parser.Expr, error) { // concat OR legs strs := make([]string, 0, len(exprs)) for _, expr := range exprs { diff --git a/pkg/frontend/querymiddleware/astmapper/instant_splitting.go b/pkg/frontend/querymiddleware/astmapper/instant_splitting.go index 35196a6a1c3..cffd12b8c3b 100644 --- a/pkg/frontend/querymiddleware/astmapper/instant_splitting.go +++ b/pkg/frontend/querymiddleware/astmapper/instant_splitting.go @@ -380,7 +380,7 @@ func (i *instantSplitter) splitAndSquashCall(expr *parser.Call, rangeInterval ti embeddedQueries = append([]parser.Expr{splitExpr}, embeddedQueries...) } - squashExpr, err := vectorSquasher(embeddedQueries...) + squashExpr, err := VectorSquasher(embeddedQueries...) if err != nil { return nil, false, err } diff --git a/pkg/frontend/querymiddleware/astmapper/sharding.go b/pkg/frontend/querymiddleware/astmapper/sharding.go index b6ab1e0d3cb..a9fc037e40e 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding.go @@ -18,39 +18,71 @@ import ( ) // NewSharding creates a new query sharding mapper. -func NewSharding(ctx context.Context, shards int, logger log.Logger, stats *MapperStats) (ASTMapper, error) { - shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats) - if err != nil { - return nil, err - } +func NewSharding(shardSummer ASTMapper) ASTMapper { subtreeFolder := newSubtreeFolder() return NewMultiMapper( shardSummer, subtreeFolder, - ), nil + ) +} + +type Squasher = func(...parser.Expr) (parser.Expr, error) + +type ShardLabeller interface { + GetLabelName() string + GetLabelValue(shard int) string } -type squasher = func(...parser.Expr) (parser.Expr, error) +// queryShardLabeller implements ShardLabeller for query sharding. +type queryShardLabeller struct { + shards int +} + +func newQueryShardLabeller(shards int) ShardLabeller { + return &queryShardLabeller{shards: shards} +} + +func (lbl *queryShardLabeller) GetLabelName() string { + return sharding.ShardLabel +} + +func (lbl *queryShardLabeller) GetLabelValue(shard int) string { + return sharding.ShardSelector{ShardIndex: uint64(shard), ShardCount: uint64(lbl.shards)}.LabelValue() +} + +// NewQueryShardSummer instantiates an ASTMapper which will fan out sum queries by shard. +func NewQueryShardSummer(ctx context.Context, shards int, squasher Squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) { + return NewShardSummerWithLabeller(ctx, shards, squasher, logger, stats, newQueryShardLabeller(shards)) +} + +func NewShardSummerWithLabeller(ctx context.Context, shards int, squasher Squasher, logger log.Logger, stats *MapperStats, labeller ShardLabeller) (ASTMapper, error) { + summer, err := newShardSummer(ctx, shards, squasher, logger, stats, labeller) + if err != nil { + return nil, err + } + return NewASTExprMapper(summer), nil +} type shardSummer struct { ctx context.Context shards int currentShard *int - squash squasher + squash Squasher logger log.Logger stats *MapperStats + shardLabeller ShardLabeller + canShardAllVectorSelectorsCache map[string]bool } -// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard -func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) { +func newShardSummer(ctx context.Context, shards int, squasher Squasher, logger log.Logger, stats *MapperStats, shardLabeller ShardLabeller) (*shardSummer, error) { if squasher == nil { return nil, errors.Errorf("squasher required and not passed") } - return NewASTExprMapper(&shardSummer{ + return &shardSummer{ ctx: ctx, shards: shards, @@ -59,8 +91,10 @@ func newShardSummer(ctx context.Context, shards int, squasher squasher, logger l logger: logger, stats: stats, + shardLabeller: shardLabeller, + canShardAllVectorSelectorsCache: make(map[string]bool), - }), nil + }, nil } // Clone returns a clone of shardSummer with stats and current shard index reset to default. @@ -98,7 +132,7 @@ func (summer *shardSummer) MapExpr(expr parser.Expr) (mapped parser.Expr, finish case *parser.VectorSelector: if summer.currentShard != nil { - mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, e) + mapped, err := summer.shardVectorSelector(e) return mapped, true, err } return e, true, nil @@ -522,8 +556,8 @@ func (summer *shardSummer) shardAndSquashBinOp(expr *parser.BinaryExpr) (parser. return summer.squash(children...) } -func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Expr, error) { - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, sharding.ShardLabel, sharding.ShardSelector{ShardIndex: uint64(curshard), ShardCount: uint64(shards)}.LabelValue()) +func (summer *shardSummer) shardVectorSelector(selector *parser.VectorSelector) (parser.Expr, error) { + shardMatcher, err := labels.NewMatcher(labels.MatchEqual, summer.shardLabeller.GetLabelName(), summer.shardLabeller.GetLabelValue(*summer.currentShard)) if err != nil { return nil, err } diff --git a/pkg/frontend/querymiddleware/astmapper/sharding_test.go b/pkg/frontend/querymiddleware/astmapper/sharding_test.go index 2ae900ff963..b0ef16ff393 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding_test.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding_test.go @@ -523,8 +523,9 @@ func TestShardSummer(t *testing.T) { t.Run(tt.in, func(t *testing.T) { stats := NewMapperStats() - mapper, err := NewSharding(context.Background(), 3, log.NewNopLogger(), stats) + summer, err := NewQueryShardSummer(context.Background(), 3, VectorSquasher, log.NewNopLogger(), stats) require.NoError(t, err) + mapper := NewSharding(summer) expr, err := parser.ParseExpr(tt.in) require.NoError(t, err) out, err := parser.ParseExpr(tt.out) @@ -556,7 +557,7 @@ func concat(queries ...string) string { exprs = append(exprs, n) } - mapped, err := vectorSquasher(exprs...) + mapped, err := VectorSquasher(exprs...) if err != nil { panic(err) } @@ -577,7 +578,7 @@ func TestShardSummerWithEncoding(t *testing.T) { } { t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats) + summer, err := NewQueryShardSummer(context.Background(), c.shards, VectorSquasher, log.NewNopLogger(), stats) require.Nil(t, err) expr, err := parser.ParseExpr(c.input) require.Nil(t, err) diff --git a/pkg/frontend/querymiddleware/astmapper/subtree_folder.go b/pkg/frontend/querymiddleware/astmapper/subtree_folder.go index f6b646b53ee..e0764d72434 100644 --- a/pkg/frontend/querymiddleware/astmapper/subtree_folder.go +++ b/pkg/frontend/querymiddleware/astmapper/subtree_folder.go @@ -39,7 +39,7 @@ func (f *subtreeFolder) MapExpr(expr parser.Expr) (mapped parser.Expr, finished // Change the expr if it contains vector selectors, as only those need to be embedded. if hasVectorSelector { - expr, err := vectorSquasher(expr) + expr, err := VectorSquasher(expr) return expr, true, err } return expr, false, nil diff --git a/pkg/frontend/querymiddleware/querysharding.go b/pkg/frontend/querymiddleware/querysharding.go index 3ac8456dccd..bea9986f30c 100644 --- a/pkg/frontend/querymiddleware/querysharding.go +++ b/pkg/frontend/querymiddleware/querysharding.go @@ -253,10 +253,11 @@ func (s *querySharding) shardQuery(ctx context.Context, query string, totalShard ctx, cancel := context.WithTimeout(ctx, shardingTimeout) defer cancel() - mapper, err := astmapper.NewSharding(ctx, totalShards, s.logger, stats) + summer, err := astmapper.NewQueryShardSummer(ctx, totalShards, astmapper.VectorSquasher, s.logger, stats) if err != nil { return "", nil, err } + mapper := astmapper.NewSharding(summer) // The mapper can modify the input expression in-place, so we must re-parse the original query // each time before passing it to the mapper.