From 30c068a14b36e1d1a0dc237b74210b356f8b24dc Mon Sep 17 00:00:00 2001 From: ldeng Date: Tue, 6 Oct 2020 20:10:36 +0800 Subject: [PATCH 1/3] executor: support read global indexes in IndexMergeReader --- executor/builder.go | 32 ++++++---- executor/index_merge_reader.go | 104 +++++++++++++++++++++++++++---- executor/partition_table_test.go | 46 ++++++++++++++ planner/core/handle_cols.go | 64 +++++++++++++++++++ 4 files changed, 221 insertions(+), 25 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index cbedfc175399b..ba48c81e0017c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2809,13 +2809,13 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic return tableReq, tableStreaming, tbl, err } -func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { +func buildIndexReq(b *executorBuilder, schemaLen, handleAndPidLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV) if err != nil { return nil, false, err } indexReq.OutputOffsets = []uint32{} - for i := 0; i < handleLen; i++ { + for i := 0; i < handleAndPidLen; i++ { indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(schemaLen+i)) } if len(indexReq.OutputOffsets) == 0 { @@ -2824,19 +2824,24 @@ func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []planner return indexReq, indexStreaming, err } -func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) { - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) - var handleLen int - if len(v.CommonHandleCols) != 0 { - handleLen = len(v.CommonHandleCols) +func getHandleAndPidLen(commonHandleLen int, global bool) int { + var ret int + if commonHandleLen != 0 { + ret = commonHandleLen } else { - handleLen = 1 + // int handle + ret = 1 } - if is.Index.Global { - // Should output pid col. 
- handleLen++
+ if global {
+ ret++
 }
- indexReq, indexStreaming, err := buildIndexReq(b, len(is.Index.Columns), handleLen, v.IndexPlans)
+ return ret
+}
+
+func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) {
+ is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
+ handleAndPidLen := getHandleAndPidLen(len(v.CommonHandleCols), is.Index.Global)
+ indexReq, indexStreaming, err := buildIndexReq(b, len(is.Index.Columns), handleAndPidLen, v.IndexPlans)
 if err != nil {
 return nil, err
 }
@@ -2961,7 +2966,8 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
 feedbacks = append(feedbacks, feedback)
 if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
- tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i])
+ handleAndPidLen := getHandleAndPidLen(ts.HandleCols.NumCols(), is.Index.Global)
+ tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), handleAndPidLen, v.PartialPlans[i])
 keepOrders = append(keepOrders, is.KeepOrder)
 descs = append(descs, is.Desc)
 indexes = append(indexes, is.Index)
diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go
index 689e466e3659c..ddb6a82a2eeb9 100644
--- a/executor/index_merge_reader.go
+++ b/executor/index_merge_reader.go
@@ -93,6 +93,10 @@ type IndexMergeReaderExecutor struct {
 // checkIndexValue is used to check the consistency of the index data.
 *checkIndexValue
+ hasGlobalIndex bool
+ // skipGlobalIndex indicates whether global indexes have been read (when reading the first partition).
+ skipGlobalIndex bool
+
 corColInIdxSide bool
 partialPlans [][]plannercore.PhysicalPlan
 corColInTblSide bool
@@ -102,13 +106,16 @@ type IndexMergeReaderExecutor struct {
 colLens [][]int
 handleCols plannercore.HandleCols
+
+ // handleMaps is used to temporarily store handles read from global indexes.
+ handleMaps map[int64]*kv.HandleMap } // Open implements the Executor Open interface func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error { e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans)) for i, plan := range e.partialPlans { - _, ok := plan[0].(*plannercore.PhysicalIndexScan) + is, ok := plan[0].(*plannercore.PhysicalIndexScan) if !ok { if e.table.Meta().IsCommonHandle { keyRanges, err := distsql.CommonHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{getPhysicalTableID(e.table)}, e.ranges[i]) @@ -121,7 +128,14 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error { } continue } - keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.indexes[i].ID, e.ranges[i], e.feedbacks[i]) + var pid int64 + if is.Index.Global { + e.hasGlobalIndex = true + pid = e.table.Meta().ID + } else { + pid = getPhysicalTableID(e.table) + } + keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, pid, e.indexes[i].ID, e.ranges[i], e.feedbacks[i]) if err != nil { return err } @@ -129,6 +143,12 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error { } e.finished = make(chan struct{}) e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) + if e.hasGlobalIndex && !e.skipGlobalIndex { + e.handleMaps = make(map[int64]*kv.HandleMap) + for _, p := range e.table.Meta().GetPartitionInfo().Definitions { + e.handleMaps[p.ID] = kv.NewHandleMap() + } + } return nil } @@ -142,10 +162,14 @@ func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error { var err error var partialWorkerWg sync.WaitGroup for i := 0; i < len(e.keyRanges); i++ { - partialWorkerWg.Add(1) if e.indexes[i] != nil { + if e.indexes[i].Global && e.skipGlobalIndex { + continue + } + partialWorkerWg.Add(1) err = e.startPartialIndexWorker(ctx, exitCh, fetchCh, i, &partialWorkerWg, e.keyRanges[i]) } else { + partialWorkerWg.Add(1) err = e.startPartialTableWorker(ctx, exitCh, fetchCh, i, &partialWorkerWg) } if err != nil { @@ -160,6 +184,7 @@ func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error { } e.startIndexMergeTableScanWorker(ctx, workCh) e.workerStarted = true + e.skipGlobalIndex = true return nil } @@ -169,7 +194,11 @@ func (e *IndexMergeReaderExecutor) waitPartialWorkersAndCloseFetchChan(partialWo } func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Context, workCh chan<- *lookupTableTask, fetch <-chan *lookupTableTask) { - idxMergeProcessWorker := &indexMergeProcessWorker{} + idxMergeProcessWorker := &indexMergeProcessWorker{ + handleMaps: e.handleMaps, + physicalTableID: getPhysicalTableID(e.table), + processGlobalIndex: e.hasGlobalIndex && !e.skipGlobalIndex, + } e.processWokerWg.Add(1) go func() { defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End() @@ -203,7 +232,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, return err } - result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id) + global := e.indexes[workID].Global + fieldsTypes := e.handleCols.GetFieldsTypesOfPartitionedTableIndex(global) + result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, fieldsTypes, e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id) if err != nil { return err } @@ -214,6 +245,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx 
context.Context, batchSize: e.maxChunkSize, maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, maxChunkSize: e.maxChunkSize, + global: global, } if worker.batchSize > worker.maxBatchSize { @@ -505,6 +537,9 @@ func (e *IndexMergeReaderExecutor) Close() error { } type indexMergeProcessWorker struct { + handleMaps map[int64]*kv.HandleMap + physicalTableID int64 + processGlobalIndex bool } func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan *lookupTableTask, @@ -514,15 +549,57 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan close(resultCh) }() - distinctHandles := kv.NewHandleMap() + var distinctHandles *kv.HandleMap + if w.handleMaps != nil { + distinctHandles = w.handleMaps[w.physicalTableID] + // Process handles read from global indexes. + fhs := make([]kv.Handle, 0, 8) + distinctHandles.Range(func(h kv.Handle, val interface{}) bool { + fhs = append(fhs, h) + return true + }) + if len(fhs) != 0 { + task := &lookupTableTask{ + handles: fhs, + doneCh: make(chan error, 1), + } + select { + case <-ctx.Done(): + return + case <-finished: + return + case workCh <- task: + resultCh <- task + } + } + } else { + distinctHandles = kv.NewHandleMap() + } for task := range fetchCh { handles := task.handles + if len(handles) == 0 { + continue + } fhs := make([]kv.Handle, 0, 8) - for _, h := range handles { - if _, ok := distinctHandles.Get(h); !ok { - fhs = append(fhs, h) - distinctHandles.Set(h, true) + _, ok := handles[0].(kv.PartitionHandle) + if w.processGlobalIndex && ok { + for _, ph := range handles { + h := ph.(kv.PartitionHandle).Handle + pid := ph.(kv.PartitionHandle).PartitionID + if _, ok := w.handleMaps[pid].Get(h); !ok { + if pid == w.physicalTableID { + fhs = append(fhs, h) + } + w.handleMaps[pid].Set(h, true) + } + } + } else { + for _, h := range handles { + if _, ok := distinctHandles.Get(h); !ok { + fhs = append(fhs, h) + distinctHandles.Set(h, true) + } } } if len(fhs) == 0 { @@ -564,6 +641,7 @@ type partialIndexWorker struct { batchSize int maxBatchSize int maxChunkSize int + global bool } func (w *partialIndexWorker) fetchHandles( @@ -574,7 +652,7 @@ func (w *partialIndexWorker) fetchHandles( resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { - chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) + chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypesOfPartitionedTableIndex(w.global), w.maxChunkSize) for { handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols) if err != nil { @@ -615,7 +693,9 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk. 
return handles, retChk, nil } for i := 0; i < chk.NumRows(); i++ { - handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i)) + var handle kv.Handle + var err error + handle, err = handleCols.BuildHandleFromPartitionedTableIndexRow(chk.GetRow(i), w.global) if err != nil { return nil, nil, err } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 920fa09a5acef..b6145893e64bc 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -200,3 +200,49 @@ partition p2 values less than (10))`) tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)") tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9")) } + +func (s *globalIndexSuite) TestIndexMergeReaderWithGlobalIndex(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1, t2") + tk.MustExec(`create table t1 (id int primary key, a int, b int, c int, d int) + partition by range (id) ( + partition p0 values less than (3), + partition p1 values less than (6), + partition p2 values less than (10) + )`) + tk.MustExec("create index t1a on t1(a)") + tk.MustExec("create index t1b on t1(b)") + tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1", + "5 5 5 5 5")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ a from t1 where id < 2 or a > 4 order by a").Check(testkit.Rows("1", + "5")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ sum(a) from t1 where id < 2 or a > 4").Check(testkit.Rows("6")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ * from t1 where a < 2 or b > 4 order by a").Check(testkit.Rows("1 1 1 1 1", + "5 5 5 5 5")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ a from t1 where a < 2 or b > 4 order by a").Check(testkit.Rows("1", + "5")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6")) + + tk.MustExec("drop table if exists t1, t2") + tk.MustExec(`create table t1 (id int primary key, a int, b int, c int, d int) + partition by range (id) ( + partition p0 values less than (4), + partition p1 values less than (7), + partition p2 values less than (10) + )`) + tk.MustExec("create index t1a on t1(a)") + tk.MustExec("create index t1b on t1(b)") + tk.MustExec(`create table t2 (id int primary key, a int) + partition by range (id) ( + partition p0 values less than (4), + partition p1 values less than (7), + partition p2 values less than (10) + )`) + tk.MustExec("create index t2a on t2(a)") + tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") + tk.MustExec("insert into t2 values(1,1),(5,5)") + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 4").Check(testkit.Rows("6")) + tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 5").Check(testkit.Rows("1")) + +} diff --git a/planner/core/handle_cols.go b/planner/core/handle_cols.go index b66aa964d5e51..9f79ae01cd39a 100644 --- a/planner/core/handle_cols.go +++ b/planner/core/handle_cols.go @@ -35,6 +35,10 @@ type HandleCols interface { BuildHandleByDatums(row []types.Datum) (kv.Handle, error) // BuildHandleFromIndexRow builds a Handle from index row 
data.
 BuildHandleFromIndexRow(row chunk.Row) (kv.Handle, error)
+ // BuildHandleFromPartitionedTableIndexRow builds a Handle from index row data of a partitioned table.
+ // If global is true, the partition ID is decoded from the last column and a PartitionHandle is returned;
+ // otherwise the return value is the same as BuildHandleFromIndexRow.
+ BuildHandleFromPartitionedTableIndexRow(row chunk.Row, global bool) (kv.Handle, error)
 // ResolveIndices resolves handle column indices.
 ResolveIndices(schema *expression.Schema) (HandleCols, error)
 // IsInt returns if the HandleCols is a single int column.
@@ -49,6 +53,9 @@ type HandleCols interface {
 Compare(a, b []types.Datum) (int, error)
 // GetFieldTypes return field types of columns
 GetFieldsTypes() []*types.FieldType
+ // GetFieldsTypesOfPartitionedTableIndex returns the field types of the columns;
+ // if global is true, the partition ID column is appended to the end.
+ GetFieldsTypesOfPartitionedTableIndex(global bool) []*types.FieldType
 }
 // CommonHandleCols implements the kv.HandleCols interface.
@@ -86,6 +93,27 @@ func (cb *CommonHandleCols) BuildHandleFromIndexRow(row chunk.Row) (kv.Handle, e
 return cb.buildHandleByDatumsBuffer(datumBuf)
 }
+// BuildHandleFromPartitionedTableIndexRow implements the kv.HandleCols interface.
+func (cb *CommonHandleCols) BuildHandleFromPartitionedTableIndexRow(row chunk.Row, global bool) (kv.Handle, error) {
+ handleColsOffset := row.Len() - cb.NumCols()
+ if global {
+ handleColsOffset--
+ }
+ datumBuf := make([]types.Datum, 0, 4)
+ for i := 0; i < cb.NumCols(); i++ {
+ datumBuf = append(datumBuf, row.GetDatum(handleColsOffset+i, cb.columns[i].RetType))
+ }
+ h, err := cb.buildHandleByDatumsBuffer(datumBuf)
+ if err != nil {
+ return h, err
+ }
+ if global {
+ pid := row.GetInt64(row.Len() - 1)
+ h = kv.NewPartitionHandle(pid, h)
+ }
+ return h, nil
+}
+
 // BuildHandleByDatums implements the kv.HandleCols interface.
 func (cb *CommonHandleCols) BuildHandleByDatums(row []types.Datum) (kv.Handle, error) {
 datumBuf := make([]types.Datum, 0, 4)
@@ -167,6 +195,18 @@ func (cb *CommonHandleCols) GetFieldsTypes() []*types.FieldType {
 return fieldTps
 }
+// GetFieldsTypesOfPartitionedTableIndex implements the kv.HandleCols interface.
+func (cb *CommonHandleCols) GetFieldsTypesOfPartitionedTableIndex(global bool) []*types.FieldType {
+ fieldTps := make([]*types.FieldType, 0, len(cb.columns)+1)
+ for _, col := range cb.columns {
+ fieldTps = append(fieldTps, col.RetType)
+ }
+ if global {
+ fieldTps = append(fieldTps, types.NewFieldType(mysql.TypeLonglong))
+ }
+ return fieldTps
+}
+
 // NewCommonHandleCols creates a new CommonHandleCols.
 func NewCommonHandleCols(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo,
 tableColumns []*expression.Column) *CommonHandleCols {
@@ -197,6 +237,21 @@ func (ib *IntHandleCols) BuildHandleFromIndexRow(row chunk.Row) (kv.Handle, erro
 return kv.IntHandle(row.GetInt64(row.Len() - 1)), nil
 }
+// BuildHandleFromPartitionedTableIndexRow implements the kv.HandleCols interface.
+func (ib *IntHandleCols) BuildHandleFromPartitionedTableIndexRow(row chunk.Row, global bool) (kv.Handle, error) {
+ handleColsOffset := row.Len() - 1
+ if global {
+ handleColsOffset--
+ }
+ var h kv.Handle
+ h = kv.IntHandle(row.GetInt64(handleColsOffset))
+ if global {
+ pid := row.GetInt64(row.Len() - 1)
+ h = kv.NewPartitionHandle(pid, h)
+ }
+ return h, nil
+}
+
 // BuildHandleByDatums implements the kv.HandleCols interface.
func (ib *IntHandleCols) BuildHandleByDatums(row []types.Datum) (kv.Handle, error) { return kv.IntHandle(row[ib.col.Index].GetInt64()), nil @@ -252,6 +307,15 @@ func (ib *IntHandleCols) GetFieldsTypes() []*types.FieldType { return []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} } +// GetFieldsTypesOfPartitionedTableIndex implements the kv.HandleCols interface. +func (ib *IntHandleCols) GetFieldsTypesOfPartitionedTableIndex(global bool) []*types.FieldType { + fieldTps := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + if global { + fieldTps = append(fieldTps, types.NewFieldType(mysql.TypeLonglong)) + } + return fieldTps +} + // NewIntHandleCols creates a new IntHandleCols. func NewIntHandleCols(col *expression.Column) HandleCols { return &IntHandleCols{col: col} From 41b2a5c652a8552a04629921e0a5799a3c177b34 Mon Sep 17 00:00:00 2001 From: ldeng Date: Wed, 7 Oct 2020 20:16:49 +0800 Subject: [PATCH 2/3] correctly test --- executor/partition_table_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index b6145893e64bc..eab79da88cba7 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -210,7 +210,9 @@ func (s *globalIndexSuite) TestIndexMergeReaderWithGlobalIndex(c *C) { partition p1 values less than (6), partition p2 values less than (10) )`) - tk.MustExec("create index t1a on t1(a)") + // Global index on a + tk.MustExec("create unique index t1a on t1(a)") + // Local index on b tk.MustExec("create index t1b on t1(b)") tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1", @@ -231,7 +233,9 @@ func (s *globalIndexSuite) TestIndexMergeReaderWithGlobalIndex(c *C) { partition p1 values less than (7), partition p2 values less than (10) )`) - tk.MustExec("create index t1a on t1(a)") + // Global index on a + tk.MustExec("create unique index t1a on t1(a)") + // Local index on b tk.MustExec("create index t1b on t1(b)") tk.MustExec(`create table t2 (id int primary key, a int) partition by range (id) ( @@ -239,7 +243,7 @@ func (s *globalIndexSuite) TestIndexMergeReaderWithGlobalIndex(c *C) { partition p1 values less than (7), partition p2 values less than (10) )`) - tk.MustExec("create index t2a on t2(a)") + tk.MustExec("create unique index t2a on t2(a)") tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)") tk.MustExec("insert into t2 values(1,1),(5,5)") tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 4").Check(testkit.Rows("6")) From 3e914594d4dbf386cb21759b81db7a37d58ef011 Mon Sep 17 00:00:00 2001 From: ldeng Date: Fri, 9 Oct 2020 09:55:27 +0800 Subject: [PATCH 3/3] executor: support index join with global index --- executor/builder.go | 4 ++-- executor/partition_table_test.go | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index ba48c81e0017c..591a553f4412c 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3299,7 +3299,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte return nil, err } tbInfo := e.table.Meta() - if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { + if 
tbInfo.GetPartitionInfo() == nil || e.index.Global || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err @@ -3349,7 +3349,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } tbInfo := e.table.Meta() - if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { + if tbInfo.GetPartitionInfo() == nil || e.index.Global || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index eab79da88cba7..02814b22bcfd8 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -248,5 +248,23 @@ func (s *globalIndexSuite) TestIndexMergeReaderWithGlobalIndex(c *C) { tk.MustExec("insert into t2 values(1,1),(5,5)") tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 4").Check(testkit.Rows("6")) tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 5").Check(testkit.Rows("1")) +} + +func (s *globalIndexSuite) TestPartitionIndexJoinWithGlobalIndex(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists p, t") + tk.MustExec(`create table p (id int, c int) partition by range (c) ( + partition p0 values less than (4), + partition p1 values less than (7), + partition p2 values less than (10))`) + tk.MustExec(`create unique index i_id on p(id)`) + tk.MustExec(`create unique index i_c on p(c)`) + tk.MustExec("create table t (id int)") + tk.MustExec("insert into p values (3,3), (4,4), (6,6), (9,9)") + tk.MustExec("insert into t values (4), (9)") + // Build indexLookUp in index join + tk.MustQuery("select /*+ INL_JOIN(p) */ * from p, t where p.id = t.id").Sort().Check(testkit.Rows("4 4 4", "9 9 9")) + // Build index reader in index join + tk.MustQuery("select /*+ INL_JOIN(p) */ p.id from p, t where p.id = t.id").Check(testkit.Rows("4", "9")) }
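
Illustrative note (not part of the patch): the core layout change in this series is that a global-index row carries the partition ID as an extra trailing column, and BuildHandleFromPartitionedTableIndexRow strips it off and wraps the handle in a kv.PartitionHandle. The self-contained Go sketch below mirrors that decoding for the int-handle case only; the type and function names in it are illustrative stand-ins, not the real TiDB APIs (which operate on chunk.Row and kv.Handle).

package main

import "fmt"

// partitionHandle mimics kv.PartitionHandle: a row handle tagged with the
// physical partition it belongs to.
type partitionHandle struct {
	partitionID int64
	handle      int64
}

// decodeIndexRow mimics IntHandleCols.BuildHandleFromPartitionedTableIndexRow:
// for a global index the last column carries the partition ID and the handle
// sits just before it; for a local index the handle is simply the last column.
func decodeIndexRow(row []int64, global bool) partitionHandle {
	handleOffset := len(row) - 1
	if global {
		handleOffset-- // skip the trailing pid column
	}
	ph := partitionHandle{handle: row[handleOffset]}
	if global {
		ph.partitionID = row[len(row)-1]
	}
	return ph
}

func main() {
	// global-index row: index column value 7, handle 42, pid 101
	fmt.Println(decodeIndexRow([]int64{7, 42, 101}, true))
	// local-index row: index column value 7, handle 42
	fmt.Println(decodeIndexRow([]int64{7, 42}, false))
}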