Skip to content

Commit

Permalink
Bypass sharding middleware when a query can't be sharded. (#2709)
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Oct 1, 2020
1 parent c00c7ed commit 3640429
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 29 deletions.
19 changes: 3 additions & 16 deletions pkg/logql/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql"

Expand Down Expand Up @@ -47,25 +46,13 @@ func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *S
}

// Query constructs a Query
func (ng *ShardedEngine) Query(p Params, shards int) Query {
func (ng *ShardedEngine) Query(p Params, mapped Expr) Query {
return &query{
timeout: ng.timeout,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()),
parse: func(ctx context.Context, query string) (Expr, error) {
logger := spanlogger.FromContext(ctx)
mapper, err := NewShardMapper(shards, ng.metrics)
if err != nil {
return nil, err
}
noop, parsed, err := mapper.Parse(query)
if err != nil {
level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", query)
return nil, err
}

level.Debug(logger).Log("no-op", noop, "mapped", parsed.String())
return parsed, nil
parse: func(_ context.Context, _ string) (Expr, error) {
return mapped, nil
},
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/logql/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,19 @@ func TestMappingEquivalence(t *testing.T) {
nil,
)
qry := regular.Query(params)
shardedQry := sharded.Query(params, shards)
ctx := context.Background()

res, err := qry.Exec(context.Background())
mapper, err := NewShardMapper(shards, nilMetrics)
require.Nil(t, err)
_, mapped, err := mapper.Parse(tc.query)
require.Nil(t, err)

shardedQry := sharded.Query(params, mapped)

res, err := qry.Exec(ctx)
require.Nil(t, err)

shardedRes, err := shardedQry.Exec(context.Background())
shardedRes, err := shardedQry.Exec(ctx)
require.Nil(t, err)

if tc.approximate {
Expand Down
40 changes: 30 additions & 10 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,20 @@ func newASTMapperware(
) *astMapperware {

return &astMapperware{
confs: confs,
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
next: next,
ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics),
confs: confs,
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
next: next,
ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics),
metrics: metrics,
}
}

type astMapperware struct {
confs queryrange.ShardingConfigs
logger log.Logger
next queryrange.Handler
ng *logql.ShardedEngine
confs queryrange.ShardingConfigs
logger log.Logger
next queryrange.Handler
ng *logql.ShardedEngine
metrics *logql.ShardingMetrics
}

func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
Expand All @@ -92,8 +94,26 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
if !ok {
return nil, fmt.Errorf("expected *LokiRequest, got (%T)", r)
}
params := paramsFromRequest(req)
query := ast.ng.Query(params, int(conf.RowShards))

mapper, err := logql.NewShardMapper(int(conf.RowShards), ast.metrics)
if err != nil {
return nil, err
}

noop, parsed, err := mapper.Parse(r.GetQuery())
if err != nil {
level.Warn(shardedLog).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
return nil, err
}
level.Debug(shardedLog).Log("no-op", noop, "mapped", parsed.String())

if noop {
// the ast can't be mapped to a sharded equivalent
// so we can bypass the sharding engine.
return ast.next.Do(ctx, r)
}

query := ast.ng.Query(paramsFromRequest(req), parsed)

res, err := query.Exec(ctx)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,29 @@ func Test_astMapper(t *testing.T) {

}

func Test_ShardingByPass(t *testing.T) {
called := 0
handler := queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
called++
return nil, nil
})

mware := newASTMapperware(
queryrange.ShardingConfigs{
chunk.PeriodConfig{
RowShards: 2,
},
},
handler,
log.NewNopLogger(),
nilShardingMetrics,
)

_, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`))
require.Nil(t, err)
require.Equal(t, called, 1)
}

func Test_hasShards(t *testing.T) {
for i, tc := range []struct {
input queryrange.ShardingConfigs
Expand Down

0 comments on commit 3640429

Please sign in to comment.