Skip to content

Commit

Permalink
Modify astmapper to handle the query format for cross-cluster federation
Browse files Browse the repository at this point in the history
  • Loading branch information
zenador committed Jul 5, 2024
1 parent 40cfafd commit fc2ffad
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 8 deletions.
34 changes: 28 additions & 6 deletions pkg/frontend/querymiddleware/astmapper/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

// NewSharding creates a new query sharding mapper.
func NewSharding(ctx context.Context, shards int, logger log.Logger, stats *MapperStats) (ASTMapper, error) {
shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats)
shardSummer, err := newShardSummer(ctx, shards, vectorSquasher, logger, stats, nil)
if err != nil {
return nil, err
}
Expand All @@ -41,15 +41,28 @@ type shardSummer struct {
logger log.Logger
stats *MapperStats

splitLabel string
clusters []string

canShardAllVectorSelectorsCache map[string]bool
}

// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard
func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats) (ASTMapper, error) {
// newShardSummer instantiates an ASTMapper which will fan out sum queries by shard. If clusters is non-empty,
// the mapper rewrites the queries for cross-cluster query federation: the shards argument is ignored and one
// sub-query is produced per cluster. Otherwise, it rewrites the queries for query sharding.
func newShardSummer(ctx context.Context, shards int, squasher squasher, logger log.Logger, stats *MapperStats, clusters []string) (ASTMapper, error) {
if squasher == nil {
return nil, errors.Errorf("squasher required and not passed")
}

var splitLabel string
if len(clusters) > 0 {
shards = len(clusters)
splitLabel = sharding.ClusterLabel
} else {
splitLabel = sharding.ShardLabel
}

return NewASTExprMapper(&shardSummer{
ctx: ctx,

Expand All @@ -59,6 +72,9 @@ func newShardSummer(ctx context.Context, shards int, squasher squasher, logger l
logger: logger,
stats: stats,

splitLabel: splitLabel,
clusters: clusters,

canShardAllVectorSelectorsCache: make(map[string]bool),
}), nil
}
Expand Down Expand Up @@ -98,7 +114,7 @@ func (summer *shardSummer) MapExpr(expr parser.Expr) (mapped parser.Expr, finish

case *parser.VectorSelector:
if summer.currentShard != nil {
mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, e)
mapped, err := summer.shardVectorSelector(e)
return mapped, true, err
}
return e, true, nil
Expand Down Expand Up @@ -522,8 +538,14 @@ func (summer *shardSummer) shardAndSquashBinOp(expr *parser.BinaryExpr) (parser.
return summer.squash(children...)
}

func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Expr, error) {
shardMatcher, err := labels.NewMatcher(labels.MatchEqual, sharding.ShardLabel, sharding.ShardSelector{ShardIndex: uint64(curshard), ShardCount: uint64(shards)}.LabelValue())
func (summer *shardSummer) shardVectorSelector(selector *parser.VectorSelector) (parser.Expr, error) {
var splitLabelValue string
if len(summer.clusters) > 0 {
splitLabelValue = summer.clusters[*summer.currentShard]
} else {
splitLabelValue = sharding.ShardSelector{ShardIndex: uint64(*summer.currentShard), ShardCount: uint64(summer.shards)}.LabelValue()
}
shardMatcher, err := labels.NewMatcher(labels.MatchEqual, summer.splitLabel, splitLabelValue)
if err != nil {
return nil, err
}
Expand Down
37 changes: 36 additions & 1 deletion pkg/frontend/querymiddleware/astmapper/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func TestShardSummerWithEncoding(t *testing.T) {
} {
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
stats := NewMapperStats()
summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats)
summer, err := newShardSummer(context.Background(), c.shards, vectorSquasher, log.NewNopLogger(), stats, nil)
require.Nil(t, err)
expr, err := parser.ParseExpr(c.input)
require.Nil(t, err)
Expand All @@ -593,6 +593,41 @@ func TestShardSummerWithEncoding(t *testing.T) {
}
}

// TestShardSummerWithClusters verifies that, when a list of clusters is passed to
// newShardSummer, a shardable query is rewritten into one embedded sub-query per
// cluster, each carrying a __cluster__ matcher naming that cluster.
func TestShardSummerWithClusters(t *testing.T) {
	clusters := []string{"cluster-a", "cluster-b", "cluster-c", "cluster-d"}

	testCases := []struct {
		input    string
		expected string
	}{
		{
			input:    `sum(rate(bar1{baz="blip"}[1m]))`,
			expected: `sum(__embedded_queries__{__queries__="{\"Concat\":[\"sum(rate(bar1{__cluster__=\\\"cluster-a\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-b\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-c\\\",baz=\\\"blip\\\"}[1m]))\",\"sum(rate(bar1{__cluster__=\\\"cluster-d\\\",baz=\\\"blip\\\"}[1m]))\"]}"})`,
		},
	}

	for idx, tc := range testCases {
		t.Run(fmt.Sprintf("[%d]", idx), func(t *testing.T) {
			stats := NewMapperStats()

			// The shard count is passed as 0: with a non-empty cluster list the
			// mapper is expected to split by cluster rather than by shard count.
			mapper, err := newShardSummer(context.Background(), 0, vectorSquasher, log.NewNopLogger(), stats, clusters)
			require.Nil(t, err)

			parsed, err := parser.ParseExpr(tc.input)
			require.Nil(t, err)

			mapped, err := mapper.Map(parsed)
			require.Nil(t, err)

			// One sharded query is expected per cluster.
			assert.Equal(t, len(clusters), stats.GetShardedQueries())

			// Round-trip the expectation through the parser so both sides are
			// compared in the parser's canonical string form.
			want, err := parser.ParseExpr(tc.expected)
			require.Nil(t, err)

			require.Equal(t, want.String(), mapped.String())
		})
	}
}

func TestIsSubqueryCall(t *testing.T) {
tests := []struct {
query string
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/sharding/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
)

const (
// ShardLabel is a reserved label referencing a shard on the read path; it is used for query sharding.
ShardLabel = "__query_shard__"

// ClusterLabel is a reserved label referencing a cluster for cross-cluster query federation.
ClusterLabel = "__cluster__"
)

// ShardSelector holds information about the configured query shard.
Expand Down

0 comments on commit fc2ffad

Please sign in to comment.