executor: support IndexMergeReaderExecutor #12305

Merged: 8 commits, Dec 31, 2019
cmd/explaintest/r/explain_indexmerge.result (15 additions, 0 deletions)
@@ -4,6 +4,21 @@ create index tb on t (b);
 create index tc on t (c);
 create index td on t (d);
 load stats 's/explain_indexmerge_stats_t.json';
+explain select * from t where a < 50 or b < 50;
+id	count	task	operator info
+TableReader_7	4000000.00	root	data:Selection_6
+└─Selection_6	4000000.00	cop[tikv]	or(lt(test.t.a, 50), lt(test.t.b, 50))
+  └─TableScan_5	5000000.00	cop[tikv]	table:t, range:[-inf,+inf], keep order:false
+explain select * from t where (a < 50 or b < 50) and f > 100;
+id	count	task	operator info
+TableReader_7	4000000.00	root	data:Selection_6
+└─Selection_6	4000000.00	cop[tikv]	gt(test.t.f, 100), or(lt(test.t.a, 50), lt(test.t.b, 50))
+  └─TableScan_5	5000000.00	cop[tikv]	table:t, range:[-inf,+inf], keep order:false
+explain select * from t where b < 50 or c < 50;
+id	count	task	operator info
+TableReader_7	4000000.00	root	data:Selection_6
+└─Selection_6	4000000.00	cop[tikv]	or(lt(test.t.b, 50), lt(test.t.c, 50))
+  └─TableScan_5	5000000.00	cop[tikv]	table:t, range:[-inf,+inf], keep order:false
 set session tidb_enable_index_merge = on;
 explain select * from t where a < 50 or b < 50;
 id	count	task	operator info
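These expected plans show the optimizer falling back to a full table scan plus Selection for OR predicates while `tidb_enable_index_merge` is off. A minimal client-side sketch of exercising the switch, assuming a TiDB instance on the default port 4000 and the standard go-sql-driver (both assumptions, not part of this PR):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // TiDB speaks the MySQL protocol
)

func main() {
	// DSN is a placeholder; adjust user/host/database for your deployment.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Turn the feature on for this session, then inspect the plan.
	if _, err := db.Exec("set session tidb_enable_index_merge = on"); err != nil {
		log.Fatal(err)
	}
	rows, err := db.Query("explain select * from t where a < 50 or b < 50")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		// Four columns, matching the result file above: id, count, task, operator info.
		var id, count, task, info string
		if err := rows.Scan(&id, &count, &task, &info); err != nil {
			log.Fatal(err)
		}
		fmt.Println(id, count, task, info)
	}
}
```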
cmd/explaintest/t/explain_indexmerge.test (3 additions, 0 deletions)
@@ -5,6 +5,9 @@ create index tc on t (c);
 create index td on t (d);
 # generate a, b, c, d, e, f from 0 to 5000000 and a = b = c = d = e = f
 load stats 's/explain_indexmerge_stats_t.json';
+explain select * from t where a < 50 or b < 50;
+explain select * from t where (a < 50 or b < 50) and f > 100;
+explain select * from t where b < 50 or c < 50;
 set session tidb_enable_index_merge = on;
 # choose the best plan based on cost
 explain select * from t where a < 50 or b < 50;
executor/builder.go (127 additions, 19 deletions)
@@ -54,15 +54,16 @@ import (
 )

 var (
-	executorCounterMergeJoinExec       = metrics.ExecutorCounter.WithLabelValues("MergeJoinExec")
-	executorCountHashJoinExec          = metrics.ExecutorCounter.WithLabelValues("HashJoinExec")
-	executorCounterHashAggExec         = metrics.ExecutorCounter.WithLabelValues("HashAggExec")
-	executorStreamAggExec              = metrics.ExecutorCounter.WithLabelValues("StreamAggExec")
-	executorCounterSortExec            = metrics.ExecutorCounter.WithLabelValues("SortExec")
-	executorCounterTopNExec            = metrics.ExecutorCounter.WithLabelValues("TopNExec")
-	executorCounterNestedLoopApplyExec = metrics.ExecutorCounter.WithLabelValues("NestedLoopApplyExec")
-	executorCounterIndexLookUpJoin     = metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin")
-	executorCounterIndexLookUpExecutor = metrics.ExecutorCounter.WithLabelValues("IndexLookUpExecutor")
+	executorCounterMergeJoinExec            = metrics.ExecutorCounter.WithLabelValues("MergeJoinExec")
+	executorCountHashJoinExec               = metrics.ExecutorCounter.WithLabelValues("HashJoinExec")
+	executorCounterHashAggExec              = metrics.ExecutorCounter.WithLabelValues("HashAggExec")
+	executorStreamAggExec                   = metrics.ExecutorCounter.WithLabelValues("StreamAggExec")
+	executorCounterSortExec                 = metrics.ExecutorCounter.WithLabelValues("SortExec")
+	executorCounterTopNExec                 = metrics.ExecutorCounter.WithLabelValues("TopNExec")
+	executorCounterNestedLoopApplyExec      = metrics.ExecutorCounter.WithLabelValues("NestedLoopApplyExec")
+	executorCounterIndexLookUpJoin          = metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin")
+	executorCounterIndexLookUpExecutor      = metrics.ExecutorCounter.WithLabelValues("IndexLookUpExecutor")
+	executorCounterIndexMergeReaderExecutor = metrics.ExecutorCounter.WithLabelValues("IndexMergeReaderExecutor")
 )

 // executorBuilder builds an Executor from a Plan.
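The only functional change in this hunk is the new `executorCounterIndexMergeReaderExecutor` child (the other lines are gofmt realignment). For background, `metrics.ExecutorCounter` follows the standard Prometheus CounterVec pattern; a minimal sketch of that pattern with hypothetical names, not TiDB's actual registration code:

```go
package metricsdemo

import "github.com/prometheus/client_golang/prometheus"

// executorCounter counts executors built, labeled by executor type.
// Namespace/Subsystem/Name here are illustrative, not TiDB's real ones.
var executorCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "tidb",
		Subsystem: "executor",
		Name:      "total",
		Help:      "Counter of executors built, labeled by executor type.",
	},
	[]string{"type"},
)

// Resolving the child once at package init and calling Inc() on it later
// (as builder.go does) avoids a label lookup on every executor build.
var indexMergeCounter = executorCounter.WithLabelValues("IndexMergeReaderExecutor")

func init() {
	prometheus.MustRegister(executorCounter)
}
```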
@@ -212,6 +213,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
 		return b.buildSQLBindExec(v)
 	case *plannercore.SplitRegion:
 		return b.buildSplitRegion(v)
+	case *plannercore.PhysicalIndexMergeReader:
+		return b.buildIndexMergeReader(v)
 	default:
 		if mp, ok := p.(MockPhysicalPlan); ok {
 			return mp.GetExecutor()
@@ -2166,23 +2169,38 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *
 	return ret
 }

+func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) {
+	tableReq, tableStreaming, err := b.constructDAGReq(plans)
+	if err != nil {
+		return nil, false, nil, err
+	}
+	for i := 0; i < schemaLen; i++ {
+		tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i))
+	}
+	ts := plans[0].(*plannercore.PhysicalTableScan)
+	tbl, _ := b.is.TableByID(ts.Table.ID)
+	return tableReq, tableStreaming, tbl, err
+}
+
+func buildIndexReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
+	indexReq, indexStreaming, err := b.constructDAGReq(plans)
+	if err != nil {
+		return nil, false, err
+	}
+	indexReq.OutputOffsets = []uint32{uint32(schemaLen)}
+	return indexReq, indexStreaming, err
+}
+
 func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) {
-	indexReq, indexStreaming, err := b.constructDAGReq(v.IndexPlans)
+	is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
+	indexReq, indexStreaming, err := buildIndexReq(b, len(is.Index.Columns), v.IndexPlans)
 	if err != nil {
 		return nil, err
 	}
-	tableReq, tableStreaming, err := b.constructDAGReq(v.TablePlans)
+	tableReq, tableStreaming, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans)
 	if err != nil {
 		return nil, err
 	}
-	is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
-	indexReq.OutputOffsets = []uint32{uint32(len(is.Index.Columns))}
-	tbl, _ := b.is.TableByID(is.Table.ID)
-
-	for i := 0; i < v.Schema().Len(); i++ {
-		tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i))
-	}
-
 	ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
 	if isPartition, physicalTableID := ts.IsPartition(); isPartition {
 		pt := tbl.(table.PartitionedTable)
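This refactor extracts the DAG-request construction that the index-lookup and index-merge readers now share. The offset rule the helpers encode: an index request outputs only the row handle, which the coprocessor places right after the indexed columns, while a table request outputs every column of the executor schema. A toy illustration of that rule, with hypothetical helper names that are not part of the PR:

```go
// indexOutputOffsets returns the single output offset an index request
// needs: the handle column, appended after the indexed columns.
func indexOutputOffsets(indexColumnCount int) []uint32 {
	return []uint32{uint32(indexColumnCount)}
}

// tableOutputOffsets returns offsets for all columns of the executor schema,
// which is what the follow-up table request must produce.
func tableOutputOffsets(schemaLen int) []uint32 {
	offsets := make([]uint32, 0, schemaLen)
	for i := 0; i < schemaLen; i++ {
		offsets = append(offsets, uint32(i))
	}
	return offsets
}
```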
@@ -2258,6 +2276,96 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
 	return ret
 }

+func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalIndexMergeReader) (*IndexMergeReaderExecutor, error) {
+	partialPlanCount := len(v.PartialPlans)
+	partialReqs := make([]*tipb.DAGRequest, 0, partialPlanCount)
+	partialStreamings := make([]bool, 0, partialPlanCount)
+	indexes := make([]*model.IndexInfo, 0, partialPlanCount)
+	keepOrders := make([]bool, 0, partialPlanCount)
+	descs := make([]bool, 0, partialPlanCount)
+	feedbacks := make([]*statistics.QueryFeedback, 0, partialPlanCount)
+	ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
+	for i := 0; i < partialPlanCount; i++ {
+		var tempReq *tipb.DAGRequest
+		var tempStreaming bool
+		var err error
+
+		feedback := statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
+		feedback.Invalidate()
+		feedbacks = append(feedbacks, feedback)
+
+		if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
+			tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), v.PartialPlans[i])
+			keepOrders = append(keepOrders, is.KeepOrder)
+			descs = append(descs, is.Desc)
+			indexes = append(indexes, is.Index)
+		} else {
+			ts := v.PartialPlans[i][0].(*plannercore.PhysicalTableScan)
+			tempReq, tempStreaming, _, err = buildTableReq(b, len(ts.Columns), v.PartialPlans[i])
+			keepOrders = append(keepOrders, ts.KeepOrder)
+			descs = append(descs, ts.Desc)
+			indexes = append(indexes, nil)
+		}
+		if err != nil {
+			return nil, err
+		}
+		collect := false
+		tempReq.CollectRangeCounts = &collect
+		partialReqs = append(partialReqs, tempReq)
+		partialStreamings = append(partialStreamings, tempStreaming)
+	}
+	tableReq, tableStreaming, table, err := buildTableReq(b, v.Schema().Len(), v.TablePlans)
+	if err != nil {
+		return nil, err
+	}
+	startTS, err := b.getStartTS()
+	if err != nil {
+		return nil, err
+	}
+	e := &IndexMergeReaderExecutor{
+		baseExecutor:      newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+		dagPBs:            partialReqs,
+		startTS:           startTS,
+		table:             table,
+		indexes:           indexes,
+		keepOrders:        keepOrders,
+		descs:             descs,
+		tableRequest:      tableReq,
+		columns:           ts.Columns,
+		partialStreamings: partialStreamings,
+		tableStreaming:    tableStreaming,
+		partialPlans:      v.PartialPlans,
+		tblPlans:          v.TablePlans,
+		dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
+		feedbacks:         feedbacks,
+	}
+	collectTable := false
+	e.tableRequest.CollectRangeCounts = &collectTable
+	return e, nil
+}
+
+func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) *IndexMergeReaderExecutor {
+	ret, err := buildNoRangeIndexMergeReader(b, v)
+	if err != nil {
+		b.err = err
+		return nil
+	}
+	ret.ranges = make([][]*ranger.Range, 0, len(v.PartialPlans))
+	sctx := b.ctx.GetSessionVars().StmtCtx
+	for i := 0; i < len(v.PartialPlans); i++ {
+		if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
+			ret.ranges = append(ret.ranges, is.Ranges)
+			sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
+		} else {
+			ret.ranges = append(ret.ranges, v.PartialPlans[i][0].(*plannercore.PhysicalTableScan).Ranges)
+		}
+	}
+	ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
+	sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
+	executorCounterIndexMergeReaderExecutor.Inc()
+	return ret
+}
+
 // dataReaderBuilder build an executor.
 // The executor can be used to read data in the ranges which are constructed by datums.
 // Differences from executorBuilder:
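For intuition, the executor wired up above implements the classic index-merge union: each partial plan produces row handles for one condition, the handles are deduplicated, and the table request fetches the full rows once per distinct handle. A self-contained sketch of that merge step (simplified: the real executor streams handles through worker goroutines, and the names here are hypothetical):

```go
// unionHandles merges the handle sets returned by the partial readers,
// keeping each row handle once even if several conditions matched it.
func unionHandles(partialResults [][]int64) []int64 {
	seen := make(map[int64]struct{})
	merged := make([]int64, 0)
	for _, handles := range partialResults {
		for _, h := range handles {
			if _, dup := seen[h]; dup {
				continue // row already scheduled for the table lookup
			}
			seen[h] = struct{}{}
			merged = append(merged, h)
		}
	}
	return merged
}
```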