Skip to content

Commit

Permalink
binding shard to chunk refs
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Apr 18, 2024
1 parent e8f58f5 commit 4639cfd
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 74 deletions.
49 changes: 39 additions & 10 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (ng *DownstreamEngine) Query(ctx context.Context, p Params) Query {

// DownstreamSampleExpr is a SampleExpr which signals downstream computation
type DownstreamSampleExpr struct {
shard *Shard
shard *ShardWithChunkRefs
syntax.SampleExpr
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (d DownstreamSampleExpr) Pretty(level int) string {

// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation
type DownstreamLogSelectorExpr struct {
shard *Shard
shard *ShardWithChunkRefs
syntax.LogSelectorExpr
}

Expand Down Expand Up @@ -396,12 +396,19 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
// downstream to a querier
var shards Shards
if e.shard != nil {
shards = append(shards, *e.shard)
shards = append(shards, e.shard.Shard)
params = ParamsWithChunkOverrides{
Params: params,
StoreChunksOverride: &e.shard.chunks,
}
}
acc := NewBufferedAccumulator(1)
results, err := ev.Downstream(ctx, []DownstreamQuery{{
Params: ParamsWithShardsOverride{
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.SampleExpr},
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: e.SampleExpr,
},
ShardsOverride: shards.Encode(),
},
}}, acc)
Expand All @@ -418,7 +425,13 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr},
}
if shard := cur.DownstreamSampleExpr.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{Params: qry.Params, ShardsOverride: Shards{*shard}.Encode()}
qry.Params = ParamsWithShardsOverride{
Params: ParamsWithChunkOverrides{
Params: qry.Params,
StoreChunksOverride: &shard.chunks,
},
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries = append(queries, qry)
cur = cur.next
Expand Down Expand Up @@ -457,8 +470,11 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{*shard}.Encode(),
Params: ParamsWithChunkOverrides{
Params: qry.Params,
StoreChunksOverride: &shard.chunks,
},
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries = append(queries, qry)
Expand Down Expand Up @@ -498,12 +514,19 @@ func (ev *DownstreamEvaluator) NewIterator(
// downstream to a querier
var shards Shards
if e.shard != nil {
shards = append(shards, *e.shard)
shards = append(shards, e.shard.Shard)
params = ParamsWithChunkOverrides{
Params: params,
StoreChunksOverride: &e.shard.chunks,
}
}
acc := NewStreamAccumulator(params)
results, err := ev.Downstream(ctx, []DownstreamQuery{{
Params: ParamsWithShardsOverride{
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: e.LogSelectorExpr},
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: e.LogSelectorExpr,
},
ShardsOverride: shards.Encode(),
},
}}, acc)
Expand All @@ -520,7 +543,13 @@ func (ev *DownstreamEvaluator) NewIterator(
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr},
}
if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{Params: qry.Params, ShardsOverride: Shards{*shard}.Encode()}
qry.Params = ParamsWithShardsOverride{
Params: ParamsWithChunkOverrides{
Params: qry.Params,
StoreChunksOverride: &shard.chunks,
},
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries = append(queries, qry)
cur = cur.next
Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func TestFormat_ShardedExpr(t *testing.T) {
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 0,
Of: 3,
}).Ptr(),
}).Bind(nil),
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRange{
Expand All @@ -648,7 +648,7 @@ func TestFormat_ShardedExpr(t *testing.T) {
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 1,
Of: 3,
}).Ptr(),
}).Bind(nil),
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRange{
Expand All @@ -664,7 +664,7 @@ func TestFormat_ShardedExpr(t *testing.T) {
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 1,
Of: 3,
}).Ptr(),
}).Bind(nil),
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRange{
Expand Down
9 changes: 9 additions & 0 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func (p ParamsWithShardsOverride) Shards() []string {
return p.ShardsOverride
}

type ParamsWithChunkOverrides struct {
Params
StoreChunksOverride *logproto.ChunkRefGroup
}

func (p ParamsWithChunkOverrides) GetStoreChunks() *logproto.ChunkRefGroup {
return p.StoreChunksOverride
}

// Sortable logql contain sort or sort_desc.
func Sortable(q Params) (bool, error) {
var sortable bool
Expand Down
4 changes: 3 additions & 1 deletion pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,9 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
Of: uint32(shards),
})
downstreams = append(downstreams, DownstreamSampleExpr{
shard: &s,
shard: &ShardWithChunkRefs{
Shard: s,
},
SampleExpr: expr,
})
}
Expand Down
Loading

0 comments on commit 4639cfd

Please sign in to comment.