planner: implement the BatchPointGetPlan to improve the BatchPointGet performance (#12322)
lonng authored and sre-bot committed Oct 10, 2019
1 parent 729f3e4 commit c0d6185
Showing 6 changed files with 392 additions and 137 deletions.
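
BatchPointGet fetches multiple rows by primary-key handle or unique-index key in a single executor. Before this commit the planner expressed that query shape (e.g. SELECT a FROM t WHERE a IN (1, 2, 3) on a unique column) as a UnionAll whose children were all PointGet plans; the IsPointGetUnion special case removed from buildUnionAll below gives way to a dedicated BatchPointGetPlan node that the executor builder maps directly to BatchPointGetExec.
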
77 changes: 37 additions & 40 deletions executor/builder.go
@@ -126,6 +126,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildExplain(v)
case *plannercore.PointGetPlan:
return b.buildPointGet(v)
case *plannercore.BatchPointGetPlan:
return b.buildBatchPointGet(v)
case *plannercore.Insert:
return b.buildInsert(v)
case *plannercore.LoadData:
@@ -1308,46 +1310,6 @@ func (b *executorBuilder) buildMaxOneRow(v *plannercore.PhysicalMaxOneRow) Executor {
}

func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executor {
if v.IsPointGetUnion {
startTS, err := b.getStartTS()
if err != nil {
b.err = err
return nil
}
children := v.Children()
// It's OK to type assert here because `v.IsPointGetUnion == true` only if all children are PointGet
pointGet := children[0].(*plannercore.PointGetPlan)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
tblInfo: pointGet.TblInfo,
idxInfo: pointGet.IndexInfo,
startTS: startTS,
}
if pointGet.IndexInfo != nil {
idxVals := make([][]types.Datum, len(children))
for i, child := range children {
idxVals[i] = child.(*plannercore.PointGetPlan).IndexValues
}
e.idxVals = idxVals
} else {
// `SELECT a FROM t WHERE a IN (1, 1, 2, 1, 2)` should not return duplicated rows
handles := make([]int64, 0, len(children))
dedup := make(map[int64]struct{}, len(children))
for _, child := range children {
handle := child.(*plannercore.PointGetPlan).Handle
if _, found := dedup[handle]; found {
continue
}
dedup[handle] = struct{}{}
handles = append(handles, handle)
}
e.handles = handles
}
e.base().initCap = len(children)
e.base().maxChunkSize = len(children)
return e
}

childExecs := make([]Executor, len(v.Children()))
for i, child := range v.Children() {
childExecs[i] = b.build(child)
@@ -2285,6 +2247,41 @@ func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor {
return e
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
startTS, err := b.getStartTS()
if err != nil {
b.err = err
return nil
}
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ExplainID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
startTS: startTS,
}
var capacity int
if plan.IndexInfo != nil {
e.idxVals = plan.IndexValues
capacity = len(e.idxVals)
} else {
// `SELECT a FROM t WHERE a IN (1, 1, 2, 1, 2)` should not return duplicated rows
handles := make([]int64, 0, len(plan.Handles))
dedup := make(map[int64]struct{}, len(plan.Handles))
for _, handle := range plan.Handles {
if _, found := dedup[handle]; found {
continue
}
dedup[handle] = struct{}{}
handles = append(handles, handle)
}
e.handles = handles
capacity = len(e.handles)
}
e.base().initCap = capacity
e.base().maxChunkSize = capacity
return e
}

func getPhysicalTableID(t table.Table) int64 {
if p, ok := t.(table.PhysicalTable); ok {
return p.GetPhysicalID()
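
The handle branch above deduplicates the IN-list while keeping first-occurrence order, as the inline comment notes. A minimal standalone sketch of that map-plus-slice pattern, with hypothetical names outside the TiDB codebase:

package main

import "fmt"

// dedupHandles drops repeated handles while preserving the order in
// which each handle first appeared, mirroring the loop in
// buildBatchPointGet.
func dedupHandles(in []int64) []int64 {
	out := make([]int64, 0, len(in))
	seen := make(map[int64]struct{}, len(in))
	for _, h := range in {
		if _, found := seen[h]; found {
			continue
		}
		seen[h] = struct{}{}
		out = append(out, h)
	}
	return out
}

func main() {
	// SELECT a FROM t WHERE a IN (1, 1, 2, 1, 2) should return two rows.
	fmt.Println(dedupHandles([]int64{1, 1, 2, 1, 2})) // [1 2]
}
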
20 changes: 20 additions & 0 deletions planner/core/common_plans.go
@@ -405,6 +405,26 @@ func (e *Execute) rebuildRange(p Plan) error {
}
}
return nil
case *BatchPointGetPlan:
for i, param := range x.HandleParams {
if param != nil {
x.Handles[i], err = param.Datum.ToInt64(sc)
if err != nil {
return err
}
}
}
for i, params := range x.IndexValueParams {
if len(params) < 1 {
continue
}
for j, param := range params {
if param != nil {
x.IndexValues[i][j] = param.Datum
}
}
}
case PhysicalPlan:
for _, child := range x.Children() {
err = e.rebuildRange(child)
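
rebuildRange runs when a plan cached for a prepared statement is reused: every handle or index value that originated from a ? placeholder must be refreshed from the newly bound parameter before execution. A simplified sketch of the handle path under hypothetical stand-in types (the real code converts param.Datum with ToInt64 and a statement context):

package main

import "fmt"

// param stands in for a bound placeholder; nil means that position held
// a literal in the original statement and needs no rebinding.
type param struct{ val int64 }

// rebindHandles overwrites every placeholder-backed handle with the
// value bound at EXECUTE time, leaving literal handles untouched.
func rebindHandles(handles []int64, handleParams []*param) {
	for i, p := range handleParams {
		if p != nil {
			handles[i] = p.val
		}
	}
}

func main() {
	// PREPARE s FROM 'SELECT * FROM t WHERE pk IN (1, ?, ?)';
	// EXECUTE s USING @x, @y;  -- with @x = 7, @y = 9
	handles := []int64{1, 0, 0}
	params := []*param{nil, {val: 7}, {val: 9}}
	rebindHandles(handles, params)
	fmt.Println(handles) // [1 7 9]
}
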
11 changes: 11 additions & 0 deletions planner/core/initialize.go
@@ -14,8 +14,10 @@
package core

import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
)

const (
@@ -483,6 +485,15 @@ func (p PhysicalIndexHashJoin) Init(ctx sessionctx.Context) *PhysicalIndexHashJoin {
return &p
}

// Init initializes BatchPointGetPlan.
func (p BatchPointGetPlan) Init(ctx sessionctx.Context, stats *property.StatsInfo, schema *expression.Schema, names []*types.FieldName) *BatchPointGetPlan {
p.basePlan = newBasePlan(ctx, "Batch_Point_Get", 0)
p.schema = schema
p.names = names
p.stats = stats
return &p
}

// flattenPushDownPlan converts a plan tree to a list, whose head is the leaf node like table scan.
func flattenPushDownPlan(p PhysicalPlan) []PhysicalPlan {
plans := make([]PhysicalPlan, 0, 5)
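
Init follows the convention used throughout this file: the method takes a value receiver, so the plan literal built at the call site is copied, mutated, and returned by address. A self-contained sketch of the idiom with a hypothetical type:

package main

import "fmt"

type plan struct {
	name string
	id   int
}

// Init uses a value receiver: p is a fresh copy of the literal at the
// call site, and &p escapes to the heap, so returning it is safe in Go.
func (p plan) Init(id int) *plan {
	p.id = id
	return &p
}

func main() {
	p := plan{name: "Batch_Point_Get"}.Init(42)
	fmt.Printf("%+v\n", p) // &{name:Batch_Point_Get id:42}
}
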
12 changes: 1 addition & 11 deletions planner/core/physical_plans.go
@@ -52,6 +52,7 @@ var (
_ PhysicalPlan = &PhysicalMergeJoin{}
_ PhysicalPlan = &PhysicalUnionScan{}
_ PhysicalPlan = &PhysicalWindow{}
_ PhysicalPlan = &BatchPointGetPlan{}
)

// PhysicalTableReader is the table reader in tidb.
@@ -336,17 +337,6 @@ type PhysicalLimit struct {
// PhysicalUnionAll is the physical operator of UnionAll.
type PhysicalUnionAll struct {
physicalSchemaProducer
// IsPointGetUnion indicates all the children are PointGet and
// all of them reference the same table and use the same `unique key`
IsPointGetUnion bool
}

// OutputNames returns the outputting names of each column.
func (p *PhysicalUnionAll) OutputNames() []*types.FieldName {
if p.IsPointGetUnion {
return p.children[0].OutputNames()
}
return p.physicalSchemaProducer.OutputNames()
}

// AggregationType stands for the mode of aggregation plan.
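
The _ PhysicalPlan = &BatchPointGetPlan{} entry added to the var block is a compile-time assertion: the package fails to build if *BatchPointGetPlan ever stops satisfying PhysicalPlan. The idiom in isolation, with hypothetical single-method types:

package main

// A stand-in interface; the real PhysicalPlan has many more methods.
type PhysicalPlan interface {
	ExplainID() string
}

type BatchPointGetPlan struct{}

func (p *BatchPointGetPlan) ExplainID() string { return "Batch_Point_Get" }

// Compile-time check; the blank identifier discards the value, so the
// assertion costs nothing at runtime.
var _ PhysicalPlan = &BatchPointGetPlan{}

func main() {}
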