From 44e6f61a86d8fc567b31c2ac0a665ee3b56a74c7 Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Tue, 15 Sep 2020 10:31:43 +0800 Subject: [PATCH 1/6] support runtime prune for index join inner table --- executor/builder.go | 75 ++++++++++++++++++++++++++++++++--- executor/index_lookup_join.go | 3 +- table/tables/partition.go | 37 +++++++++++++++-- 3 files changed, 104 insertions(+), 11 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 5f6154c1ee6d3..d0f28e1d26740 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2590,6 +2590,71 @@ func buildPartitionTable(b *executorBuilder, tblInfo *model.TableInfo, partition }, nil } +func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.TableInfo, partitionInfo *plannercore.PartitionInfo, + lookUpContent []*indexJoinLookUpContent, e Executor, n nextPartition) (Executor, error) { + tbl, _ := b.is.TableByID(tblInfo.ID) + partitionTbl := tbl.(table.PartitionedTable) + locateKey := make([]types.Datum, e.Schema().Len()) + // TODO: condition based pruning can be do in advance. + condPruneResult, err := partitionPruning(b.ctx, partitionTbl, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames) + if err != nil { + return nil, err + } + + // check whether can runtime prune. + type partitionExpr interface { + PartitionExpr() (*tables.PartitionExpr, error) + } + pe, err := tbl.(partitionExpr).PartitionExpr() + if err != nil { + return nil, err + } + offsetMap := make(map[int]bool) + for _, offset := range lookUpContent[0].keyCols { + offsetMap[offset] = true + } + for _, offset := range pe.ColumnOffset { + if _, ok := offsetMap[offset]; !ok { + logutil.BgLogger().Warn("can not runtime prune in index join") + return &PartitionTableExecutor{ + baseExecutor: *e.base(), + partitions: condPruneResult, + nextPartition: n, + }, nil + } + } + + partitions := make(map[int64]table.PhysicalTable) + for _, content := range lookUpContent { + for i, date := range content.keys { + locateKey[content.keyCols[i]] = date + } + p, err := partitionTbl.GetPartitionByRow(b.ctx, locateKey) + if err != nil { + return nil, err + } + if _, ok := partitions[p.GetPhysicalID()]; !ok { + partitions[p.GetPhysicalID()] = p + } + } + + usedPartition := make([]table.PhysicalTable, 0, len(partitions)) + for _, p := range condPruneResult { + if _, ok := partitions[p.GetPhysicalID()]; ok { + usedPartition = append(usedPartition, p) + } + } + + if len(usedPartition) == 0 { + return &TableDualExec{baseExecutor: *e.base()}, nil + } + return &PartitionTableExecutor{ + baseExecutor: *e.base(), + partitions: usedPartition, + nextPartition: n, + }, nil +} + func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) { dagReq, streaming, err := b.constructDAGReq(v.IndexPlans, kv.TiKV) if err != nil { @@ -3039,7 +3104,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) }) nextPartition := nextPartitionForTableReader{e} - return buildPartitionTable(builder.executorBuilder, tbInfo, &v.PartitionInfo, e, nextPartition) + return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) } handles := make([]kv.Handle, 0, len(lookUpContents)) for _, content := range lookUpContents { @@ -3062,10 +3127,9 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte if !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles) } - e.kvRangeBuilder = kvRangeBuilderFromHandles(handles) nextPartition := nextPartitionForTableReader{e} - return buildPartitionTable(builder.executorBuilder, tbInfo, &v.PartitionInfo, e, nextPartition) + return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) } type kvRangeBuilderFromFunc func(pid int64) ([]kv.KeyRange, error) @@ -3153,7 +3217,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte return nil, err } nextPartition := nextPartitionForIndexReader{exec: e} - ret, err := buildPartitionTable(builder.executorBuilder, tbInfo, &v.PartitionInfo, e, nextPartition) + ret, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) if err != nil { return nil, err } @@ -3177,13 +3241,12 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context err = e.open(ctx) return e, err } - e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } nextPartition := nextPartitionForIndexLookUp{exec: e} - ret, err := buildPartitionTable(builder.executorBuilder, tbInfo, &v.PartitionInfo, e, nextPartition) + ret, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) if err != nil { return nil, err } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 8dde95a671c5b..5004730badaec 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -494,6 +494,7 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { type indexJoinLookUpContent struct { keys []types.Datum row chunk.Row + keyCols []int } func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error { @@ -558,7 +559,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi // dLookUpKey is sorted and deduplicated at sortAndDedupLookUpContents. // So we don't need to do it here. } - lookUpContents = append(lookUpContents, &indexJoinLookUpContent{keys: dLookUpKey, row: chk.GetRow(rowIdx)}) + lookUpContents = append(lookUpContents, &indexJoinLookUpContent{keys: dLookUpKey, row: chk.GetRow(rowIdx), keyCols:iw.keyCols}) } } diff --git a/table/tables/partition.go b/table/tables/partition.go index 4cfe8092d1315..dd05a4b275641 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -132,6 +132,8 @@ type PartitionExpr struct { *ForRangePruning // Used in the range column pruning process. *ForRangeColumnsPruning + // ColOffset is the offsets of partition columns. + ColumnOffset []int } func initEvalBufferType(t *partitionedTable) { @@ -292,12 +294,28 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, UpperBounds: locateExprs, } + // build column offset. + partExp := pi.Expr + if len(pi.Columns) == 1 { + partExp = pi.Columns[0].L + } + exprs, err := parseSimpleExprWithNames(p, ctx, partExp, schema, names) + if err != nil { + return nil, err + } + partitionCols := expression.ExtractColumns(exprs) + offset := make([]int, len(partitionCols)) + for i, col := range columns { + for j, partitionCol := range partitionCols { + if partitionCol.UniqueID == col.UniqueID { + offset[j] = i + } + } + } + ret.ColumnOffset = offset + switch len(pi.Columns) { case 0: - exprs, err := parseSimpleExprWithNames(p, ctx, pi.Expr, schema, names) - if err != nil { - return nil, err - } tmp, err := dataForRangePruning(ctx, pi) if err != nil { return nil, errors.Trace(err) @@ -330,10 +348,21 @@ func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, logutil.BgLogger().Error("wrong table partition expression", zap.String("expression", pi.Expr), zap.Error(err)) return nil, errors.Trace(err) } + // build column offset. + partitionCols := expression.ExtractColumns(exprs) + offset := make([]int, len(partitionCols)) + for i, col := range columns { + for j, partitionCol := range partitionCols { + if partitionCol.UniqueID == col.UniqueID { + offset[j] = i + } + } + } exprs.HashCode(ctx.GetSessionVars().StmtCtx) return &PartitionExpr{ Expr: exprs, OrigExpr: origExpr, + ColumnOffset: offset, }, nil } From c20e2db9d02af1016c9b97cf1868b3136687be95 Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Tue, 15 Sep 2020 10:43:39 +0800 Subject: [PATCH 2/6] fix make dev --- executor/index_lookup_join.go | 6 +++--- table/tables/partition.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 5004730badaec..be972aade587c 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -492,8 +492,8 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { } type indexJoinLookUpContent struct { - keys []types.Datum - row chunk.Row + keys []types.Datum + row chunk.Row keyCols []int } @@ -559,7 +559,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi // dLookUpKey is sorted and deduplicated at sortAndDedupLookUpContents. // So we don't need to do it here. } - lookUpContents = append(lookUpContents, &indexJoinLookUpContent{keys: dLookUpKey, row: chk.GetRow(rowIdx), keyCols:iw.keyCols}) + lookUpContents = append(lookUpContents, &indexJoinLookUpContent{keys: dLookUpKey, row: chk.GetRow(rowIdx), keyCols: iw.keyCols}) } } diff --git a/table/tables/partition.go b/table/tables/partition.go index dd05a4b275641..bcb5e6a2b28c2 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -360,8 +360,8 @@ func generateHashPartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, } exprs.HashCode(ctx.GetSessionVars().StmtCtx) return &PartitionExpr{ - Expr: exprs, - OrigExpr: origExpr, + Expr: exprs, + OrigExpr: origExpr, ColumnOffset: offset, }, nil } From 01c7d8e81bea7f125e918d69793fde8fdfde51a1 Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Fri, 18 Sep 2020 15:40:16 +0800 Subject: [PATCH 3/6] divide range into each partition according to datum --- executor/builder.go | 162 ++++++++++++++++++++++++++++++------ executor/partition_table.go | 38 ++++++++- 2 files changed, 172 insertions(+), 28 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index d0f28e1d26740..a5901196af753 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2563,7 +2563,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } } - nextPartition := nextPartitionForTableReader{ret} + nextPartition := nextPartitionForTableReader{exec: ret} exec, err := buildPartitionTable(b, ts.Table, &v.PartitionInfo, ret, nextPartition) if err != nil { b.err = err @@ -2591,23 +2591,22 @@ func buildPartitionTable(b *executorBuilder, tblInfo *model.TableInfo, partition } func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.TableInfo, partitionInfo *plannercore.PartitionInfo, - lookUpContent []*indexJoinLookUpContent, e Executor, n nextPartition) (Executor, error) { + lookUpContent []*indexJoinLookUpContent, e Executor, n innerNextPartition) (*PartitionTableExecutor, []int64, error) { tbl, _ := b.is.TableByID(tblInfo.ID) partitionTbl := tbl.(table.PartitionedTable) locateKey := make([]types.Datum, e.Schema().Len()) // TODO: condition based pruning can be do in advance. condPruneResult, err := partitionPruning(b.ctx, partitionTbl, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames) if err != nil { - return nil, err + return nil, nil, err } - // check whether can runtime prune. type partitionExpr interface { PartitionExpr() (*tables.PartitionExpr, error) } pe, err := tbl.(partitionExpr).PartitionExpr() if err != nil { - return nil, err + return nil, nil, err } offsetMap := make(map[int]bool) for _, offset := range lookUpContent[0].keyCols { @@ -2615,27 +2614,30 @@ func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.Tabl } for _, offset := range pe.ColumnOffset { if _, ok := offsetMap[offset]; !ok { + n.GetInnerPartitionInfo().isFullPartition = true logutil.BgLogger().Warn("can not runtime prune in index join") return &PartitionTableExecutor{ baseExecutor: *e.base(), partitions: condPruneResult, nextPartition: n, - }, nil + }, nil, nil } } partitions := make(map[int64]table.PhysicalTable) - for _, content := range lookUpContent { + contentPos := make([]int64, len(lookUpContent)) + for idx, content := range lookUpContent { for i, date := range content.keys { locateKey[content.keyCols[i]] = date } p, err := partitionTbl.GetPartitionByRow(b.ctx, locateKey) if err != nil { - return nil, err + return nil, nil, err } if _, ok := partitions[p.GetPhysicalID()]; !ok { partitions[p.GetPhysicalID()] = p } + contentPos[idx] = p.GetPhysicalID() } usedPartition := make([]table.PhysicalTable, 0, len(partitions)) @@ -2644,15 +2646,11 @@ func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.Tabl usedPartition = append(usedPartition, p) } } - - if len(usedPartition) == 0 { - return &TableDualExec{baseExecutor: *e.base()}, nil - } return &PartitionTableExecutor{ baseExecutor: *e.base(), partitions: usedPartition, nextPartition: n, - }, nil + }, contentPos, nil } func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) { @@ -3103,10 +3101,41 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte e.kvRangeBuilder = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) { return buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) }) - nextPartition := nextPartitionForTableReader{e} - return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + + nextPartition := nextPartitionForTableReader{exec: e, innerPartitionInfo: &innerPartitionInfo{}} + // return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + if err != nil { + return nil, err + } + if len(partitionExec.partitions) != 0 { + if !nextPartition.isFullPartition { + nextPartition.rangeBuilders = make(map[int64]kvRangeBuilder) + contentBucket := make(map[int64][]*indexJoinLookUpContent) + for _, p := range partitionExec.partitions { + contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) + } + for i, pos := range lookUpContentMap { + if _, ok := contentBucket[pos]; ok { + contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) + } + } + for _, p := range partitionExec.partitions { + //rangeBuilder := kvRangeBuilderFromHandles(contentBucket[p.GetPhysicalID()]) + //nextPartition.rangeBuilders = append(nextPartition.rangeBuilders, rangeBuilder) + nextPartition.rangeBuilders[p.GetPhysicalID()] = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) { + return buildKvRangesForIndexJoin(e.ctx, pid, -1, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + }) + } + partitionExec.nextPartition = nextPartition + } + return partitionExec, nil + } + ret := &TableDualExec{baseExecutor: *e.base()} + return ret, nil } handles := make([]kv.Handle, 0, len(lookUpContents)) + validLookUpContents := make([]*indexJoinLookUpContent, 0, len(lookUpContents)) for _, content := range lookUpContents { isValidHandle := true handle := kv.IntHandle(content.keys[0].GetInt64()) @@ -3118,6 +3147,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte } if isValidHandle { handles = append(handles, handle) + validLookUpContents = append(validLookUpContents, content) } } @@ -3128,8 +3158,34 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles) } e.kvRangeBuilder = kvRangeBuilderFromHandles(handles) - nextPartition := nextPartitionForTableReader{e} - return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + nextPartition := nextPartitionForTableReader{exec: e, innerPartitionInfo: &innerPartitionInfo{}} + // return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, validLookUpContents, e, nextPartition) + if err != nil { + return nil, err + } + if len(partitionExec.partitions) != 0 { + if !nextPartition.isFullPartition { + nextPartition.rangeBuilders = make(map[int64]kvRangeBuilder) + contentBucket := make(map[int64][]kv.Handle) + for _, p := range partitionExec.partitions { + contentBucket[p.GetPhysicalID()] = make([]kv.Handle, 0, 8) + } + for i, pos := range lookUpContentMap { + if _, ok := contentBucket[pos]; ok { + contentBucket[pos] = append(contentBucket[pos], handles[i]) + } + } + for _, p := range partitionExec.partitions { + rangeBuilder := kvRangeBuilderFromHandles(contentBucket[p.GetPhysicalID()]) + nextPartition.rangeBuilders[p.GetPhysicalID()] = rangeBuilder + } + partitionExec.nextPartition = nextPartition + } + return partitionExec, nil + } + ret := &TableDualExec{baseExecutor: *e.base()} + return ret, nil } type kvRangeBuilderFromFunc func(pid int64) ([]kv.KeyRange, error) @@ -3212,15 +3268,41 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte return e, err } - e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + nextPartition := nextPartitionForIndexReader{exec: e, innerPartitionInfo: &innerPartitionInfo{}} + partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) if err != nil { return nil, err } - nextPartition := nextPartitionForIndexReader{exec: e} - ret, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) - if err != nil { - return nil, err + if len(partitionExec.partitions) != 0 { + if !nextPartition.isFullPartition { + contentBucket := make(map[int64][]*indexJoinLookUpContent) + for _, p := range partitionExec.partitions { + contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) + } + for i, pos := range lookUpContentMap { + if _, ok := contentBucket[pos]; ok { + contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) + } + } + nextRange := make(map[int64][]*ranger.Range) + for _, p := range partitionExec.partitions { + ranges, err := buildRangesForIndexJoin(e.ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err + } + nextRange[p.GetPhysicalID()] = ranges + } + nextPartition.nextRange = nextRange + } else { + e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err + } + } + err = partitionExec.Open(ctx) + return partitionExec, err } + ret := &TableDualExec{baseExecutor: *e.base()} err = ret.Open(ctx) return ret, err } @@ -3241,15 +3323,41 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context err = e.open(ctx) return e, err } - e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + nextPartition := nextPartitionForIndexLookUp{exec: e, innerPartitionInfo: &innerPartitionInfo{}} + partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) if err != nil { return nil, err } - nextPartition := nextPartitionForIndexLookUp{exec: e} - ret, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) - if err != nil { - return nil, err + if len(partitionExec.partitions) != 0 { + if !nextPartition.isFullPartition { + contentBucket := make(map[int64][]*indexJoinLookUpContent) + for _, p := range partitionExec.partitions { + contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) + } + for i, pos := range lookUpContentMap { + if _, ok := contentBucket[pos]; ok { + contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) + } + } + nextRange := make(map[int64][]*ranger.Range) + for _, p := range partitionExec.partitions { + ranges, err := buildRangesForIndexJoin(e.ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err + } + nextRange[p.GetPhysicalID()] = ranges + } + nextPartition.nextRange = nextRange + } else { + e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err + } + } + err = partitionExec.Open(ctx) + return partitionExec, err } + ret := &TableDualExec{baseExecutor: *e.base()} err = ret.Open(ctx) return ret, err } diff --git a/executor/partition_table.go b/executor/partition_table.go index 6989c0eef7a7f..4840aa10abc59 100644 --- a/executor/partition_table.go +++ b/executor/partition_table.go @@ -22,6 +22,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tipb/go-tipb" ) @@ -40,13 +41,32 @@ type nextPartition interface { nextPartition(context.Context, table.PhysicalTable) (Executor, error) } +type innerPartitionInfo struct { + isFullPartition bool + nextRange map[int64][]*ranger.Range +} + +type innerNextPartition interface { + nextPartition + GetInnerPartitionInfo() *innerPartitionInfo +} + type nextPartitionForTableReader struct { - exec *TableReaderExecutor + *innerPartitionInfo + rangeBuilders map[int64]kvRangeBuilder + exec *TableReaderExecutor +} + +func (n nextPartitionForTableReader) GetInnerPartitionInfo() *innerPartitionInfo { + return n.innerPartitionInfo } func (n nextPartitionForTableReader) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) { n.exec.table = tbl n.exec.kvRanges = n.exec.kvRanges[:0] + if n.innerPartitionInfo != nil && !n.isFullPartition { + n.exec.kvRangeBuilder = n.rangeBuilders[tbl.GetPhysicalID()] + } if err := updateDAGRequestTableID(ctx, n.exec.dagPB, tbl.Meta().ID, tbl.GetPhysicalID()); err != nil { return nil, err } @@ -54,22 +74,38 @@ func (n nextPartitionForTableReader) nextPartition(ctx context.Context, tbl tabl } type nextPartitionForIndexLookUp struct { + *innerPartitionInfo exec *IndexLookUpExecutor } +func (n nextPartitionForIndexLookUp) GetInnerPartitionInfo() *innerPartitionInfo { + return n.innerPartitionInfo +} + func (n nextPartitionForIndexLookUp) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) { n.exec.table = tbl + if n.innerPartitionInfo != nil && !n.isFullPartition { + n.exec.ranges = n.nextRange[tbl.GetPhysicalID()] + } return n.exec, nil } type nextPartitionForIndexReader struct { + *innerPartitionInfo exec *IndexReaderExecutor } +func (n nextPartitionForIndexReader) GetInnerPartitionInfo() *innerPartitionInfo { + return n.innerPartitionInfo +} + func (n nextPartitionForIndexReader) nextPartition(ctx context.Context, tbl table.PhysicalTable) (Executor, error) { exec := n.exec exec.table = tbl exec.physicalTableID = tbl.GetPhysicalID() + if n.innerPartitionInfo != nil && !n.isFullPartition { + exec.ranges = n.nextRange[tbl.GetPhysicalID()] + } return exec, nil } From 10ee034f4fc29b36dddfb33cd8c8d6a92d2da696 Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Fri, 18 Sep 2020 16:27:14 +0800 Subject: [PATCH 4/6] fix ci --- executor/partition_table.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/partition_table.go b/executor/partition_table.go index 4840aa10abc59..88ac2ad769b4c 100644 --- a/executor/partition_table.go +++ b/executor/partition_table.go @@ -42,8 +42,8 @@ type nextPartition interface { } type innerPartitionInfo struct { - isFullPartition bool - nextRange map[int64][]*ranger.Range + isFullPartition bool + nextRange map[int64][]*ranger.Range } type innerNextPartition interface { From 8723aba26cbf6cce2a055b7e957c820d28c04e7e Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Thu, 24 Sep 2020 17:31:15 +0800 Subject: [PATCH 5/6] clean code --- executor/builder.go | 234 ++++++++++++++++++++++++-------------------- 1 file changed, 126 insertions(+), 108 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index a5901196af753..e20462d7de9ab 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2590,23 +2590,74 @@ func buildPartitionTable(b *executorBuilder, tblInfo *model.TableInfo, partition }, nil } -func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.TableInfo, partitionInfo *plannercore.PartitionInfo, - lookUpContent []*indexJoinLookUpContent, e Executor, n innerNextPartition) (*PartitionTableExecutor, []int64, error) { - tbl, _ := b.is.TableByID(tblInfo.ID) +func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64, + lookUpContent []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64][]*ranger.Range, error) { + contentBucket := make(map[int64][]*indexJoinLookUpContent) + for _, p := range usedPartitions { + contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) + } + for i, pos := range contentPos { + if _, ok := contentBucket[pos]; ok { + contentBucket[pos] = append(contentBucket[pos], lookUpContent[i]) + } + } + nextRange := make(map[int64][]*ranger.Range) + for _, p := range usedPartitions { + ranges, err := buildRangesForIndexJoin(ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err + } + nextRange[p.GetPhysicalID()] = ranges + } + return nextRange, nil +} + +func buildKVRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table.PhysicalTable, contentPos []int64, isCommonHandle bool, + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (map[int64]kvRangeBuilder, error) { + rangeBuilders := make(map[int64]kvRangeBuilder) + contentBucket := make(map[int64][]*indexJoinLookUpContent) + for _, p := range usedPartitions { + contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) + } + for i, pos := range contentPos { + if _, ok := contentBucket[pos]; ok { + contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) + } + } + for _, p := range usedPartitions { + if isCommonHandle { + rangeBuilders[p.GetPhysicalID()] = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) { + return buildKvRangesForIndexJoin(ctx, pid, -1, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + }) + } else { + handles := make([]kv.Handle, 0, len(contentBucket[p.GetPhysicalID()])) + for _, content := range contentBucket[p.GetPhysicalID()] { + handle := kv.IntHandle(content.keys[0].GetInt64()) + handles = append(handles, handle) + } + rangeBuilders[p.GetPhysicalID()] = kvRangeBuilderFromHandles(handles) + } + } + return rangeBuilders, nil +} + +func prunePartitionForInnerExecutor(ctx sessionctx.Context, tbl table.Table, schema *expression.Schema, partitionInfo *plannercore.PartitionInfo, + lookUpContent []*indexJoinLookUpContent) (usedPartition []table.PhysicalTable, canPrune bool, contentPos []int64, err error) { partitionTbl := tbl.(table.PartitionedTable) - locateKey := make([]types.Datum, e.Schema().Len()) + locateKey := make([]types.Datum, schema.Len()) // TODO: condition based pruning can be do in advance. - condPruneResult, err := partitionPruning(b.ctx, partitionTbl, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames) + condPruneResult, err := partitionPruning(ctx, partitionTbl, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames) if err != nil { - return nil, nil, err + return nil, false, nil, err } + // check whether can runtime prune. type partitionExpr interface { PartitionExpr() (*tables.PartitionExpr, error) } pe, err := tbl.(partitionExpr).PartitionExpr() if err != nil { - return nil, nil, err + return nil, false, nil, err } offsetMap := make(map[int]bool) for _, offset := range lookUpContent[0].keyCols { @@ -2614,25 +2665,20 @@ func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.Tabl } for _, offset := range pe.ColumnOffset { if _, ok := offsetMap[offset]; !ok { - n.GetInnerPartitionInfo().isFullPartition = true logutil.BgLogger().Warn("can not runtime prune in index join") - return &PartitionTableExecutor{ - baseExecutor: *e.base(), - partitions: condPruneResult, - nextPartition: n, - }, nil, nil + return condPruneResult, false, nil, nil } } partitions := make(map[int64]table.PhysicalTable) - contentPos := make([]int64, len(lookUpContent)) + contentPos = make([]int64, len(lookUpContent)) for idx, content := range lookUpContent { for i, date := range content.keys { locateKey[content.keyCols[i]] = date } - p, err := partitionTbl.GetPartitionByRow(b.ctx, locateKey) + p, err := partitionTbl.GetPartitionByRow(ctx, locateKey) if err != nil { - return nil, nil, err + return nil, false, nil, err } if _, ok := partitions[p.GetPhysicalID()]; !ok { partitions[p.GetPhysicalID()] = p @@ -2640,17 +2686,13 @@ func buildPartitionTableForInnerExecutor(b *executorBuilder, tblInfo *model.Tabl contentPos[idx] = p.GetPhysicalID() } - usedPartition := make([]table.PhysicalTable, 0, len(partitions)) + usedPartition = make([]table.PhysicalTable, 0, len(partitions)) for _, p := range condPruneResult { if _, ok := partitions[p.GetPhysicalID()]; ok { usedPartition = append(usedPartition, p) } } - return &PartitionTableExecutor{ - baseExecutor: *e.base(), - partitions: usedPartition, - nextPartition: n, - }, contentPos, nil + return usedPartition, true, contentPos, nil } func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) { @@ -3101,38 +3143,30 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte e.kvRangeBuilder = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) { return buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) }) - - nextPartition := nextPartitionForTableReader{exec: e, innerPartitionInfo: &innerPartitionInfo{}} - // return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) - partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + nextPartition := nextPartitionForTableReader{exec: e, innerPartitionInfo: &innerPartitionInfo{isFullPartition: true}} + tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID) + usedPartition, canPrune, contentPos, err := prunePartitionForInnerExecutor(builder.executorBuilder.ctx, tbl, e.Schema(), &v.PartitionInfo, lookUpContents) if err != nil { return nil, err } - if len(partitionExec.partitions) != 0 { - if !nextPartition.isFullPartition { - nextPartition.rangeBuilders = make(map[int64]kvRangeBuilder) - contentBucket := make(map[int64][]*indexJoinLookUpContent) - for _, p := range partitionExec.partitions { - contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) - } - for i, pos := range lookUpContentMap { - if _, ok := contentBucket[pos]; ok { - contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) - } - } - for _, p := range partitionExec.partitions { - //rangeBuilder := kvRangeBuilderFromHandles(contentBucket[p.GetPhysicalID()]) - //nextPartition.rangeBuilders = append(nextPartition.rangeBuilders, rangeBuilder) - nextPartition.rangeBuilders[p.GetPhysicalID()] = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) { - return buildKvRangesForIndexJoin(e.ctx, pid, -1, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) - }) + if len(usedPartition) != 0 { + if canPrune { + rangeBuilders, err := buildKVRangeForEachPartition(e.ctx, usedPartition, contentPos, v.IsCommonHandle, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err } - partitionExec.nextPartition = nextPartition + nextPartition.rangeBuilders = rangeBuilders + nextPartition.isFullPartition = false + } + partitionExec := &PartitionTableExecutor{ + baseExecutor: *e.base(), + partitions: usedPartition, + nextPartition: nextPartition, } return partitionExec, nil } ret := &TableDualExec{baseExecutor: *e.base()} - return ret, nil + return ret, err } handles := make([]kv.Handle, 0, len(lookUpContents)) validLookUpContents := make([]*indexJoinLookUpContent, 0, len(lookUpContents)) @@ -3158,34 +3192,30 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles) } e.kvRangeBuilder = kvRangeBuilderFromHandles(handles) - nextPartition := nextPartitionForTableReader{exec: e, innerPartitionInfo: &innerPartitionInfo{}} - // return buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) - partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, validLookUpContents, e, nextPartition) + nextPartition := nextPartitionForTableReader{exec: e, innerPartitionInfo: &innerPartitionInfo{isFullPartition: true}} + tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID) + usedPartition, canPrune, contentPos, err := prunePartitionForInnerExecutor(builder.executorBuilder.ctx, tbl, e.Schema(), &v.PartitionInfo, validLookUpContents) if err != nil { return nil, err } - if len(partitionExec.partitions) != 0 { - if !nextPartition.isFullPartition { - nextPartition.rangeBuilders = make(map[int64]kvRangeBuilder) - contentBucket := make(map[int64][]kv.Handle) - for _, p := range partitionExec.partitions { - contentBucket[p.GetPhysicalID()] = make([]kv.Handle, 0, 8) - } - for i, pos := range lookUpContentMap { - if _, ok := contentBucket[pos]; ok { - contentBucket[pos] = append(contentBucket[pos], handles[i]) - } - } - for _, p := range partitionExec.partitions { - rangeBuilder := kvRangeBuilderFromHandles(contentBucket[p.GetPhysicalID()]) - nextPartition.rangeBuilders[p.GetPhysicalID()] = rangeBuilder + if len(usedPartition) != 0 { + if canPrune { + rangeBuilders, err := buildKVRangeForEachPartition(e.ctx, usedPartition, contentPos, v.IsCommonHandle, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err } - partitionExec.nextPartition = nextPartition + nextPartition.rangeBuilders = rangeBuilders + nextPartition.isFullPartition = false + } + partitionExec := &PartitionTableExecutor{ + baseExecutor: *e.base(), + partitions: usedPartition, + nextPartition: nextPartition, } return partitionExec, nil } ret := &TableDualExec{baseExecutor: *e.base()} - return ret, nil + return ret, err } type kvRangeBuilderFromFunc func(pid int64) ([]kv.KeyRange, error) @@ -3268,37 +3298,31 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte return e, err } - nextPartition := nextPartitionForIndexReader{exec: e, innerPartitionInfo: &innerPartitionInfo{}} - partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + nextPartition := nextPartitionForIndexReader{exec: e, innerPartitionInfo: &innerPartitionInfo{isFullPartition: true}} + tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID) + usedPartition, canPrune, contentPos, err := prunePartitionForInnerExecutor(builder.executorBuilder.ctx, tbl, e.Schema(), &v.PartitionInfo, lookUpContents) if err != nil { return nil, err } - if len(partitionExec.partitions) != 0 { - if !nextPartition.isFullPartition { - contentBucket := make(map[int64][]*indexJoinLookUpContent) - for _, p := range partitionExec.partitions { - contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) - } - for i, pos := range lookUpContentMap { - if _, ok := contentBucket[pos]; ok { - contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) - } - } - nextRange := make(map[int64][]*ranger.Range) - for _, p := range partitionExec.partitions { - ranges, err := buildRangesForIndexJoin(e.ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) - if err != nil { - return nil, err - } - nextRange[p.GetPhysicalID()] = ranges + if len(usedPartition) != 0 { + if canPrune { + rangeMap, err := buildIndexRangeForEachPartition(e.ctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err } - nextPartition.nextRange = nextRange + nextPartition.isFullPartition = false + nextPartition.nextRange = rangeMap } else { e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } } + partitionExec := &PartitionTableExecutor{ + baseExecutor: *e.base(), + partitions: usedPartition, + nextPartition: nextPartition, + } err = partitionExec.Open(ctx) return partitionExec, err } @@ -3323,37 +3347,31 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context err = e.open(ctx) return e, err } - nextPartition := nextPartitionForIndexLookUp{exec: e, innerPartitionInfo: &innerPartitionInfo{}} - partitionExec, lookUpContentMap, err := buildPartitionTableForInnerExecutor(builder.executorBuilder, tbInfo, &v.PartitionInfo, lookUpContents, e, nextPartition) + nextPartition := nextPartitionForIndexLookUp{exec: e, innerPartitionInfo: &innerPartitionInfo{isFullPartition: true}} + tbl, _ := builder.executorBuilder.is.TableByID(tbInfo.ID) + usedPartition, canPrune, contentPos, err := prunePartitionForInnerExecutor(builder.executorBuilder.ctx, tbl, e.Schema(), &v.PartitionInfo, lookUpContents) if err != nil { return nil, err } - if len(partitionExec.partitions) != 0 { - if !nextPartition.isFullPartition { - contentBucket := make(map[int64][]*indexJoinLookUpContent) - for _, p := range partitionExec.partitions { - contentBucket[p.GetPhysicalID()] = make([]*indexJoinLookUpContent, 0, 8) - } - for i, pos := range lookUpContentMap { - if _, ok := contentBucket[pos]; ok { - contentBucket[pos] = append(contentBucket[pos], lookUpContents[i]) - } - } - nextRange := make(map[int64][]*ranger.Range) - for _, p := range partitionExec.partitions { - ranges, err := buildRangesForIndexJoin(e.ctx, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) - if err != nil { - return nil, err - } - nextRange[p.GetPhysicalID()] = ranges + if len(usedPartition) != 0 { + if canPrune { + rangeMap, err := buildIndexRangeForEachPartition(e.ctx, usedPartition, contentPos, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + if err != nil { + return nil, err } - nextPartition.nextRange = nextRange + nextPartition.isFullPartition = false + nextPartition.nextRange = rangeMap } else { e.ranges, err = buildRangesForIndexJoin(e.ctx, lookUpContents, indexRanges, keyOff2IdxOff, cwc) if err != nil { return nil, err } } + partitionExec := &PartitionTableExecutor{ + baseExecutor: *e.base(), + partitions: usedPartition, + nextPartition: nextPartition, + } err = partitionExec.Open(ctx) return partitionExec, err } From 583a96b64898d82002455bd9791fa083907edc89 Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Mon, 28 Sep 2020 16:54:24 +0800 Subject: [PATCH 6/6] fix test --- executor/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/builder.go b/executor/builder.go index e20462d7de9ab..7e2b01ba4358f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2627,7 +2627,7 @@ func buildKVRangeForEachPartition(ctx sessionctx.Context, usedPartitions []table for _, p := range usedPartitions { if isCommonHandle { rangeBuilders[p.GetPhysicalID()] = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) { - return buildKvRangesForIndexJoin(ctx, pid, -1, contentBucket[p.GetPhysicalID()], indexRanges, keyOff2IdxOff, cwc) + return buildKvRangesForIndexJoin(ctx, pid, -1, contentBucket[pid], indexRanges, keyOff2IdxOff, cwc) }) } else { handles := make([]kv.Handle, 0, len(contentBucket[p.GetPhysicalID()]))