diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 0640f00307e0..80764de70892 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -74,7 +74,7 @@ func (ng *DownstreamEngine) Query(ctx context.Context, p Params) Query { // DownstreamSampleExpr is a SampleExpr which signals downstream computation type DownstreamSampleExpr struct { - shard *Shard + shard *ShardWithChunkRefs syntax.SampleExpr } @@ -107,7 +107,7 @@ func (d DownstreamSampleExpr) Pretty(level int) string { // DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation type DownstreamLogSelectorExpr struct { - shard *Shard + shard *ShardWithChunkRefs syntax.LogSelectorExpr } @@ -396,12 +396,19 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( // downstream to a querier var shards Shards if e.shard != nil { - shards = append(shards, *e.shard) + shards = append(shards, e.shard.Shard) + params = ParamsWithChunkOverrides{ + Params: params, + StoreChunksOverride: &e.shard.chunks, + } } acc := NewBufferedAccumulator(1) results, err := ev.Downstream(ctx, []DownstreamQuery{{ Params: ParamsWithShardsOverride{ - Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.SampleExpr}, + Params: ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: e.SampleExpr, + }, ShardsOverride: shards.Encode(), }, }}, acc) @@ -418,7 +425,13 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr}, } if shard := cur.DownstreamSampleExpr.shard; shard != nil { - qry.Params = ParamsWithShardsOverride{Params: qry.Params, ShardsOverride: Shards{*shard}.Encode()} + qry.Params = ParamsWithShardsOverride{ + Params: ParamsWithChunkOverrides{ + Params: qry.Params, + StoreChunksOverride: &shard.chunks, + }, + ShardsOverride: Shards{shard.Shard}.Encode(), + } } queries = append(queries, qry) cur = cur.next @@ -457,8 +470,11 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( } if shard := d.shard; shard != nil { qry.Params = ParamsWithShardsOverride{ - Params: qry.Params, - ShardsOverride: Shards{*shard}.Encode(), + Params: ParamsWithChunkOverrides{ + Params: qry.Params, + StoreChunksOverride: &shard.chunks, + }, + ShardsOverride: Shards{shard.Shard}.Encode(), } } queries = append(queries, qry) @@ -498,12 +514,19 @@ func (ev *DownstreamEvaluator) NewIterator( // downstream to a querier var shards Shards if e.shard != nil { - shards = append(shards, *e.shard) + shards = append(shards, e.shard.Shard) + params = ParamsWithChunkOverrides{ + Params: params, + StoreChunksOverride: &e.shard.chunks, + } } acc := NewStreamAccumulator(params) results, err := ev.Downstream(ctx, []DownstreamQuery{{ Params: ParamsWithShardsOverride{ - Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.LogSelectorExpr}, + Params: ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: e.LogSelectorExpr, + }, ShardsOverride: shards.Encode(), }, }}, acc) @@ -520,7 +543,13 @@ func (ev *DownstreamEvaluator) NewIterator( Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr}, } if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil { - qry.Params = ParamsWithShardsOverride{Params: qry.Params, ShardsOverride: Shards{*shard}.Encode()} + qry.Params = ParamsWithShardsOverride{ + Params: ParamsWithChunkOverrides{ + Params: qry.Params, + StoreChunksOverride: &shard.chunks, + }, + ShardsOverride: Shards{shard.Shard}.Encode(), + } } queries = append(queries, qry) cur = cur.next diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index ad5cf73d001b..c33f97ed74a5 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -632,7 +632,7 @@ func TestFormat_ShardedExpr(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 3, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -648,7 +648,7 @@ func TestFormat_ShardedExpr(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 3, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -664,7 +664,7 @@ func TestFormat_ShardedExpr(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 3, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 1c3efe2ee37e..9370c614b38c 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -148,6 +148,15 @@ func (p ParamsWithShardsOverride) Shards() []string { return p.ShardsOverride } +type ParamsWithChunkOverrides struct { + Params + StoreChunksOverride *logproto.ChunkRefGroup +} + +func (p ParamsWithChunkOverrides) GetStoreChunks() *logproto.ChunkRefGroup { + return p.StoreChunksOverride +} + // Sortable logql contain sort or sort_desc. func Sortable(q Params) (bool, error) { var sortable bool diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index df7c62a895bb..d965676c278f 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -457,7 +457,9 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, Of: uint32(shards), }) downstreams = append(downstreams, DownstreamSampleExpr{ - shard: &s, + shard: &ShardWithChunkRefs{ + Shard: s, + }, SampleExpr: expr, }) } diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index f81f90a13778..784301928583 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -24,7 +24,7 @@ func TestShardedStringer(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), LogSelectorExpr: &syntax.MatchersExpr{ Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, @@ -34,7 +34,7 @@ func TestShardedStringer(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), LogSelectorExpr: &syntax.MatchersExpr{ Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, @@ -75,7 +75,7 @@ func TestMapSampleExpr(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -91,7 +91,7 @@ func TestMapSampleExpr(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -508,7 +508,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), LogSelectorExpr: &syntax.MatchersExpr{ Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, @@ -518,7 +518,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), LogSelectorExpr: &syntax.MatchersExpr{ Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, }, @@ -534,7 +534,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), LogSelectorExpr: &syntax.PipelineExpr{ Left: &syntax.MatchersExpr{ Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, @@ -555,7 +555,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), LogSelectorExpr: &syntax.PipelineExpr{ Left: &syntax.MatchersExpr{ Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, @@ -582,7 +582,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -598,7 +598,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -620,7 +620,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeCount, Left: &syntax.LogRange{ @@ -636,7 +636,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeCount, Left: &syntax.LogRange{ @@ -661,7 +661,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeSum, @@ -681,7 +681,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeSum, @@ -712,7 +712,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -728,7 +728,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.RangeAggregationExpr{ Operation: syntax.OpRangeTypeRate, Left: &syntax.LogRange{ @@ -754,7 +754,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -774,7 +774,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -806,7 +806,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeSum, @@ -826,7 +826,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeSum, @@ -853,7 +853,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -873,7 +873,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -913,7 +913,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -935,7 +935,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -975,7 +975,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeSum, @@ -995,7 +995,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeSum, @@ -1030,7 +1030,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -1050,7 +1050,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -1092,7 +1092,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1114,7 +1114,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1144,7 +1144,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -1164,7 +1164,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -1206,7 +1206,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1228,7 +1228,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1257,7 +1257,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -1277,7 +1277,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{}, Operation: syntax.OpTypeCount, @@ -1312,7 +1312,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1337,7 +1337,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1371,7 +1371,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1393,7 +1393,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Grouping: &syntax.Grouping{ Groups: []string{"cluster"}, @@ -1482,7 +1482,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Left: &syntax.RangeAggregationExpr{ Left: &syntax.LogRange{ @@ -1505,7 +1505,7 @@ func TestMapping(t *testing.T) { shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, - }).Ptr(), + }).Bind(nil), SampleExpr: &syntax.VectorAggregationExpr{ Left: &syntax.RangeAggregationExpr{ Left: &syntax.LogRange{ diff --git a/pkg/logql/shards.go b/pkg/logql/shards.go index 75281aa3c95b..d280777c0f60 100644 --- a/pkg/logql/shards.go +++ b/pkg/logql/shards.go @@ -20,6 +20,9 @@ type Shards []Shard type ShardVersion uint8 +// TODO(owen-d): refactor this file. There's too many layers (sharding strategies, sharding resolvers). +// Eventually we should have a single strategy (bounded) and a single resolver (dynamic). +// It's likely this could be refactored anyway -- I was in a rush writing it the first time around. const ( PowerOfTwoVersion ShardVersion = iota BoundedVersion @@ -62,20 +65,24 @@ func ParseShardVersion(s string) (ShardVersion, error) { type ShardResolver interface { Shards(expr syntax.Expr) (int, uint64, error) - ShardingRanges(expr syntax.Expr, targetBytesPerShard uint64) ([]logproto.Shard, error) + // ShardingRanges returns shards and optionally a set of precomputed chunk refs for each group. If present, + // they will be used in lieu of resolving chunk refs from the index durin evaluation. + // If chunks are present, the number of shards returned must match the number of chunk ref groups. + ShardingRanges(expr syntax.Expr, targetBytesPerShard uint64) ([]logproto.Shard, []logproto.ChunkRefGroup, error) GetStats(e syntax.Expr) (stats.Stats, error) } type ConstantShards int func (s ConstantShards) Shards(_ syntax.Expr) (int, uint64, error) { return int(s), 0, nil } -func (s ConstantShards) ShardingRanges(_ syntax.Expr, _ uint64) ([]logproto.Shard, error) { - return sharding.LinearShards(int(s), 0), nil +func (s ConstantShards) ShardingRanges(_ syntax.Expr, _ uint64) ([]logproto.Shard, []logproto.ChunkRefGroup, error) { + return sharding.LinearShards(int(s), 0), nil, nil } func (s ConstantShards) GetStats(_ syntax.Expr) (stats.Stats, error) { return stats.Stats{}, nil } type ShardingStrategy interface { - Shards(expr syntax.Expr) (shards Shards, maxBytesPerShard uint64, err error) + // The chunks for each shard are optional and are used to precompute chunk refs for each group + Shards(expr syntax.Expr) (shards []ShardWithChunkRefs, maxBytesPerShard uint64, err error) Resolver() ShardResolver } @@ -84,19 +91,25 @@ type DynamicBoundsStrategy struct { targetBytesPerShard uint64 } -func (s DynamicBoundsStrategy) Shards(expr syntax.Expr) (Shards, uint64, error) { - shards, err := s.resolver.ShardingRanges(expr, s.targetBytesPerShard) +func (s DynamicBoundsStrategy) Shards(expr syntax.Expr) ([]ShardWithChunkRefs, uint64, error) { + shards, chunks, err := s.resolver.ShardingRanges(expr, s.targetBytesPerShard) if err != nil { return nil, 0, err } var maxBytes uint64 - res := make(Shards, 0, len(shards)) - for _, shard := range shards { + res := make([]ShardWithChunkRefs, 0, len(shards)) + for i, shard := range shards { + x := ShardWithChunkRefs{ + Shard: NewBoundedShard(shard), + } if shard.Stats != nil { maxBytes = max(maxBytes, shard.Stats.Bytes) } - res = append(res, NewBoundedShard(shard)) + if len(chunks) > 0 { + x.chunks = chunks[i] + } + res = append(res, x) } return res, maxBytes, nil @@ -122,7 +135,8 @@ func (s PowerOfTwoStrategy) Resolver() ShardResolver { return s.resolver } -func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) (Shards, uint64, error) { +// PowerOfTwo strategy does not support precomputed chunk refs +func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) ([]ShardWithChunkRefs, uint64, error) { factor, bytesPerShard, err := s.resolver.Shards(expr) if err != nil { return nil, 0, err @@ -132,13 +146,26 @@ func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) (Shards, uint64, error) { return nil, bytesPerShard, nil } - res := make(Shards, 0, factor) + res := make([]ShardWithChunkRefs, 0, factor) for i := 0; i < factor; i++ { - res = append(res, NewPowerOfTwoShard(index.ShardAnnotation{Of: uint32(factor), Shard: uint32(i)})) + res = append( + res, + ShardWithChunkRefs{ + Shard: NewPowerOfTwoShard(index.ShardAnnotation{Of: uint32(factor), Shard: uint32(i)}), + }, + ) } return res, bytesPerShard, nil } +// ShardWithChunkRefs is a convenience type for passing around shards with associated chunk refs. +// The chunk refs are optional as determined by their contents (zero chunks means no precomputed refs) +// and are used to precompute chunk refs for each group +type ShardWithChunkRefs struct { + Shard + chunks logproto.ChunkRefGroup +} + // Shard represents a shard annotation // It holds either a power of two shard (legacy) or a bounded shard type Shard struct { @@ -176,6 +203,16 @@ func (s Shard) Ptr() *Shard { return &s } +func (s Shard) Bind(chunks *logproto.ChunkRefGroup) *ShardWithChunkRefs { + res := &ShardWithChunkRefs{ + Shard: s, + } + if chunks != nil { + res.chunks = *chunks + } + return res +} + func NewBoundedShard(shard logproto.Shard) Shard { return Shard{Bounded: &shard} } diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 3d1485d5a77e..7103a38ab376 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -45,6 +45,7 @@ func ParamsToLokiRequest(params logql.Params) queryrangebase.Request { Plan: &plan.QueryPlan{ AST: params.GetExpression(), }, + StoreChunks: params.GetStoreChunks(), } } return &LokiRequest{ @@ -60,6 +61,7 @@ func ParamsToLokiRequest(params logql.Params) queryrangebase.Request { Plan: &plan.QueryPlan{ AST: params.GetExpression(), }, + StoreChunks: params.GetStoreChunks(), } } diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index e0b66a906c41..49f3608f679a 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -218,7 +218,11 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, uint64, error) { return factor, bytesPerShard, nil } -func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerShard uint64) ([]logproto.Shard, error) { +func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerShard uint64) ( + []logproto.Shard, + []logproto.ChunkRefGroup, + error, +) { sp, ctx := opentracing.StartSpanFromContext(r.ctx, "dynamicShardResolver.ShardingRanges") defer sp.Finish() log := spanlogger.FromContext(ctx) @@ -231,7 +235,7 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh // of binary ops, but I'm putting in the loop for completion grps, err := syntax.MatcherGroups(expr) if err != nil { - return nil, err + return nil, nil, err } for _, grp := range grps { @@ -265,7 +269,7 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh if resp, ok := httpgrpc.HTTPResponseFromError(err); ok && (resp.Code == http.StatusNotFound) { n, bytesPerShard, err := r.Shards(expr) if err != nil { - return nil, errors.Wrap(err, "falling back to building linear shards from stats") + return nil, nil, errors.Wrap(err, "falling back to building linear shards from stats") } level.Debug(log).Log( "msg", "falling back to building linear shards from stats", @@ -276,13 +280,13 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh return sharding.LinearShards(n, uint64(n)*bytesPerShard), nil } - return nil, errors.Wrapf(err, "failed to get shards for expression, got %T: %+v", err, err) + return nil, nil, errors.Wrapf(err, "failed to get shards for expression, got %T: %+v", err, err) } casted, ok := resp.(*ShardsResponse) if !ok { - return nil, fmt.Errorf("expected *ShardsResponse while querying index, got %T", resp) + return nil, nil, fmt.Errorf("expected *ShardsResponse while querying index, got %T", resp) } // accumulate stats @@ -303,5 +307,5 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh "total_refs", refs, ) - return casted.Response.Shards, err + return casted.Response.Shards, casted.Response.ChunkGroups, err }