From 3640429532303fefc447e993141ab981bf040665 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 1 Oct 2020 18:30:10 +0000 Subject: [PATCH] Bypass sharding middleware when a query can't be sharded. (#2709) Signed-off-by: Cyril Tovena --- pkg/logql/sharding.go | 19 ++-------- pkg/logql/sharding_test.go | 13 +++++-- pkg/querier/queryrange/querysharding.go | 40 +++++++++++++++----- pkg/querier/queryrange/querysharding_test.go | 23 +++++++++++ 4 files changed, 66 insertions(+), 29 deletions(-) diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index 6fadb1c682136..01afe7fe1d4d8 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -8,7 +8,6 @@ import ( "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/prometheus/prometheus/promql" @@ -47,25 +46,13 @@ func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *S } // Query constructs a Query -func (ng *ShardedEngine) Query(p Params, shards int) Query { +func (ng *ShardedEngine) Query(p Params, mapped Expr) Query { return &query{ timeout: ng.timeout, params: p, evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()), - parse: func(ctx context.Context, query string) (Expr, error) { - logger := spanlogger.FromContext(ctx) - mapper, err := NewShardMapper(shards, ng.metrics) - if err != nil { - return nil, err - } - noop, parsed, err := mapper.Parse(query) - if err != nil { - level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", query) - return nil, err - } - - level.Debug(logger).Log("no-op", noop, "mapped", parsed.String()) - return parsed, nil + parse: func(_ context.Context, _ string) (Expr, error) { + return mapped, nil }, } } diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index adfd487f5bb2c..49b56ceab7a99 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -71,12 +71,19 @@ func TestMappingEquivalence(t *testing.T) { nil, ) qry := regular.Query(params) - shardedQry := sharded.Query(params, shards) + ctx := context.Background() - res, err := qry.Exec(context.Background()) + mapper, err := NewShardMapper(shards, nilMetrics) + require.Nil(t, err) + _, mapped, err := mapper.Parse(tc.query) + require.Nil(t, err) + + shardedQry := sharded.Query(params, mapped) + + res, err := qry.Exec(ctx) require.Nil(t, err) - shardedRes, err := shardedQry.Exec(context.Background()) + shardedRes, err := shardedQry.Exec(ctx) require.Nil(t, err) if tc.approximate { diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 394fcfc4d3262..cf1c3234ac260 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -63,18 +63,20 @@ func newASTMapperware( ) *astMapperware { return &astMapperware{ - confs: confs, - logger: log.With(logger, "middleware", "QueryShard.astMapperware"), - next: next, - ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics), + confs: confs, + logger: log.With(logger, "middleware", "QueryShard.astMapperware"), + next: next, + ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics), + metrics: metrics, } } type astMapperware struct { - confs queryrange.ShardingConfigs - logger log.Logger - next queryrange.Handler - ng *logql.ShardedEngine + confs queryrange.ShardingConfigs + logger log.Logger + next queryrange.Handler + ng *logql.ShardedEngine + metrics *logql.ShardingMetrics } func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { @@ -92,8 +94,26 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra if !ok { return nil, fmt.Errorf("expected *LokiRequest, got (%T)", r) } - params := paramsFromRequest(req) - query := ast.ng.Query(params, int(conf.RowShards)) + + mapper, err := logql.NewShardMapper(int(conf.RowShards), ast.metrics) + if err != nil { + return nil, err + } + + noop, parsed, err := mapper.Parse(r.GetQuery()) + if err != nil { + level.Warn(shardedLog).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery()) + return nil, err + } + level.Debug(shardedLog).Log("no-op", noop, "mapped", parsed.String()) + + if noop { + // the ast can't be mapped to a sharded equivalent + // so we can bypass the sharding engine. + return ast.next.Do(ctx, r) + } + + query := ast.ng.Query(paramsFromRequest(req), parsed) res, err := query.Exec(ctx) if err != nil { diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 3f80e42e58518..5c6ac12138b23 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -159,6 +159,29 @@ func Test_astMapper(t *testing.T) { } +func Test_ShardingByPass(t *testing.T) { + called := 0 + handler := queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) { + called++ + return nil, nil + }) + + mware := newASTMapperware( + queryrange.ShardingConfigs{ + chunk.PeriodConfig{ + RowShards: 2, + }, + }, + handler, + log.NewNopLogger(), + nilShardingMetrics, + ) + + _, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`)) + require.Nil(t, err) + require.Equal(t, called, 1) +} + func Test_hasShards(t *testing.T) { for i, tc := range []struct { input queryrange.ShardingConfigs