
planner, executor: push limit down into IndexLookUpReader executor (p…
eurekaka authored and sre-bot committed Sep 25, 2019
1 parent 3b8445d commit 89b35b3
Showing 9 changed files with 125 additions and 37 deletions.
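In short: when a root-level Limit sits directly above a double-read IndexLookUpReader (possibly through a Projection), the planner now sinks the limit into the reader itself. The executor's index worker then stops fetching index handles once offset+count keys have been scanned, and skips the first offset of them, so the table-lookup side only reads rows inside the limit window. Worked against the index_prune case below: for LIMIT 1, 1 the index side scans at most offset + count = 2 handles, the first is skipped, a single handle goes to the table scan, and the standalone root Limit_9 operator disappears; EXPLAIN shows limit embedded(offset:1, count:1) on IndexLookUp_15 instead.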
13 changes: 6 additions & 7 deletions cmd/explaintest/r/explain_easy.result
@@ -111,15 +111,14 @@ MemTableScan_4 10000.00 root
explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
id count task operator info
Projection_12 10000.00 root eq(test.t1.c2, test.t2.c2)
-└─Apply_14 10000.00 root left outer join, inner:Limit_21
+└─Apply_14 10000.00 root left outer join, inner:Projection_44
├─TableReader_16 10000.00 root data:TableScan_15
│ └─TableScan_15 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
-  └─Limit_21 1.00 root offset:0, count:1
-    └─Projection_44 1.00 root test.t2.c1, test.t2.c2
-      └─IndexLookUp_43 1.00 root
-        ├─Limit_42 1.00 cop offset:0, count:1
-        │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
-        └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
+  └─Projection_44 1.00 root test.t2.c1, test.t2.c2
+    └─IndexLookUp_43 1.00 root limit embedded(offset:0, count:1)
+      ├─Limit_42 1.00 cop offset:0, count:1
+      │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
+      └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
explain select * from t1 order by c1 desc limit 1;
id count task operator info
Limit_10 1.00 root offset:0, count:1
31 changes: 14 additions & 17 deletions cmd/explaintest/r/explain_easy_stats.result
@@ -99,15 +99,14 @@ MemTableScan_4 10000.00 root
explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
id count task operator info
Projection_12 1999.00 root eq(test.t1.c2, test.t2.c2)
-└─Apply_14 1999.00 root left outer join, inner:Limit_21
+└─Apply_14 1999.00 root left outer join, inner:Projection_44
├─TableReader_16 1999.00 root data:TableScan_15
│ └─TableScan_15 1999.00 cop table:t1, range:[-inf,+inf], keep order:false
-  └─Limit_21 1.00 root offset:0, count:1
-    └─Projection_44 1.00 root test.t2.c1, test.t2.c2
-      └─IndexLookUp_43 1.00 root
-        ├─Limit_42 1.00 cop offset:0, count:1
-        │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
-        └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
+  └─Projection_44 1.00 root test.t2.c1, test.t2.c2
+    └─IndexLookUp_43 1.00 root limit embedded(offset:0, count:1)
+      ├─Limit_42 1.00 cop offset:0, count:1
+      │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
+      └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
explain select * from t1 order by c1 desc limit 1;
id count task operator info
Limit_10 1.00 root offset:0, count:1
@@ -160,18 +159,16 @@ id count task operator info
TableDual_5 0.00 root rows:0
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 1;
id count task operator info
-Limit_9 1.00 root offset:1, count:1
-└─IndexLookUp_15 1.00 root
-  ├─Limit_14 1.00 cop offset:0, count:2
-  │ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
-  └─TableScan_13 1.00 cop table:index_prune, keep order:false, stats:pseudo
+IndexLookUp_15 1.00 root limit embedded(offset:1, count:1)
+├─Limit_14 1.00 cop offset:0, count:2
+│ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
+└─TableScan_13 1.00 cop table:index_prune, keep order:false, stats:pseudo
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 0;
id count task operator info
-Limit_9 0.00 root offset:1, count:0
-└─IndexLookUp_15 0.00 root
-  ├─Limit_14 0.00 cop offset:0, count:1
-  │ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
-  └─TableScan_13 0.00 cop table:index_prune, keep order:false, stats:pseudo
+IndexLookUp_15 0.00 root limit embedded(offset:1, count:0)
+├─Limit_14 0.00 cop offset:0, count:1
+│ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
+└─TableScan_13 0.00 cop table:index_prune, keep order:false, stats:pseudo
explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 0, 1;
id count task operator info
Point_Get_1 1.00 root table:index_prune, index:a b
1 change: 1 addition & 0 deletions executor/builder.go
@@ -1910,6 +1910,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
colLens: is.IdxColLens,
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
+		PushedLimit: v.PushedLimit,
}
if isPartition, physicalTableID := ts.IsPartition(); isPartition {
e.physicalTableID = physicalTableID
44 changes: 37 additions & 7 deletions executor/distsql.go
@@ -366,6 +368,8 @@ type IndexLookUpExecutor struct {
corColInAccess bool
idxCols []*expression.Column
colLens []int
+	// PushedLimit is used to skip the preceding and tailing handles when Limit is sunk into IndexLookUpReader.
+	PushedLimit *plannercore.PushedDownLimit
}

type checkIndexValue struct {
@@ -468,6 +470,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
checkIndexValue: e.checkIndexValue,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
+		PushedLimit: e.PushedLimit,
}
if worker.batchSize > worker.maxBatchSize {
worker.batchSize = worker.maxBatchSize
Expand Down Expand Up @@ -623,6 +626,8 @@ type indexWorker struct {

// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue
+	// PushedLimit is used to skip the preceding and tailing handles when Limit is sunk into IndexLookUpReader.
+	PushedLimit *plannercore.PushedDownLimit
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
@@ -652,8 +657,9 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
} else {
chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
}
+	var count uint64
for {
-		handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
+		handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count)
if err != nil {
doneCh := make(chan error, 1)
doneCh <- errors.Trace(err)
@@ -662,6 +668,7 @@
}
return err
}
+		count += scannedKeys
if len(handles) == 0 {
return nil
}
@@ -677,20 +684,43 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}

-func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
-	handles []int64, retChk *chunk.Chunk, err error) {
+func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, count uint64) (
+	handles []int64, retChk *chunk.Chunk, scannedKeys uint64, err error) {
handleOffset := chk.NumCols() - 1
handles = make([]int64, 0, w.batchSize)
+	// PushedLimit would always be nil for CheckIndex or CheckTable, we add this check just for insurance.
+	checkLimit := (w.PushedLimit != nil) && (w.checkIndexValue == nil)
for len(handles) < w.batchSize {
-		chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
+		requiredRows := w.batchSize - len(handles)
+		if checkLimit {
+			if w.PushedLimit.Offset+w.PushedLimit.Count <= scannedKeys+count {
+				return handles, nil, scannedKeys, nil
+			}
+			leftCnt := w.PushedLimit.Offset + w.PushedLimit.Count - scannedKeys - count
+			if uint64(requiredRows) > leftCnt {
+				requiredRows = int(leftCnt)
+			}
+		}
+		chk.SetRequiredRows(requiredRows, w.maxChunkSize)
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
-			return handles, nil, err
+			return handles, nil, scannedKeys, err
}
if chk.NumRows() == 0 {
-			return handles, retChk, nil
+			return handles, retChk, scannedKeys, nil
}
for i := 0; i < chk.NumRows(); i++ {
+			scannedKeys++
+			if checkLimit {
+				if (count + scannedKeys) <= w.PushedLimit.Offset {
+					// Skip the preceding Offset handles.
+					continue
+				}
+				if (count + scannedKeys) > (w.PushedLimit.Offset + w.PushedLimit.Count) {
+					// Skip the handles after Offset+Count.
+					return handles, nil, scannedKeys, nil
+				}
+			}
h := chk.GetRow(i).GetInt64(handleOffset)
handles = append(handles, h)
}
@@ -705,7 +735,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
if w.batchSize > w.maxBatchSize {
w.batchSize = w.maxBatchSize
}
-	return handles, retChk, nil
+	return handles, retChk, scannedKeys, nil
}

func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask {
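The heart of the executor change is the windowing arithmetic above: count tracks keys scanned by earlier batches, scannedKeys tracks the current call, the required rows per batch are capped so the index scan never reads past offset+count, handles at or before offset are dropped, and anything after offset+count ends the scan. A minimal, self-contained sketch of that logic over a plain slice (hypothetical names; the real code consumes chunks from distsql.SelectResult rather than a slice):

package main

import "fmt"

// pushedDownLimit mirrors plannercore.PushedDownLimit.
type pushedDownLimit struct {
	Offset, Count uint64
}

// windowHandles keeps only the handles that fall inside [Offset, Offset+Count)
// of the scanned key sequence, mimicking indexWorker.extractTaskHandles.
func windowHandles(keys []int64, limit pushedDownLimit) []int64 {
	var scanned uint64
	end := limit.Offset + limit.Count
	out := make([]int64, 0, limit.Count)
	for _, h := range keys {
		scanned++
		if scanned <= limit.Offset {
			continue // skip the preceding Offset handles
		}
		if scanned > end {
			break // past Offset+Count: stop scanning entirely
		}
		out = append(out, h)
	}
	return out
}

func main() {
	// LIMIT 1,1 over five scanned handles: only the second survives.
	fmt.Println(windowHandles([]int64{11, 22, 33, 44, 55}, pushedDownLimit{Offset: 1, Count: 1}))
	// Output: [22]
}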
15 changes: 15 additions & 0 deletions executor/distsql_test.go
@@ -219,3 +219,18 @@ func (s *testSuite) TestIssue10178(c *C) {
tk.MustQuery("select * from t where a > 9223372036854775807").Check(testkit.Rows("18446744073709551615"))
tk.MustQuery("select * from t where a < 9223372036854775808").Check(testkit.Rows("9223372036854775807"))
}

+func (s *testSuite) TestPushLimitDownIndexLookUpReader(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists tbl")
+	tk.MustExec("create table tbl(a int, b int, c int, key idx_b_c(b,c))")
+	tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 4 limit 2,1").Check(testkit.Rows())
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 3 limit 2,1").Check(testkit.Rows())
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 2 limit 2,1").Check(testkit.Rows("5 5 5"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 1").Check(testkit.Rows("2 2 2"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 order by b desc limit 2,1").Check(testkit.Rows("3 3 3"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 and c > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
+}
3 changes: 3 additions & 0 deletions planner/core/explain.go
@@ -128,6 +128,9 @@ func (p *PhysicalIndexReader) ExplainInfo() string {
// ExplainInfo implements PhysicalPlan interface.
func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
// The children can be inferred by the relation symbol.
+	if p.PushedLimit != nil {
+		return fmt.Sprintf("limit embedded(offset:%v, count:%v)", p.PushedLimit.Offset, p.PushedLimit.Count)
+	}
return ""
}

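This ExplainInfo branch is what produces the limit embedded(offset:…, count:…) annotation visible in the .result diffs above; when PushedLimit is nil the reader keeps its previous empty info string.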
8 changes: 4 additions & 4 deletions planner/core/physical_plan_test.go
@@ -98,7 +98,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
// Test TopN to Limit in double read.
{
sql: "select * from t where t.c = 1 and t.e = 1 order by t.d limit 1",
-			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))->Limit",
+			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))",
},
// Test TopN to Limit in index single read.
{
@@ -143,7 +143,7 @@
// Test Limit push down in double single read.
{
sql: "select c, b from t where c = 1 limit 1",
-			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit->Projection",
+			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Projection",
},
// Test Selection + Limit push down in double single read.
{
@@ -174,7 +174,7 @@
// Test PK in index double read.
{
sql: "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1",
-			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))->Limit",
+			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))",
},
// Test index filter condition push down.
{
@@ -542,7 +542,7 @@ func (s *testPlanSuite) TestDAGPlanTopN(c *C) {
},
{
sql: "select * from t where c = 1 order by c limit 1",
-			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit",
+			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))",
},
{
sql: "select * from t order by a limit 1",
9 changes: 9 additions & 0 deletions planner/core/physical_plans.go
@@ -70,6 +70,12 @@ type PhysicalIndexReader struct {
OutputColumns []*expression.Column
}

+// PushedDownLimit is the limit operator pushed down into PhysicalIndexLookUpReader.
+type PushedDownLimit struct {
+	Offset uint64
+	Count  uint64
+}

// PhysicalIndexLookUpReader is the index look up reader in tidb. It's used in case of double reading.
type PhysicalIndexLookUpReader struct {
physicalSchemaProducer
@@ -80,6 +86,9 @@ type PhysicalIndexLookUpReader struct {
TablePlans []PhysicalPlan
indexPlan PhysicalPlan
tablePlan PhysicalPlan

+	// PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader.
+	PushedLimit *PushedDownLimit
}

// PhysicalIndexScan represents an index scan plan.
38 changes: 36 additions & 2 deletions planner/core/task.go
@@ -265,6 +265,7 @@ func (t *rootTask) plan() PhysicalPlan {

func (p *PhysicalLimit) attach2Task(tasks ...task) task {
t := tasks[0].copy()
+	sunk := false
if cop, ok := t.(*copTask); ok {
// If the table/index scans data by order and applies a double read, the limit cannot be pushed to the table side.
if !cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil {
@@ -273,9 +274,42 @@
cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
}
t = finishCopTask(p.ctx, cop)
+		sunk = p.sinkIntoIndexLookUp(t)
}
-	t = attachPlan2Task(p, t)
-	return t
+	if sunk {
+		return t
+	}
+	return attachPlan2Task(p, t)
}

+func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
+	root := t.(*rootTask)
+	reader, isDoubleRead := root.p.(*PhysicalIndexLookUpReader)
+	proj, isProj := root.p.(*PhysicalProjection)
+	if !isDoubleRead && !isProj {
+		return false
+	}
+	if isProj {
+		reader, isDoubleRead = proj.Children()[0].(*PhysicalIndexLookUpReader)
+		if !isDoubleRead {
+			return false
+		}
+	}
+	// We can sink Limit into IndexLookUpReader only if tablePlan contains no Selection.
+	ts, isTableScan := reader.tablePlan.(*PhysicalTableScan)
+	if !isTableScan {
+		return false
+	}
+	reader.PushedLimit = &PushedDownLimit{
+		Offset: p.Offset,
+		Count:  p.Count,
+	}
+	ts.stats = p.stats
+	reader.stats = p.stats
+	if isProj {
+		proj.stats = p.stats
+	}
+	return true
+}

func (p *PhysicalSort) getCost(count float64) float64 {
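In the plan notation used by physical_plan_test.go above, the effect is that IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit collapses to IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t)): the root Limit is folded into the reader, and the reader (plus any wrapping Projection) inherits the Limit's stats so downstream cost estimates stay consistent. The bare-PhysicalTableScan guard matters because a table-side Selection could discard rows after the index side had already counted them toward the limit, which would return too few rows.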
