diff --git a/cmd/explaintest/r/explain_indexmerge.result b/cmd/explaintest/r/explain_indexmerge.result index c70ada6513807..c0f7cd8471f8d 100644 --- a/cmd/explaintest/r/explain_indexmerge.result +++ b/cmd/explaintest/r/explain_indexmerge.result @@ -4,6 +4,21 @@ create index tb on t (b); create index tc on t (c); create index td on t (d); load stats 's/explain_indexmerge_stats_t.json'; +explain select * from t where a < 50 or b < 50; +id count task operator info +TableReader_7 4000000.00 root data:Selection_6 +└─Selection_6 4000000.00 cop[tikv] or(lt(test.t.a, 50), lt(test.t.b, 50)) + └─TableScan_5 5000000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false +explain select * from t where (a < 50 or b < 50) and f > 100; +id count task operator info +TableReader_7 4000000.00 root data:Selection_6 +└─Selection_6 4000000.00 cop[tikv] gt(test.t.f, 100), or(lt(test.t.a, 50), lt(test.t.b, 50)) + └─TableScan_5 5000000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false +explain select * from t where b < 50 or c < 50; +id count task operator info +TableReader_7 4000000.00 root data:Selection_6 +└─Selection_6 4000000.00 cop[tikv] or(lt(test.t.b, 50), lt(test.t.c, 50)) + └─TableScan_5 5000000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false set session tidb_enable_index_merge = on; explain select * from t where a < 50 or b < 50; id count task operator info diff --git a/cmd/explaintest/t/explain_indexmerge.test b/cmd/explaintest/t/explain_indexmerge.test index 7989c22426ddb..8a7089936f0fe 100644 --- a/cmd/explaintest/t/explain_indexmerge.test +++ b/cmd/explaintest/t/explain_indexmerge.test @@ -5,6 +5,9 @@ create index tc on t (c); create index td on t (d); # generate a, b, c, d, e, f from 0 to 5000000 and a = b = c = d = e = f load stats 's/explain_indexmerge_stats_t.json'; +explain select * from t where a < 50 or b < 50; +explain select * from t where (a < 50 or b < 50) and f > 100; +explain select * from t where b < 50 or c < 50; set session tidb_enable_index_merge = on; # choose the best plan based on cost explain select * from t where a < 50 or b < 50; diff --git a/executor/builder.go b/executor/builder.go index 0d05708045e93..e7a026afe5137 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -54,15 +54,16 @@ import ( ) var ( - executorCounterMergeJoinExec = metrics.ExecutorCounter.WithLabelValues("MergeJoinExec") - executorCountHashJoinExec = metrics.ExecutorCounter.WithLabelValues("HashJoinExec") - executorCounterHashAggExec = metrics.ExecutorCounter.WithLabelValues("HashAggExec") - executorStreamAggExec = metrics.ExecutorCounter.WithLabelValues("StreamAggExec") - executorCounterSortExec = metrics.ExecutorCounter.WithLabelValues("SortExec") - executorCounterTopNExec = metrics.ExecutorCounter.WithLabelValues("TopNExec") - executorCounterNestedLoopApplyExec = metrics.ExecutorCounter.WithLabelValues("NestedLoopApplyExec") - executorCounterIndexLookUpJoin = metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin") - executorCounterIndexLookUpExecutor = metrics.ExecutorCounter.WithLabelValues("IndexLookUpExecutor") + executorCounterMergeJoinExec = metrics.ExecutorCounter.WithLabelValues("MergeJoinExec") + executorCountHashJoinExec = metrics.ExecutorCounter.WithLabelValues("HashJoinExec") + executorCounterHashAggExec = metrics.ExecutorCounter.WithLabelValues("HashAggExec") + executorStreamAggExec = metrics.ExecutorCounter.WithLabelValues("StreamAggExec") + executorCounterSortExec = metrics.ExecutorCounter.WithLabelValues("SortExec") + executorCounterTopNExec = 
metrics.ExecutorCounter.WithLabelValues("TopNExec") + executorCounterNestedLoopApplyExec = metrics.ExecutorCounter.WithLabelValues("NestedLoopApplyExec") + executorCounterIndexLookUpJoin = metrics.ExecutorCounter.WithLabelValues("IndexLookUpJoin") + executorCounterIndexLookUpExecutor = metrics.ExecutorCounter.WithLabelValues("IndexLookUpExecutor") + executorCounterIndexMergeReaderExecutor = metrics.ExecutorCounter.WithLabelValues("IndexMergeReaderExecutor") ) // executorBuilder builds an Executor from a Plan. @@ -212,6 +213,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildSQLBindExec(v) case *plannercore.SplitRegion: return b.buildSplitRegion(v) + case *plannercore.PhysicalIndexMergeReader: + return b.buildIndexMergeReader(v) default: if mp, ok := p.(MockPhysicalPlan); ok { return mp.GetExecutor() @@ -2166,23 +2169,38 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) * return ret } +func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) { + tableReq, tableStreaming, err := b.constructDAGReq(plans) + if err != nil { + return nil, false, nil, err + } + for i := 0; i < schemaLen; i++ { + tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i)) + } + ts := plans[0].(*plannercore.PhysicalTableScan) + tbl, _ := b.is.TableByID(ts.Table.ID) + return tableReq, tableStreaming, tbl, err +} + +func buildIndexReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { + indexReq, indexStreaming, err := b.constructDAGReq(plans) + if err != nil { + return nil, false, err + } + indexReq.OutputOffsets = []uint32{uint32(schemaLen)} + return indexReq, indexStreaming, err +} + func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) { - indexReq, indexStreaming, err := b.constructDAGReq(v.IndexPlans) + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + indexReq, indexStreaming, err := buildIndexReq(b, len(is.Index.Columns), v.IndexPlans) if err != nil { return nil, err } - tableReq, tableStreaming, err := b.constructDAGReq(v.TablePlans) + tableReq, tableStreaming, tbl, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) if err != nil { return nil, err } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) - indexReq.OutputOffsets = []uint32{uint32(len(is.Index.Columns))} - tbl, _ := b.is.TableByID(is.Table.ID) - - for i := 0; i < v.Schema().Len(); i++ { - tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i)) - } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) if isPartition, physicalTableID := ts.IsPartition(); isPartition { pt := tbl.(table.PartitionedTable) @@ -2258,6 +2276,96 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo return ret } +func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalIndexMergeReader) (*IndexMergeReaderExecutor, error) { + partialPlanCount := len(v.PartialPlans) + partialReqs := make([]*tipb.DAGRequest, 0, partialPlanCount) + partialStreamings := make([]bool, 0, partialPlanCount) + indexes := make([]*model.IndexInfo, 0, partialPlanCount) + keepOrders := make([]bool, 0, partialPlanCount) + descs := make([]bool, 0, partialPlanCount) + feedbacks := make([]*statistics.QueryFeedback, 0, partialPlanCount) + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + for i := 0; i < 
partialPlanCount; i++ { + var tempReq *tipb.DAGRequest + var tempStreaming bool + var err error + + feedback := statistics.NewQueryFeedback(0, nil, 0, ts.Desc) + feedback.Invalidate() + feedbacks = append(feedbacks, feedback) + + if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok { + tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), v.PartialPlans[i]) + keepOrders = append(keepOrders, is.KeepOrder) + descs = append(descs, is.Desc) + indexes = append(indexes, is.Index) + } else { + ts := v.PartialPlans[i][0].(*plannercore.PhysicalTableScan) + tempReq, tempStreaming, _, err = buildTableReq(b, len(ts.Columns), v.PartialPlans[i]) + keepOrders = append(keepOrders, ts.KeepOrder) + descs = append(descs, ts.Desc) + indexes = append(indexes, nil) + } + if err != nil { + return nil, err + } + collect := false + tempReq.CollectRangeCounts = &collect + partialReqs = append(partialReqs, tempReq) + partialStreamings = append(partialStreamings, tempStreaming) + } + tableReq, tableStreaming, table, err := buildTableReq(b, v.Schema().Len(), v.TablePlans) + if err != nil { + return nil, err + } + startTS, err := b.getStartTS() + if err != nil { + return nil, err + } + e := &IndexMergeReaderExecutor{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + dagPBs: partialReqs, + startTS: startTS, + table: table, + indexes: indexes, + keepOrders: keepOrders, + descs: descs, + tableRequest: tableReq, + columns: ts.Columns, + partialStreamings: partialStreamings, + tableStreaming: tableStreaming, + partialPlans: v.PartialPlans, + tblPlans: v.TablePlans, + dataReaderBuilder: &dataReaderBuilder{executorBuilder: b}, + feedbacks: feedbacks, + } + collectTable := false + e.tableRequest.CollectRangeCounts = &collectTable + return e, nil +} + +func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) *IndexMergeReaderExecutor { + ret, err := buildNoRangeIndexMergeReader(b, v) + if err != nil { + b.err = err + return nil + } + ret.ranges = make([][]*ranger.Range, 0, len(v.PartialPlans)) + sctx := b.ctx.GetSessionVars().StmtCtx + for i := 0; i < len(v.PartialPlans); i++ { + if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok { + ret.ranges = append(ret.ranges, is.Ranges) + sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O) + } else { + ret.ranges = append(ret.ranges, v.PartialPlans[i][0].(*plannercore.PhysicalTableScan).Ranges) + } + } + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) + executorCounterIndexMergeReaderExecutor.Inc() + return ret +} + // dataReaderBuilder build an executor. // The executor can be used to read data in the ranges which are constructed by datums. // Differences from executorBuilder: diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go new file mode 100644 index 0000000000000..101a37202ce2e --- /dev/null +++ b/executor/index_merge_reader.go @@ -0,0 +1,710 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package executor
+
+import (
+	"context"
+	"runtime"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"unsafe"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/parser/model"
+	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tidb/distsql"
+	"github.com/pingcap/tidb/expression"
+	"github.com/pingcap/tidb/kv"
+	plannercore "github.com/pingcap/tidb/planner/core"
+	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/ranger"
+	"github.com/pingcap/tidb/util/stringutil"
+	"github.com/pingcap/tipb/go-tipb"
+	"go.uber.org/zap"
+)
+
+var (
+	_ Executor = &IndexMergeReaderExecutor{}
+)
+
+// IndexMergeReaderExecutor accesses a table with multiple index/table scans.
+// There are three types of workers:
+// 1. partialTableWorker/partialIndexWorker, which are used to fetch the handles.
+// 2. indexMergeProcessWorker, which is used to do the `Union` operation.
+// 3. indexMergeTableScanWorker, which is used to get the table tuples with the given handles.
+//
+// The execution flow is similar to that of IndexLookUpReader. However, it uses multiple index scans
+// or table scans to get the handles:
+// 1. use the partialTableWorkers and partialIndexWorkers to fetch the handles (a batch at a time)
+// and send them to the indexMergeProcessWorker.
+// 2. indexMergeProcessWorker does the `Union` operation on each batch of handles it receives.
+// For every handle in the batch:
+//    1. check whether it has been accessed.
+//    2. if not, record it and send it to the indexMergeTableScanWorker.
+//    3. if accessed, just ignore it.
+type IndexMergeReaderExecutor struct {
+	baseExecutor
+
+	table        table.Table
+	indexes      []*model.IndexInfo
+	keepOrders   []bool
+	descs        []bool
+	ranges       [][]*ranger.Range
+	dagPBs       []*tipb.DAGRequest
+	startTS      uint64
+	// handleIdx is the index of the handle column, which is only used when keeping order.
+	handleIdx    int
+	tableRequest *tipb.DAGRequest
+	// columns are only required by union scan.
+	columns           []*model.ColumnInfo
+	partialStreamings []bool
+	tableStreaming    bool
+	*dataReaderBuilder
+	// All fields above are immutable.
+
+	partialWorkerWg sync.WaitGroup
+	tblWorkerWg     sync.WaitGroup
+	processWokerWg  sync.WaitGroup
+	finished        chan struct{}
+
+	kvRanges []kv.KeyRange
+
+	resultCh   chan *lookupTableTask
+	resultCurr *lookupTableTask
+	feedbacks  []*statistics.QueryFeedback
+
+	// memTracker is used to track the memory usage of this executor.
+	memTracker *memory.Tracker
+
+	// checkIndexValue is used to check the consistency of the index data.
+ *checkIndexValue + + corColInIdxSide bool + partialPlans [][]plannercore.PhysicalPlan + corColInTblSide bool + tblPlans []plannercore.PhysicalPlan + corColInAccess bool + idxCols [][]*expression.Column + colLens [][]int +} + +// Open implements the Executor Open interface +func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error { + kvRangess := make([][]kv.KeyRange, 0, len(e.partialPlans)) + for i, plan := range e.partialPlans { + _, ok := plan[0].(*plannercore.PhysicalIndexScan) + if !ok { + kvRangess = append(kvRangess, nil) + continue + } + kvRanges, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.indexes[i].ID, e.ranges[i], e.feedbacks[i]) + if err != nil { + return err + } + kvRangess = append(kvRangess, kvRanges) + } + + e.finished = make(chan struct{}) + e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) + workCh := make(chan *lookupTableTask, 1) + fetchCh := make(chan *lookupTableTask, len(kvRangess)) + + e.startIndexMergeProcessWorker(ctx, workCh, fetchCh, len(kvRangess)) + for i := 0; i < len(kvRangess); i++ { + e.partialWorkerWg.Add(1) + if e.indexes[i] != nil { + err := e.startPartialIndexWorker(ctx, kvRangess[i], fetchCh, i) + if err != nil { + return err + } + } else { + e.startPartialTableWorker(ctx, fetchCh, i) + } + } + e.startIndexMergeTableScanWorker(ctx, workCh) + return nil +} + +func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Context, workCh chan<- *lookupTableTask, fetch <-chan *lookupTableTask, partialWorkerCount int) { + idxMergeProcessWorker := &indexMergeProcessWorker{ + maps: make(map[int64]byte), + } + e.processWokerWg.Add(1) + go func() { + idxMergeProcessWorker.fetchLoop(ctx, partialWorkerCount, fetch, workCh, e.resultCh, e.finished) + e.processWokerWg.Done() + }() +} + +func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, kvRanges []kv.KeyRange, fetchCh chan<- *lookupTableTask, workID int) error { + if e.runtimeStats != nil { + collExec := true + e.dagPBs[workID].CollectExecutionSummaries = &collExec + } + + var builder distsql.RequestBuilder + kvReq, err := builder.SetKeyRanges(kvRanges). + SetDAGRequest(e.dagPBs[workID]). + SetStartTS(e.startTS). + SetDesc(e.descs[workID]). + SetKeepOrder(e.keepOrders[workID]). + SetStreaming(e.partialStreamings[workID]). + SetFromSessionVars(e.ctx.GetSessionVars()). 
+ Build() + if err != nil { + return err + } + + result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id) + if err != nil { + return err + } + + result.Fetch(ctx) + worker := &partialIndexWorker{ + keepOrder: e.keepOrders[workID], + batchSize: e.maxChunkSize, + maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, + maxChunkSize: e.maxChunkSize, + } + + if worker.batchSize > worker.maxBatchSize { + worker.batchSize = worker.maxBatchSize + } + + go func() { + ctx1, cancel := context.WithCancel(ctx) + _, err := worker.fetchHandles(ctx1, result, fetchCh, e.resultCh, e.finished) + if err != nil { + e.feedbacks[workID].Invalidate() + } + cancel() + if err := result.Close(); err != nil { + logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) + } + e.ctx.StoreQueryFeedback(e.feedbacks[workID]) + e.partialWorkerWg.Done() + }() + + return nil +} + +func (e *IndexMergeReaderExecutor) buildPartialTableReader(ctx context.Context, workID int) Executor { + tableReaderExec := &TableReaderExecutor{ + baseExecutor: newBaseExecutor(e.ctx, e.schema, stringutil.MemoizeStr(func() string { return e.id.String() + "_tableReader" })), + table: e.table, + dagPB: e.dagPBs[workID], + startTS: e.startTS, + streaming: e.partialStreamings[workID], + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + plans: e.partialPlans[workID], + ranges: e.ranges[workID], + } + return tableReaderExec +} + +func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, fetchCh chan<- *lookupTableTask, workID int) { + partialTableReader := e.buildPartialTableReader(ctx, workID) + err := partialTableReader.Open(ctx) + if err != nil { + logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) + } + tableInfo := e.partialPlans[workID][0].(*plannercore.PhysicalTableScan).Table + worker := &partialTableWorker{ + keepOrder: e.keepOrders[workID], + batchSize: e.maxChunkSize, + maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, + maxChunkSize: e.maxChunkSize, + tableReader: partialTableReader, + tableInfo: tableInfo, + } + + if worker.batchSize > worker.maxBatchSize { + worker.batchSize = worker.maxBatchSize + } + go func() { + ctx1, cancel := context.WithCancel(ctx) + _, err := worker.fetchHandles(ctx1, fetchCh, e.resultCh, e.finished) + if err != nil { + e.feedbacks[workID].Invalidate() + } + cancel() + if err := worker.tableReader.Close(); err != nil { + logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) + } + e.ctx.StoreQueryFeedback(e.feedbacks[workID]) + e.partialWorkerWg.Done() + }() + +} + +type partialTableWorker struct { + keepOrder bool + batchSize int + maxBatchSize int + maxChunkSize int + tableReader Executor + tableInfo *model.TableInfo +} + +func (w *partialTableWorker) fetchHandles(ctx context.Context, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) (count int64, err error) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + logutil.Logger(ctx).Error("partialTableWorker in IndexMergeReaderExecutor panicked", zap.String("stack", string(buf))) + err4Panic := errors.Errorf("%v", r) + doneCh := make(chan error, 1) + doneCh <- err4Panic + resultCh <- &lookupTableTask{ + doneCh: doneCh, + } + if err != nil { + err = errors.Trace(err4Panic) + } + } + }() + var chk *chunk.Chunk + 
handleOffset := -1 + if w.tableInfo.PKIsHandle { + handleCol := w.tableInfo.GetPkColInfo() + columns := w.tableInfo.Columns + for i := 0; i < len(columns); i++ { + if columns[i].Name.L == handleCol.Name.L { + handleOffset = i + break + } + } + } else { + return 0, errors.Errorf("cannot find the column for handle") + } + + chk = chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize) + for { + handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleOffset) + if err != nil { + doneCh := make(chan error, 1) + doneCh <- err + resultCh <- &lookupTableTask{ + doneCh: doneCh, + } + return count, err + } + if len(handles) == 0 { + task := w.buildTableTask(handles, retChunk) + fetchCh <- task + return count, nil + } + count += int64(len(handles)) + task := w.buildTableTask(handles, retChunk) + select { + case <-ctx.Done(): + return count, nil + case <-finished: + return count, nil + case fetchCh <- task: + } + } +} + +func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleOffset int) ( + handles []int64, retChk *chunk.Chunk, err error) { + handles = make([]int64, 0, w.batchSize) + for len(handles) < w.batchSize { + chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) + err = errors.Trace(w.tableReader.Next(ctx, chk)) + if err != nil { + return handles, nil, err + } + if chk.NumRows() == 0 { + return handles, retChk, nil + } + for i := 0; i < chk.NumRows(); i++ { + h := chk.GetRow(i).GetInt64(handleOffset) + handles = append(handles, h) + } + } + w.batchSize *= 2 + if w.batchSize > w.maxBatchSize { + w.batchSize = w.maxBatchSize + } + return handles, retChk, nil +} + +func (w *partialTableWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask { + var indexOrder map[int64]int + var duplicatedIndexOrder map[int64]int + if w.keepOrder { + indexOrder = make(map[int64]int, len(handles)) + for i, h := range handles { + indexOrder[h] = i + } + } + task := &lookupTableTask{ + handles: handles, + indexOrder: indexOrder, + duplicatedIndexOrder: duplicatedIndexOrder, + idxRows: retChk, + } + + task.doneCh = make(chan error, 1) + return task +} + +func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Context, workCh <-chan *lookupTableTask) { + lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency + e.tblWorkerWg.Add(lookupConcurrencyLimit) + for i := 0; i < lookupConcurrencyLimit; i++ { + worker := &indexMergeTableScanWorker{ + workCh: workCh, + finished: e.finished, + buildTblReader: e.buildFinalTableReader, + handleIdx: e.handleIdx, + tblPlans: e.tblPlans, + memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }), + e.ctx.GetSessionVars().MemQuotaIndexLookupReader), + } + ctx1, cancel := context.WithCancel(ctx) + go func() { + worker.pickAndExecTask(ctx1) + cancel() + e.tblWorkerWg.Done() + }() + } +} + +func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, handles []int64) (Executor, error) { + tableReaderExec := &TableReaderExecutor{ + baseExecutor: newBaseExecutor(e.ctx, e.schema, stringutil.MemoizeStr(func() string { return e.id.String() + "_tableReader" })), + table: e.table, + dagPB: e.tableRequest, + startTS: e.startTS, + streaming: e.tableStreaming, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + plans: e.tblPlans, + } + tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles) + if err != nil { + logutil.Logger(ctx).Error("build table 
reader from handles failed", zap.Error(err)) + return nil, err + } + return tableReader, nil +} + +// Next implements Executor Next interface. +func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + for { + resultTask, err := e.getResultTask() + if err != nil { + return errors.Trace(err) + } + if resultTask == nil { + return nil + } + for resultTask.cursor < len(resultTask.rows) { + req.AppendRow(resultTask.rows[resultTask.cursor]) + resultTask.cursor++ + if req.NumRows() >= e.maxChunkSize { + return nil + } + } + } +} + +func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) { + if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) { + return e.resultCurr, nil + } + task, ok := <-e.resultCh + if !ok { + return nil, nil + } + if err := <-task.doneCh; err != nil { + return nil, errors.Trace(err) + } + + // Release the memory usage of last task before we handle a new task. + if e.resultCurr != nil { + e.resultCurr.memTracker.Consume(-e.resultCurr.memUsage) + } + e.resultCurr = task + return e.resultCurr, nil +} + +// Close implements Exec Close interface. +func (e *IndexMergeReaderExecutor) Close() error { + if e.finished == nil { + return nil + } + close(e.finished) + e.processWokerWg.Wait() + e.tblWorkerWg.Wait() + e.partialWorkerWg.Wait() + e.finished = nil + // TODO: how to store e.feedbacks + return nil +} + +type indexMergeProcessWorker struct { + maps map[int64]byte +} + +func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, partialWorkerCount int, + fetchCh <-chan *lookupTableTask, workCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + logutil.Logger(ctx).Error("indexMergeTableWorker in IndexMergeReaderExecutor panicked", zap.String("stack", string(buf))) + err4Panic := errors.Errorf("%v", r) + doneCh := make(chan error, 1) + doneCh <- err4Panic + resultCh <- &lookupTableTask{ + doneCh: doneCh, + } + } + }() + + var task *lookupTableTask + var ok bool + for { + if partialWorkerCount == 0 { + close(workCh) + close(resultCh) + return + } + select { + case task, ok = <-fetchCh: + if !ok { + return + } + handles := task.handles + hc := len(handles) + if hc == 0 { + partialWorkerCount-- + continue + } + fhs := make([]int64, 0, 8) + for i := 0; i < hc; i++ { + if _, ok := w.maps[handles[i]]; !ok { + fhs = append(fhs, handles[i]) + w.maps[handles[i]] = 0 + } + } + if len(fhs) == 0 { + continue + } + task := &lookupTableTask{ + handles: fhs, + doneCh: make(chan error, 1), + } + select { + case <-ctx.Done(): + return + case <-finished: + return + case workCh <- task: + resultCh <- task + } + } + } +} + +type partialIndexWorker struct { + keepOrder bool + batchSize int + maxBatchSize int + maxChunkSize int +} + +func (w *partialIndexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) (count int64, err error) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + logutil.Logger(ctx).Error("indexWorker in IndexMergeReaderExecutor panicked", zap.String("stack", string(buf))) + err4Panic := errors.Errorf("%v", r) + doneCh := make(chan error, 1) + doneCh <- err4Panic + resultCh <- &lookupTableTask{ + doneCh: doneCh, + } + 
if err != nil { + err = errors.Trace(err4Panic) + } + } + }() + var chk *chunk.Chunk + chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize) + for { + handles, retChunk, err := w.extractTaskHandles(ctx, chk, result) + if err != nil { + doneCh := make(chan error, 1) + doneCh <- err + resultCh <- &lookupTableTask{ + doneCh: doneCh, + } + return count, err + } + if len(handles) == 0 { + task := w.buildTableTask(handles, retChunk) + fetchCh <- task + return count, nil + } + count += int64(len(handles)) + task := w.buildTableTask(handles, retChunk) + select { + case <-ctx.Done(): + return count, nil + case <-finished: + return count, nil + case fetchCh <- task: + } + } +} + +func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( + handles []int64, retChk *chunk.Chunk, err error) { + handleOffset := chk.NumCols() - 1 + handles = make([]int64, 0, w.batchSize) + for len(handles) < w.batchSize { + chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) + err = errors.Trace(idxResult.Next(ctx, chk)) + if err != nil { + return handles, nil, err + } + if chk.NumRows() == 0 { + return handles, retChk, nil + } + for i := 0; i < chk.NumRows(); i++ { + h := chk.GetRow(i).GetInt64(handleOffset) + handles = append(handles, h) + } + } + w.batchSize *= 2 + if w.batchSize > w.maxBatchSize { + w.batchSize = w.maxBatchSize + } + return handles, retChk, nil +} + +func (w *partialIndexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask { + var indexOrder map[int64]int + var duplicatedIndexOrder map[int64]int + if w.keepOrder { + // Save the index order. + indexOrder = make(map[int64]int, len(handles)) + for i, h := range handles { + indexOrder[h] = i + } + } + task := &lookupTableTask{ + handles: handles, + indexOrder: indexOrder, + duplicatedIndexOrder: duplicatedIndexOrder, + idxRows: retChk, + } + + task.doneCh = make(chan error, 1) + return task +} + +type indexMergeTableScanWorker struct { + workCh <-chan *lookupTableTask + finished <-chan struct{} + buildTblReader func(ctx context.Context, handles []int64) (Executor, error) + handleIdx int + tblPlans []plannercore.PhysicalPlan + + // memTracker is used to track the memory usage of this executor. 
+ memTracker *memory.Tracker +} + +func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) { + var task *lookupTableTask + var ok bool + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + logutil.Logger(ctx).Error("indexMergeTableWorker in IndexMergeReaderExecutor panicked", zap.String("stack", string(buf))) + task.doneCh <- errors.Errorf("%v", r) + } + }() + for { + select { + case task, ok = <-w.workCh: + if !ok { + return + } + case <-w.finished: + return + } + err := w.executeTask(ctx, task) + task.doneCh <- err + } +} + +func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *lookupTableTask) error { + tableReader, err := w.buildTblReader(ctx, task.handles) + if err != nil { + logutil.Logger(ctx).Error("build table reader failed", zap.Error(err)) + return err + } + defer terror.Call(tableReader.Close) + task.memTracker = w.memTracker + memUsage := int64(cap(task.handles) * 8) + task.memUsage = memUsage + task.memTracker.Consume(memUsage) + handleCnt := len(task.handles) + task.rows = make([]chunk.Row, 0, handleCnt) + for { + chk := newFirstChunk(tableReader) + err = Next(ctx, tableReader, chk) + if err != nil { + logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err)) + return err + } + if chk.NumRows() == 0 { + break + } + memUsage = chk.MemoryUsage() + task.memUsage += memUsage + task.memTracker.Consume(memUsage) + iter := chunk.NewIterator4Chunk(chk) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + task.rows = append(task.rows, row) + } + } + + memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{})) + task.memUsage += memUsage + task.memTracker.Consume(memUsage) + if handleCnt != len(task.rows) && len(w.tblPlans) == 1 { + return errors.Errorf("handle count %d isn't equal to value count %d", handleCnt, len(task.rows)) + } + return nil +} diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go new file mode 100644 index 0000000000000..4eb03d33fbff6 --- /dev/null +++ b/executor/index_merge_reader_test.go @@ -0,0 +1,52 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + . 
"github.com/pingcap/check" + "github.com/pingcap/tidb/util/testkit" +) + +func (s *testSuite1) TestSingleTableRead(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(id int primary key, a int, b int, c int, d int)") + tk.MustExec("create index t1a on t1(a)") + tk.MustExec("create index t1b on t1(b)") + tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1", + "5 5 5 5 5")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ a from t1 where id < 2 or a > 4 order by a").Check(testkit.Rows("1", + "5")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ sum(a) from t1 where id < 2 or a > 4").Check(testkit.Rows("6")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ * from t1 where a < 2 or b > 4 order by a").Check(testkit.Rows("1 1 1 1 1", + "5 5 5 5 5")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ a from t1 where a < 2 or b > 4 order by a").Check(testkit.Rows("1", + "5")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6")) +} + +func (s *testSuite1) TestJoin(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(id int primary key, a int, b int, c int, d int)") + tk.MustExec("create index t1a on t1(a)") + tk.MustExec("create index t1b on t1(b)") + tk.MustExec("create table t2(id int primary key, a int)") + tk.MustExec("create index t2a on t2(a)") + tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") + tk.MustExec("insert into t2 values(1,1),(5,5)") + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 4").Check(testkit.Rows("6")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 5").Check(testkit.Rows("1")) +} diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 99e86b29d85f8..c91f863a606b6 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -515,7 +515,7 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty, idx := path.Index is, partialCost, rowCount := ds.getOriginalPhysicalIndexScan(prop, path, false, false) rowSize := is.indexScanRowSize(idx, ds, false) - isCovered = isCoveringIndex(ds.schema.Columns, path.FullIdxCols, path.FullIdxColLens, ds.tableInfo.PKIsHandle) + // TODO: Consider using isCoveringIndex() to avoid another TableRead indexConds := path.IndexFilters sessVars := ds.ctx.GetSessionVars() if indexConds != nil { @@ -533,11 +533,11 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty, indexPlan := PhysicalSelection{Conditions: indexConds}.Init(is.ctx, stats, ds.blockOffset) indexPlan.SetChildren(is) partialCost += rowCount * rowSize * sessVars.NetworkFactor - return indexPlan, partialCost, rowCount, isCovered + return indexPlan, partialCost, rowCount, false } partialCost += rowCount * rowSize * sessVars.NetworkFactor indexPlan = is - return indexPlan, partialCost, rowCount, isCovered + return indexPlan, partialCost, rowCount, false } func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, 
path *util.AccessPath) (
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 2da860024e37a..e8a13b8bc6ddb 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -153,9 +153,11 @@ type PhysicalIndexMergeReader struct {
 	// PartialPlans flats the partialPlans to construct executor pb.
 	PartialPlans [][]PhysicalPlan
 	// TablePlans flats the tablePlan to construct executor pb.
-	TablePlans []PhysicalPlan
+	TablePlans []PhysicalPlan
+	// partialPlans are the partial plans that have not been flattened. Each element must be either a PhysicalIndexScan or a PhysicalTableScan.
 	partialPlans []PhysicalPlan
-	tablePlan    PhysicalPlan
+	// tablePlan is a PhysicalTableScan used to fetch the table tuples. Currently, it must not be nil.
+	tablePlan PhysicalPlan
 }
 
 // PhysicalIndexScan represents an index scan plan.
diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go
index 59dedcb4085f8..a83d38c355162 100644
--- a/planner/core/resolve_indices.go
+++ b/planner/core/resolve_indices.go
@@ -298,6 +298,23 @@ func (p *PhysicalIndexLookUpReader) ResolveIndices() (err error) {
 	return
 }
 
+// ResolveIndices implements Plan interface.
+func (p *PhysicalIndexMergeReader) ResolveIndices() (err error) {
+	if p.tablePlan != nil {
+		err = p.tablePlan.ResolveIndices()
+		if err != nil {
+			return err
+		}
+	}
+	for i := 0; i < len(p.partialPlans); i++ {
+		err = p.partialPlans[i].ResolveIndices()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // ResolveIndices implements Plan interface.
 func (p *PhysicalSelection) ResolveIndices() (err error) {
 	err = p.basePhysicalPlan.ResolveIndices()
diff --git a/planner/core/stats.go b/planner/core/stats.go
index 7cda2a470523a..ed3c463de8f80 100644
--- a/planner/core/stats.go
+++ b/planner/core/stats.go
@@ -211,6 +211,16 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
 			break
 		}
 	}
+
+	// TODO: implement UnionScan + IndexMerge
+	isReadOnlyTxn := true
+	txn, err := ds.ctx.Txn(false)
+	if err != nil {
+		return nil, err
+	}
+	if txn.Valid() && !txn.IsReadOnly() {
+		isReadOnlyTxn = false
+	}
 	// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
isPossibleIdxMerge := len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 sessionAndStmtPermission := (ds.ctx.GetSessionVars().GetEnableIndexMerge() || ds.indexMergeHints != nil) && !ds.ctx.GetSessionVars().StmtCtx.NoIndexMergeHint @@ -222,8 +232,8 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * break } } - if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge { - ds.generateAndPruneIndexMergePath() + if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn { + ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil) } else if ds.indexMergeHints != nil { ds.indexMergeHints = nil ds.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("IndexMerge is inapplicable or disabled")) @@ -231,7 +241,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema * return ds.stats, nil } -func (ds *DataSource) generateAndPruneIndexMergePath() { +func (ds *DataSource) generateAndPruneIndexMergePath(needPrune bool) { regularPathCount := len(ds.possibleAccessPaths) ds.generateIndexMergeOrPaths() // If without hints, it means that `enableIndexMerge` is true @@ -245,7 +255,9 @@ func (ds *DataSource) generateAndPruneIndexMergePath() { return } // Do not need to consider the regular paths in find_best_task(). - ds.possibleAccessPaths = ds.possibleAccessPaths[regularPathCount:] + if needPrune { + ds.possibleAccessPaths = ds.possibleAccessPaths[regularPathCount:] + } } // DeriveStats implements LogicalPlan DeriveStats interface.
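
Note (not part of the patch): the core of the new IndexMergeReaderExecutor is the handle-union step performed by indexMergeProcessWorker.fetchLoop — each partial index/table scan produces batches of row handles, and only handles that have not been seen before are forwarded to the table-scan workers. Below is a minimal standalone sketch of that dedup logic, using toy types and illustrative names rather than the executor's real interfaces:

```go
package main

import "fmt"

// handleTask is a toy stand-in for one batch of row handles produced by a
// partial index/table scan worker.
type handleTask struct{ handles []int64 }

// unionHandles keeps a handle only the first time any partial worker reports
// it, mirroring the map-based `Union` done in fetchLoop.
func unionHandles(seen map[int64]struct{}, task handleTask) []int64 {
	fresh := make([]int64, 0, len(task.handles))
	for _, h := range task.handles {
		if _, ok := seen[h]; ok {
			continue // already forwarded to the table-scan workers
		}
		seen[h] = struct{}{}
		fresh = append(fresh, h)
	}
	return fresh
}

func main() {
	// Two partial scans (e.g. `a < 50` via one index and `b < 50` via another)
	// can return overlapping handles; each handle is looked up only once.
	seen := make(map[int64]struct{})
	fmt.Println(unionHandles(seen, handleTask{handles: []int64{1, 3, 5}})) // [1 3 5]
	fmt.Println(unionHandles(seen, handleTask{handles: []int64{3, 4, 5}})) // [4]
}
```

As exercised by the new tests, the IndexMerge plan is either chosen by cost when tidb_enable_index_merge is on, or forced with the use_index_merge hint, e.g. select /*+ use_index_merge(t1, t1a, t1b) */ * from t1 where a < 2 or b > 4.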