Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: append common handle columns into the schema of index merge table plan (#23933) #24042

Merged
merged 9 commits into from
Apr 15, 2021
9 changes: 7 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,13 +1027,18 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
if ts.schema.ColumnIndex(col) == -1 {
ts.Schema().Append(col)
ts.Columns = append(ts.Columns, col.ToInfo())
cop.doubleReadNeedProj = true
cop.needExtraProj = true
}
}
}
// If the inner cop task needs to keep order, the extraHandleCol should be set.
if cop.keepOrder && !ds.tableInfo.IsCommonHandle {
cop.extraHandleCol, cop.doubleReadNeedProj = ts.appendExtraHandleCol(ds)
var needExtraProj bool
cop.extraHandleCol, needExtraProj = ts.appendExtraHandleCol(ds)
cop.needExtraProj = cop.needExtraProj || needExtraProj
}
if cop.needExtraProj {
cop.originSchema = ds.schema
}
cop.tablePlan = ts
}
Expand Down
53 changes: 41 additions & 12 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,8 +809,10 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
for _, partPath := range path.PartialIndexPaths {
var scan PhysicalPlan
var partialCost float64
var needExtraProj bool
if partPath.IsTablePath() {
scan, partialCost = ds.convertToPartialTableScan(prop, partPath)
scan, partialCost, needExtraProj = ds.convertToPartialTableScan(prop, partPath)
cop.needExtraProj = cop.needExtraProj || needExtraProj
} else {
scan, partialCost = ds.convertToPartialIndexScan(prop, partPath)
}
Expand All @@ -821,14 +823,18 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
if prop.ExpectedCnt < ds.stats.RowCount {
totalRowCount *= prop.ExpectedCnt / ds.stats.RowCount
}
ts, partialCost, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount)
ts, partialCost, needExtraProj, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount)
if err != nil {
return nil, err
}
cop.needExtraProj = cop.needExtraProj || needExtraProj
totalCost += partialCost
cop.tablePlan = ts
cop.idxMergePartPlans = scans
cop.cst = totalCost
if cop.needExtraProj {
cop.originSchema = ds.schema
}
task = cop.convertToRootTask(ds.ctx)
return task, nil
}
Expand Down Expand Up @@ -865,9 +871,18 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty,
}

func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, path *util.AccessPath) (
tablePlan PhysicalPlan,
partialCost float64) {
tablePlan PhysicalPlan, partialCost float64, needExtraProj bool) {
ts, partialCost, rowCount := ds.getOriginalPhysicalTableScan(prop, path, false)
if ds.tableInfo.IsCommonHandle {
commonHandle := ds.handleCols.(*CommonHandleCols)
for _, col := range commonHandle.columns {
if ts.schema.ColumnIndex(col) == -1 {
ts.Schema().Append(col)
ts.Columns = append(ts.Columns, col.ToInfo())
needExtraProj = true
}
}
}
rowSize := ds.TblColHists.GetAvgRowSize(ds.ctx, ds.TblCols, false, false)
sessVars := ds.ctx.GetSessionVars()
if len(ts.filterCondition) > 0 {
Expand All @@ -880,15 +895,16 @@ func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty,
tablePlan.SetChildren(ts)
partialCost += rowCount * sessVars.CopCPUFactor
partialCost += selectivity * rowCount * rowSize * sessVars.NetworkFactor
return tablePlan, partialCost
return
}
partialCost += rowCount * rowSize * sessVars.NetworkFactor
tablePlan = ts
return tablePlan, partialCost
return
}

func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, error) {
func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, bool, error) {
var partialCost float64
var needExtraProj bool
sessVars := ds.ctx.GetSessionVars()
ts := PhysicalTableScan{
Table: ds.tableInfo,
Expand All @@ -907,10 +923,20 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty,
}
ts.HandleCols = NewIntHandleCols(handleCol)
}
if ds.tableInfo.IsCommonHandle {
commonHandle := ds.handleCols.(*CommonHandleCols)
for _, col := range commonHandle.columns {
if ts.schema.ColumnIndex(col) == -1 {
ts.Schema().Append(col)
ts.Columns = append(ts.Columns, col.ToInfo())
needExtraProj = true
}
}
}
var err error
ts.HandleCols, err = ts.HandleCols.ResolveIndices(ts.schema)
if err != nil {
return nil, 0, err
return nil, 0, false, err
}
if ts.Table.PKIsHandle {
if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
Expand All @@ -934,9 +960,9 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty,
}
sel := PhysicalSelection{Conditions: tableFilters}.Init(ts.ctx, ts.stats.ScaleByExpectCnt(selectivity*totalRowCount), ts.blockOffset)
sel.SetChildren(ts)
return sel, partialCost, nil
return sel, partialCost, needExtraProj, nil
}
return ts, partialCost, nil
return ts, partialCost, needExtraProj, nil
}

func indexCoveringCol(col *expression.Column, indexCols []*expression.Column, idxColLens []int) bool {
Expand Down Expand Up @@ -1033,22 +1059,25 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
ts := cop.tablePlan.(*PhysicalTableScan)
ts.Schema().Append(col)
ts.Columns = append(ts.Columns, col.ToInfo())
cop.doubleReadNeedProj = true
cop.needExtraProj = true
}
}
}
if candidate.isMatchProp {
if cop.tablePlan != nil && !ds.tableInfo.IsCommonHandle {
col, isNew := cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(ds)
cop.extraHandleCol = col
cop.doubleReadNeedProj = isNew
cop.needExtraProj = cop.needExtraProj || isNew
}
cop.keepOrder = true
// IndexScan on partition table can't keep order.
if ds.tableInfo.GetPartitionInfo() != nil {
return invalidTask, nil
}
}
if cop.needExtraProj {
cop.originSchema = ds.schema
}
// prop.IsEmpty() would always return true when coming to here,
// so we can just use prop.ExpectedCnt as parameter of addPushedDownSelection.
finalStats := ds.stats.ScaleByExpectCnt(prop.ExpectedCnt)
Expand Down
19 changes: 19 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,25 @@ func (s *testIntegrationSuite) TestIndexMergeTableFilter(c *C) {
))
}

// TestIndexMergeClusterIndex runs queries carrying the use_index_merge hint
// against tables whose primary key is declared CLUSTERED and checks the
// returned rows. Two table shapes are covered: a single-column float primary
// key with two secondary indexes, and a composite (a, b) primary key with one
// secondary index.
func (s *testIntegrationSuite) TestIndexMergeClusterIndex(c *C) {
	kit := testkit.NewTestKit(c, s.store)

	// Case 1: single-column clustered primary key; the query selects only c3
	// while filtering on both indexed columns.
	singleKeySetup := []string{
		"use test;",
		"drop table if exists t",
		"create table t (c1 float, c2 int, c3 int, primary key (c1) /*T![clustered_index] CLUSTERED */, key idx_1 (c2), key idx_2 (c3))",
		"insert into t values(1.0,1,2),(2.0,2,1),(3.0,1,1),(4.0,2,2)",
	}
	for _, stmt := range singleKeySetup {
		kit.MustExec(stmt)
	}
	// Result order is not asserted here, so the rows are sorted before comparing.
	kit.MustQuery("select /*+ use_index_merge(t) */ c3 from t where c3 = 1 or c2 = 1").Sort().Check(
		testkit.Rows("1", "1", "2"),
	)

	// Case 2: composite clustered primary key; one filter is on the secondary
	// index column and the other on the first primary-key column.
	compositeKeySetup := []string{
		"drop table t",
		"create table t (a int, b int, c int, primary key (a,b) /*T![clustered_index] CLUSTERED */, key idx_c(c))",
		"insert into t values (0,1,2)",
	}
	for _, stmt := range compositeKeySetup {
		kit.MustExec(stmt)
	}
	kit.MustQuery("select /*+ use_index_merge(t) */ c from t where c > 10 or a < 1").Check(
		testkit.Rows("2"),
	)
}

func (s *testIntegrationSuite) TestIssue23736(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
27 changes: 18 additions & 9 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ type copTask struct {
indexPlanFinished bool
// keepOrder indicates if the plan scans data by order.
keepOrder bool
// doubleReadNeedProj means an extra prune is needed because
// in double read case, it may output one more column for handle(row id).
doubleReadNeedProj bool
// needExtraProj means an extra projection is needed to prune columns, because
// in double read / index merge cases the plan may output extra columns for the handle (row id).
needExtraProj bool
// originSchema is the target schema to be projected to when needExtraProj is true.
originSchema *expression.Schema

extraHandleCol *expression.Column
commonHandleCols []*expression.Column
Expand Down Expand Up @@ -883,8 +885,8 @@ func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers
newTask.cst += sortCPUCost
}
if t.doubleReadNeedProj {
schema := p.IndexPlans[0].(*PhysicalIndexScan).dataSourceSchema
if t.needExtraProj {
schema := t.originSchema
proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.tablePlan.SelectBlockOffset(), nil)
proj.SetSchema(schema)
proj.SetChildren(p)
Expand Down Expand Up @@ -948,6 +950,13 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
p.PartitionInfo = t.partitionInfo
setTableScanToTableRowIDScan(p.tablePlan)
newTask.p = p
if t.needExtraProj {
schema := t.originSchema
proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.idxMergePartPlans[0].SelectBlockOffset(), nil)
proj.SetSchema(schema)
proj.SetChildren(p)
newTask.p = proj
}
return newTask
}
if t.indexPlan != nil && t.tablePlan != nil {
Expand Down Expand Up @@ -1706,13 +1715,13 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
cop.finishIndexPlan()
partialAgg.SetChildren(cop.tablePlan)
cop.tablePlan = partialAgg
// If doubleReadNeedProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
// the schema is the same as the original DataSource schema.
// However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final
// agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them.
// If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and
// the partial agg, and the schema will be broken.
cop.doubleReadNeedProj = false
cop.needExtraProj = false
} else {
partialAgg.SetChildren(cop.indexPlan)
cop.indexPlan = partialAgg
Expand Down Expand Up @@ -1840,13 +1849,13 @@ func (p *PhysicalHashAgg) attach2Task(tasks ...task) task {
cop.finishIndexPlan()
partialAgg.SetChildren(cop.tablePlan)
cop.tablePlan = partialAgg
// If doubleReadNeedProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
// If needExtraProj is true, a projection will be created above the PhysicalIndexLookUpReader to make sure
// the schema is the same as the original DataSource schema.
// However, we pushed down the agg here, the partial agg was placed on the top of tablePlan, and the final
// agg will be placed above the PhysicalIndexLookUpReader, and the schema will be set correctly for them.
// If we add the projection again, the projection will be between the PhysicalIndexLookUpReader and
// the partial agg, and the schema will be broken.
cop.doubleReadNeedProj = false
cop.needExtraProj = false
} else {
partialAgg.SetChildren(cop.indexPlan)
cop.indexPlan = partialAgg
Expand Down