From fc2ffad6a59579305e3b4ff4562fa8096e6cc926 Mon Sep 17 00:00:00 2001 From: Jeanette Tan Date: Sat, 6 Jul 2024 06:33:04 +0800 Subject: [PATCH] Modify astmapper to handle the query format for cross-cluster federation --- .../querymiddleware/astmapper/sharding.go | 34 ++++++++++++++--- .../astmapper/sharding_test.go | 37 ++++++++++++++++++- pkg/storage/sharding/label.go | 5 ++- 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/pkg/frontend/querymiddleware/astmapper/sharding.go b/pkg/frontend/querymiddleware/astmapper/sharding.go index b6ab1e0d3cb..bff3852a4c4 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding.go @@ -19,7 +19,7 @@ import ( // NewSharding creates a new query sharding mapper. func NewSharding(ctx context.Context, shards int, logger log.Logger, stats *MapperStats) (ASTMapper, error) { - shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats) + shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats, nil) if err != nil { return nil, err } @@ -41,15 +41,28 @@ type shardSummer struct { logger log.Logger stats *MapperStats + splitLabel string + clusters []string + canShardAllVectorSelectorsCache map[string]bool } -// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard -func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) { +// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard. If clusters is defined, +// the mapper will be used to rewrite the queries for cross-cluster query federation. Otherwise, it will +// rewrite the queries for query sharding. +func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats, clusters []string) (ASTMapper, error) { if squasher == nil { return nil, errors.Errorf("squasher required and not passed") } + var splitLabel string + if len(clusters) > 0 { + shards = len(clusters) + splitLabel = sharding.ClusterLabel + } else { + splitLabel = sharding.ShardLabel + } + return NewASTExprMapper(&shardSummer{ ctx: ctx, @@ -59,6 +72,9 @@ func newShardSummer(ctx context.Context, shards int, squasher squasher, logger l logger: logger, stats: stats, + splitLabel: splitLabel, + clusters: clusters, + canShardAllVectorSelectorsCache: make(map[string]bool), }), nil } @@ -98,7 +114,7 @@ func (summer *shardSummer) MapExpr(expr parser.Expr) (mapped parser.Expr, finish case *parser.VectorSelector: if summer.currentShard != nil { - mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, e) + mapped, err := summer.shardVectorSelector(e) return mapped, true, err } return e, true, nil @@ -522,8 +538,14 @@ func (summer *shardSummer) shardAndSquashBinOp(expr *parser.BinaryExpr) (parser. return summer.squash(children...) } -func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Expr, error) { - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, sharding.ShardLabel, sharding.ShardSelector{ShardIndex: uint64(curshard), ShardCount: uint64(shards)}.LabelValue()) +func (summer *shardSummer) shardVectorSelector(selector *parser.VectorSelector) (parser.Expr, error) { + var splitLabelValue string + if len(summer.clusters) > 0 { + splitLabelValue = summer.clusters[*summer.currentShard] + } else { + splitLabelValue = sharding.ShardSelector{ShardIndex: uint64(*summer.currentShard), ShardCount: uint64(summer.shards)}.LabelValue() + } + shardMatcher, err := labels.NewMatcher(labels.MatchEqual, summer.splitLabel, splitLabelValue) if err != nil { return nil, err } diff --git a/pkg/frontend/querymiddleware/astmapper/sharding_test.go b/pkg/frontend/querymiddleware/astmapper/sharding_test.go index 2ae900ff963..c0305598186 100644 --- a/pkg/frontend/querymiddleware/astmapper/sharding_test.go +++ b/pkg/frontend/querymiddleware/astmapper/sharding_test.go @@ -577,7 +577,7 @@ func TestShardSummerWithEncoding(t *testing.T) { } { t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { stats := NewMapperStats() - summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats) + summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats, nil) require.Nil(t, err) expr, err := parser.ParseExpr(c.input) require.Nil(t, err) @@ -593,6 +593,41 @@ func TestShardSummerWithEncoding(t *testing.T) { } } +func TestShardSummerWithClusters(t *testing.T) { + clusters := []string{ + "cluster-a", + "cluster-b", + "cluster-c", + "cluster-d", + } + clustersSize := len(clusters) + for i, c := range []struct { + input string + expected string + }{ + { + input: `sum(rate(bar1{baz="blip"}[1m]))`, + expected: `sum(__embedded_queries__{__queries__="{\"Concat\":[\"sum(rate(bar1{__cluster__=\\\"cluster-a\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-b\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-c\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-d\\\",baz=\\\"blip\\\"}[1m]))\"]}"})`, + }, + } { + t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { + stats := NewMapperStats() + summer, err := newShardSummer(context.Background(), 0, vectorSquasher, log.NewNopLogger(), stats, clusters) + require.Nil(t, err) + expr, err := parser.ParseExpr(c.input) + require.Nil(t, err) + + res, err := summer.Map(expr) + require.Nil(t, err) + assert.Equal(t, clustersSize, stats.GetShardedQueries()) + expected, err := parser.ParseExpr(c.expected) + require.Nil(t, err) + + require.Equal(t, expected.String(), res.String()) + }) + } +} + func TestIsSubqueryCall(t *testing.T) { tests := []struct { query string diff --git a/pkg/storage/sharding/label.go b/pkg/storage/sharding/label.go index b21d8c59670..3c37cdf929c 100644 --- a/pkg/storage/sharding/label.go +++ b/pkg/storage/sharding/label.go @@ -12,8 +12,11 @@ import ( ) const ( - // ShardLabel is a reserved label referencing a shard on read path. + // ShardLabel is a reserved label referencing a shard on read path used for query sharding. ShardLabel = "__query_shard__" + + // ClusterLabel is a reserved label referencing a cluster for cross-cluster query federation. + ClusterLabel = "__cluster__" ) // ShardSelector holds information about the configured query shard.