executor: support reading global indexes in IndexMergeReader and index join #20350

Closed
36 changes: 21 additions & 15 deletions executor/builder.go
@@ -2816,13 +2816,13 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
return tableReq, tableStreaming, tbl, err
}

func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
func buildIndexReq(b *executorBuilder, schemaLen, handleAndPidLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, err
}
indexReq.OutputOffsets = []uint32{}
for i := 0; i < handleLen; i++ {
for i := 0; i < handleAndPidLen; i++ {
indexReq.OutputOffsets = append(indexReq.OutputOffsets, uint32(schemaLen+i))
}
if len(indexReq.OutputOffsets) == 0 {
@@ -2831,19 +2831,24 @@ func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []planner
return indexReq, indexStreaming, err
}

func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
var handleLen int
if len(v.CommonHandleCols) != 0 {
handleLen = len(v.CommonHandleCols)
func getHandleAndPidLen(commonHandleLen int, global bool) int {
var ret int
if commonHandleLen != 0 {
ret = commonHandleLen
} else {
handleLen = 1
// int handle
ret = 1
}
if is.Index.Global {
// Should output pid col.
handleLen++
if global {
ret++
}
indexReq, indexStreaming, err := buildIndexReq(b, len(is.Index.Columns), handleLen, v.IndexPlans)
return ret
}

func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
handleAndPidLen := getHandleAndPidLen(len(v.CommonHandleCols), is.Index.Global)
indexReq, indexStreaming, err := buildIndexReq(b, len(is.Index.Columns), handleAndPidLen, v.IndexPlans)
if err != nil {
return nil, err
}
@@ -2968,7 +2973,8 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
feedbacks = append(feedbacks, feedback)

if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i])
handleAndPidLen := getHandleAndPidLen(ts.HandleCols.NumCols(), is.Index.Global)
tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), handleAndPidLen, v.PartialPlans[i])
keepOrders = append(keepOrders, is.KeepOrder)
descs = append(descs, is.Desc)
indexes = append(indexes, is.Index)
@@ -3353,7 +3359,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
return nil, err
}
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
if tbInfo.GetPartitionInfo() == nil || e.index.Global || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
@@ -3403,7 +3409,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}

tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
if tbInfo.GetPartitionInfo() == nil || e.index.Global || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
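As an aside for readers of the builder.go hunks above, the following standalone sketch (illustrative only, not part of the PR) mirrors the getHandleAndPidLen/buildIndexReq logic: a global index scan must output one extra trailing column, the partition ID, after the handle columns, which is where handleAndPidLen comes from.

```go
package main

import "fmt"

// handleAndPidLen mirrors getHandleAndPidLen from the diff: the number of
// trailing columns an index scan must output so the reader can rebuild the
// row handle, plus one pid column when the index is a global index.
func handleAndPidLen(commonHandleLen int, global bool) int {
	n := commonHandleLen
	if n == 0 {
		n = 1 // an int handle occupies a single column
	}
	if global {
		n++ // global index rows also carry the partition ID
	}
	return n
}

// outputOffsets mirrors the loop in buildIndexReq: the handle/pid columns sit
// right after the index's own columns in the scan schema.
func outputOffsets(indexColLen, commonHandleLen int, global bool) []uint32 {
	n := handleAndPidLen(commonHandleLen, global)
	offsets := make([]uint32, 0, n)
	for i := 0; i < n; i++ {
		offsets = append(offsets, uint32(indexColLen+i))
	}
	return offsets
}

func main() {
	// Local index with an int handle: output only the handle column.
	fmt.Println(outputOffsets(2, 0, false)) // [2]
	// Global index on the same table: output the handle column and the pid column.
	fmt.Println(outputOffsets(2, 0, true)) // [2 3]
	// Global index with a 2-column common (clustered) handle: 2 handle cols + pid.
	fmt.Println(outputOffsets(1, 2, true)) // [1 2 3]
}
```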
104 changes: 92 additions & 12 deletions executor/index_merge_reader.go
@@ -93,6 +93,10 @@ type IndexMergeReaderExecutor struct {
// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue

hasGlobalIndex bool
// skipGlobalIndex indicates whether global indexes have already been read (they are read when the first partition is processed).
skipGlobalIndex bool

corColInIdxSide bool
partialPlans [][]plannercore.PhysicalPlan
corColInTblSide bool
@@ -102,13 +106,16 @@ type IndexMergeReaderExecutor struct {
colLens [][]int

handleCols plannercore.HandleCols

// handleMaps is used to temporarily store handles read from global indexes.
handleMaps map[int64]*kv.HandleMap
}

// Open implements the Executor Open interface
func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
for i, plan := range e.partialPlans {
_, ok := plan[0].(*plannercore.PhysicalIndexScan)
is, ok := plan[0].(*plannercore.PhysicalIndexScan)
if !ok {
if e.table.Meta().IsCommonHandle {
keyRanges, err := distsql.CommonHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, []int64{getPhysicalTableID(e.table)}, e.ranges[i])
@@ -121,14 +128,27 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error {
}
continue
}
keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
var pid int64
if is.Index.Global {
e.hasGlobalIndex = true
pid = e.table.Meta().ID
} else {
pid = getPhysicalTableID(e.table)
}
keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, pid, e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
if err != nil {
return err
}
e.keyRanges = append(e.keyRanges, keyRange)
}
e.finished = make(chan struct{})
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
if e.hasGlobalIndex && !e.skipGlobalIndex {
e.handleMaps = make(map[int64]*kv.HandleMap)
for _, p := range e.table.Meta().GetPartitionInfo().Definitions {
e.handleMaps[p.ID] = kv.NewHandleMap()
Contributor: Now the maximum partition count is 4096, so there is a potential performance regression here.

Contributor: But if we make the map lazily initialized, there may be a data race.

Contributor (Author): I found that handles are unique across the whole partitioned table. How about using a single global handleMap to store all handles and not clearing it when nextPartition is called? The downside is that the memory cannot be reclaimed immediately after each partition is processed.

Contributor (Author): Or use two handle maps: one storing handles read from the global index, the other storing handles read from the local indexes, so that the local handle map can be freed after each partition is processed. But then every handle must be checked twice.

}
}
return nil
}
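
As a rough, self-contained model of the lifecycle this hunk sets up (simplified stand-in types, not TiDB code; in the real executor the handle maps are filled by the process worker rather than in Open): on the first per-partition open the global index is scanned once and its handles are bucketed by partition ID, then skipGlobalIndex is set so later opens only replay the cached handles for their own partition.

```go
package main

import "fmt"

// Simplified stand-ins for kv.Handle / kv.PartitionHandle from the real code.
type handle int64

type partitionHandle struct {
	pid int64
	h   handle
}

// mergeReader models only the global-index bookkeeping of
// IndexMergeReaderExecutor: it is re-opened once per partition, but the
// global index is scanned only on the first open.
type mergeReader struct {
	hasGlobalIndex  bool
	skipGlobalIndex bool
	// handleMaps caches handles read from global indexes, keyed by partition ID.
	handleMaps map[int64]map[handle]bool
}

// open is called once per partition (nextPartition in the real executor).
func (r *mergeReader) open(partitions []int64, scanGlobal func() []partitionHandle) {
	if r.hasGlobalIndex && !r.skipGlobalIndex {
		r.handleMaps = make(map[int64]map[handle]bool, len(partitions))
		for _, pid := range partitions {
			r.handleMaps[pid] = make(map[handle]bool)
		}
		// First open: read the global index once and bucket handles by partition.
		for _, ph := range scanGlobal() {
			r.handleMaps[ph.pid][ph.h] = true
		}
		r.skipGlobalIndex = true
	}
}

// handlesFor returns the cached global-index handles for one partition.
func (r *mergeReader) handlesFor(pid int64) []handle {
	var hs []handle
	for h := range r.handleMaps[pid] {
		hs = append(hs, h)
	}
	return hs
}

func main() {
	r := &mergeReader{hasGlobalIndex: true}
	scanCalls := 0
	scan := func() []partitionHandle {
		scanCalls++
		return []partitionHandle{{pid: 1, h: 10}, {pid: 2, h: 20}, {pid: 1, h: 11}}
	}
	for _, pid := range []int64{1, 2} {
		r.open([]int64{1, 2}, scan)
		fmt.Println("partition", pid, "cached handles:", len(r.handlesFor(pid)))
	}
	fmt.Println("global index scanned", scanCalls, "time(s)") // 1
}
```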

@@ -142,10 +162,14 @@ func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error {
var err error
var partialWorkerWg sync.WaitGroup
for i := 0; i < len(e.keyRanges); i++ {
partialWorkerWg.Add(1)
if e.indexes[i] != nil {
if e.indexes[i].Global && e.skipGlobalIndex {
continue
}
partialWorkerWg.Add(1)
err = e.startPartialIndexWorker(ctx, exitCh, fetchCh, i, &partialWorkerWg, e.keyRanges[i])
} else {
partialWorkerWg.Add(1)
err = e.startPartialTableWorker(ctx, exitCh, fetchCh, i, &partialWorkerWg)
}
if err != nil {
@@ -160,6 +184,7 @@ func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error {
}
e.startIndexMergeTableScanWorker(ctx, workCh)
e.workerStarted = true
e.skipGlobalIndex = true
return nil
}

@@ -169,7 +194,11 @@ func (e *IndexMergeReaderExecutor) waitPartialWorkersAndCloseFetchChan(partialWo
}

func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Context, workCh chan<- *lookupTableTask, fetch <-chan *lookupTableTask) {
idxMergeProcessWorker := &indexMergeProcessWorker{}
idxMergeProcessWorker := &indexMergeProcessWorker{
handleMaps: e.handleMaps,
physicalTableID: getPhysicalTableID(e.table),
processGlobalIndex: e.hasGlobalIndex && !e.skipGlobalIndex,
}
e.processWokerWg.Add(1)
go func() {
defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End()
@@ -203,7 +232,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
return err
}

result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id)
global := e.indexes[workID].Global
fieldsTypes := e.handleCols.GetFieldsTypesOfPartitionedTableIndex(global)
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, fieldsTypes, e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id)
if err != nil {
return err
}
@@ -214,6 +245,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
batchSize: e.maxChunkSize,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
global: global,
}

if worker.batchSize > worker.maxBatchSize {
@@ -505,6 +537,9 @@ func (e *IndexMergeReaderExecutor) Close() error {
}

type indexMergeProcessWorker struct {
handleMaps map[int64]*kv.HandleMap
physicalTableID int64
processGlobalIndex bool
}

func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan *lookupTableTask,
@@ -514,15 +549,57 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan
close(resultCh)
}()

distinctHandles := kv.NewHandleMap()
var distinctHandles *kv.HandleMap
if w.handleMaps != nil {
distinctHandles = w.handleMaps[w.physicalTableID]
// Process handles read from global indexes.
fhs := make([]kv.Handle, 0, 8)
distinctHandles.Range(func(h kv.Handle, val interface{}) bool {
fhs = append(fhs, h)
return true
})
if len(fhs) != 0 {
task := &lookupTableTask{
handles: fhs,
doneCh: make(chan error, 1),
}
select {
case <-ctx.Done():
return
case <-finished:
return
case workCh <- task:
resultCh <- task
}
}
} else {
distinctHandles = kv.NewHandleMap()
}

for task := range fetchCh {
handles := task.handles
if len(handles) == 0 {
continue
}
fhs := make([]kv.Handle, 0, 8)
for _, h := range handles {
if _, ok := distinctHandles.Get(h); !ok {
fhs = append(fhs, h)
distinctHandles.Set(h, true)
_, ok := handles[0].(kv.PartitionHandle)
if w.processGlobalIndex && ok {
for _, ph := range handles {
h := ph.(kv.PartitionHandle).Handle
pid := ph.(kv.PartitionHandle).PartitionID
if _, ok := w.handleMaps[pid].Get(h); !ok {
if pid == w.physicalTableID {
fhs = append(fhs, h)
}
w.handleMaps[pid].Set(h, true)
}
}
} else {
for _, h := range handles {
if _, ok := distinctHandles.Get(h); !ok {
fhs = append(fhs, h)
distinctHandles.Set(h, true)
}
}
}
if len(fhs) == 0 {
@@ -564,6 +641,7 @@ type partialIndexWorker struct {
batchSize int
maxBatchSize int
maxChunkSize int
global bool
}

func (w *partialIndexWorker) fetchHandles(
@@ -574,7 +652,7 @@
resultCh chan<- *lookupTableTask,
finished <-chan struct{},
handleCols plannercore.HandleCols) (count int64, err error) {
chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize)
chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypesOfPartitionedTableIndex(w.global), w.maxChunkSize)
for {
handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols)
if err != nil {
@@ -615,7 +693,9 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
return handles, retChk, nil
}
for i := 0; i < chk.NumRows(); i++ {
handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i))
var handle kv.Handle
var err error
handle, err = handleCols.BuildHandleFromPartitionedTableIndexRow(chk.GetRow(i), w.global)
if err != nil {
return nil, nil, err
}
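To make the new fetchLoop branch above easier to follow, here is a self-contained sketch (simplified stand-in types rather than the real kv.Handle/kv.PartitionHandle; names are illustrative) of the dedup rule applied to handles coming from a global index: a handle is forwarded only when it belongs to the partition currently being scanned, but it is recorded in its own partition's map either way, so later partitions can reuse it without rescanning the global index.

```go
package main

import "fmt"

// Stand-ins for kv.Handle and kv.PartitionHandle.
type handle int64

type partitionHandle struct {
	pid int64
	h   handle
}

// dedupGlobalHandles models the PartitionHandle branch of fetchLoop: it keeps
// only the handles that belong to currentPID, while remembering every handle
// in the per-partition maps so duplicates (and other partitions' handles) are
// not processed again later.
func dedupGlobalHandles(batch []partitionHandle, currentPID int64,
	handleMaps map[int64]map[handle]bool) []handle {
	var out []handle
	for _, ph := range batch {
		seen := handleMaps[ph.pid]
		if seen[ph.h] {
			continue // already recorded by an earlier batch
		}
		seen[ph.h] = true
		if ph.pid == currentPID {
			out = append(out, ph.h)
		}
	}
	return out
}

func main() {
	handleMaps := map[int64]map[handle]bool{
		1: {}, 2: {},
	}
	batch := []partitionHandle{{1, 10}, {2, 20}, {1, 10}, {1, 11}}
	fmt.Println(dedupGlobalHandles(batch, 1, handleMaps)) // [10 11]
	fmt.Println(handleMaps[2])                            // map[20:true], cached for partition 2
}
```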
68 changes: 68 additions & 0 deletions executor/partition_table_test.go
@@ -182,3 +182,71 @@ partition p2 values less than (10))`)
tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)")
tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9"))
}

func (s *globalIndexSuite) TestIndexMergeReaderWithGlobalIndex(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1, t2")
tk.MustExec(`create table t1 (id int primary key, a int, b int, c int, d int)
partition by range (id) (
partition p0 values less than (3),
partition p1 values less than (6),
partition p2 values less than (10)
)`)
// Global index on a
tk.MustExec("create unique index t1a on t1(a)")
// Local index on b
tk.MustExec("create index t1b on t1(b)")
tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)")
tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ * from t1 where id < 2 or a > 4 order by id").Check(testkit.Rows("1 1 1 1 1",
"5 5 5 5 5"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ a from t1 where id < 2 or a > 4 order by a").Check(testkit.Rows("1",
"5"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, t1a) */ sum(a) from t1 where id < 2 or a > 4").Check(testkit.Rows("6"))
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ * from t1 where a < 2 or b > 4 order by a").Check(testkit.Rows("1 1 1 1 1",
"5 5 5 5 5"))
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ a from t1 where a < 2 or b > 4 order by a").Check(testkit.Rows("1",
"5"))
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(a) from t1 where a < 2 or b > 4").Check(testkit.Rows("6"))

tk.MustExec("drop table if exists t1, t2")
tk.MustExec(`create table t1 (id int primary key, a int, b int, c int, d int)
partition by range (id) (
partition p0 values less than (4),
partition p1 values less than (7),
partition p2 values less than (10)
)`)
// Global index on a
tk.MustExec("create unique index t1a on t1(a)")
// Local index on b
tk.MustExec("create index t1b on t1(b)")
tk.MustExec(`create table t2 (id int primary key, a int)
partition by range (id) (
partition p0 values less than (4),
partition p1 values less than (7),
partition p2 values less than (10)
)`)
tk.MustExec("create unique index t2a on t2(a)")
tk.MustExec("insert into t1 values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5)")
tk.MustExec("insert into t2 values(1,1),(5,5)")
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 4").Check(testkit.Rows("6"))
tk.MustQuery("select /*+ use_index_merge(t1, t1a, t1b) */ sum(t1.a) from t1 join t2 on t1.id = t2.id where t1.a < 2 or t1.b > 5").Check(testkit.Rows("1"))
}

func (s *globalIndexSuite) TestPartitionIndexJoinWithGlobalIndex(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists p, t")
tk.MustExec(`create table p (id int, c int) partition by range (c) (
partition p0 values less than (4),
partition p1 values less than (7),
partition p2 values less than (10))`)
tk.MustExec(`create unique index i_id on p(id)`)
tk.MustExec(`create unique index i_c on p(c)`)
tk.MustExec("create table t (id int)")
tk.MustExec("insert into p values (3,3), (4,4), (6,6), (9,9)")
tk.MustExec("insert into t values (4), (9)")

// Build indexLookUp in index join
tk.MustQuery("select /*+ INL_JOIN(p) */ * from p, t where p.id = t.id").Sort().Check(testkit.Rows("4 4 4", "9 9 9"))
// Build index reader in index join
tk.MustQuery("select /*+ INL_JOIN(p) */ p.id from p, t where p.id = t.id").Check(testkit.Rows("4", "9"))
}