diff --git a/pkg/frontend/querymiddleware/astmapper/sharding.go b/pkg/frontend/querymiddleware/astmapper/sharding.go index bff3852a4c4..b6ab1e0d3cb 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding.go @@ -19,7 +19,7 @@ import ( // NewSharding creates a new query sharding mapper. func NewSharding(ctx context.Context, shards int, logger log.Logger, stats *MapperStats) (ASTMapper, error) { - shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats, nil) + shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats) if err != nil { return nil, err } @@ -41,28 +41,15 @@ type shardSummer struct { logger log.Logger stats *MapperStats - splitLabel string - clusters []string - canShardAllVectorSelectorsCache map[string]bool } -// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard. If clusters is defined, -// the mapper will be used to rewrite the queries for cross-cluster query federation. Otherwise, it will -// rewrite the queries for query sharding. -func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats, clusters []string) (ASTMapper, error) { +// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard +func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) { if squasher == nil { return nil, errors.Errorf("squasher required and not passed") } - var splitLabel string - if len(clusters) > 0 { - shards = len(clusters) - splitLabel = sharding.ClusterLabel - } else { - splitLabel = sharding.ShardLabel - } - return NewASTExprMapper(&shardSummer{ ctx: ctx, @@ -72,9 +59,6 @@ func newShardSummer(ctx context.Context, shards int, squasher squasher, logger l logger: logger, stats: stats, - splitLabel: splitLabel, - clusters: clusters, - canShardAllVectorSelectorsCache: make(map[string]bool), }), nil } @@ -114,7 +98,7 @@ func (summer *shardSummer) MapExpr(expr parser.Expr) (mapped parser.Expr, finish case *parser.VectorSelector: if summer.currentShard != nil { - mapped, err := summer.shardVectorSelector(e) + mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, e) return mapped, true, err } return e, true, nil @@ -538,14 +522,8 @@ func (summer *shardSummer) shardAndSquashBinOp(expr *parser.BinaryExpr) (parser. return summer.squash(children...) } -func (summer *shardSummer) shardVectorSelector(selector *parser.VectorSelector) (parser.Expr, error) { - var splitLabelValue string - if len(summer.clusters) > 0 { - splitLabelValue = summer.clusters[*summer.currentShard] - } else { - splitLabelValue = sharding.ShardSelector{ShardIndex: uint64(*summer.currentShard), ShardCount: uint64(summer.shards)}.LabelValue() - } - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, summer.splitLabel, splitLabelValue) +func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Expr, error) { + shardMatcher, err := labels.NewMatcher(labels.MatchEqual, sharding.ShardLabel, sharding.ShardSelector{ShardIndex: uint64(curshard), ShardCount: uint64(shards)}.LabelValue()) if err != nil { return nil, err } diff --git a/pkg/frontend/querymiddleware/astmapper/sharding_test.go b/pkg/frontend/querymiddleware/astmapper/sharding_test.go index c0305598186..2ae900ff963 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding_test.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding_test.go @@ -577,7 +577,7 @@ func TestShardSummerWithEncoding(t *testing.T) { } { t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats, nil) + summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats) require.Nil(t, err) expr, err := parser.ParseExpr(c.input) require.Nil(t, err) @@ -593,41 +593,6 @@ func TestShardSummerWithEncoding(t *testing.T) { } } -func TestShardSummerWithClusters(t *testing.T) { - clusters := []string{ - "cluster-a", - "cluster-b", - "cluster-c", - "cluster-d", - } - clustersSize := len(clusters) - for i, c := range []struct { - input string - expected string - }{ - { - input: `sum(rate(bar1{baz="blip"}[1m]))`, - expected: `sum(__embedded_queries__{__queries__="{\"Concat\":[\"sum(rate(bar1{__cluster__=\\\"cluster-a\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-b\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-c\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-d\\\",baz=\\\"blip\\\"}[1m]))\"]}"})`, - }, - } { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), 0, vectorSquasher, log.NewNopLogger(), stats, clusters) - require.Nil(t, err) - expr, err := parser.ParseExpr(c.input) - require.Nil(t, err) - - res, err := summer.Map(expr) - require.Nil(t, err) - assert.Equal(t, clustersSize, stats.GetShardedQueries()) - expected, err := parser.ParseExpr(c.expected) - require.Nil(t, err) - - require.Equal(t, expected.String(), res.String()) - }) - } -} - func TestIsSubqueryCall(t *testing.T) { tests := []struct { query string diff --git a/pkg/storage/sharding/label.go b/pkg/storage/sharding/label.go index 3c37cdf929c..b21d8c59670 100644 --- a/pkg/storage/sharding/label.go +++ b/pkg/storage/sharding/label.go @@ -12,11 +12,8 @@ import ( ) const ( - // ShardLabel is a reserved label referencing a shard on read path used for query sharding. + // ShardLabel is a reserved label referencing a shard on read path. ShardLabel = "__query_shard__" - - // ClusterLabel is a reserved label referencing a cluster for cross-cluster query federation. - ClusterLabel = "__cluster__" ) // ShardSelector holds information about the configured query shard.