Skip to content

Commit

Permalink
planner, executor: fix the logic for inline projection of executors (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 12, 2023
1 parent a5ad6d3 commit 0fa3186
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 16 deletions.
27 changes: 26 additions & 1 deletion executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
},
}

childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema())
childrenUsedSchema := markChildrenUsedColsForTest(e.Schema(), e.children[0].Schema(), e.children[1].Schema())
defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
for i := uint(0); i < e.concurrency; i++ {
Expand All @@ -958,6 +958,31 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
return e
}

// markChildrenUsedColsForTest compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedColsForTest(outputSchema *expression.Schema, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputSchema.Columns {
markedOffsets[col.Index] = struct{}{}
}
prefixLen := 0
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
}
}
childrenUsed = append(childrenUsed, used)
}
for _, child := range childSchemas {
used := expression.GetUsedList(outputSchema.Columns, child)
childrenUsed = append(childrenUsed, used)
}
return
}

func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
opt1 := mockDataSourceParameters{
rows: casTest.rows,
Expand Down
45 changes: 37 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
end: v.Offset + v.Count,
}

childUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema())[0]
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
for i, used := range childUsedSchema {
if used {
Expand Down Expand Up @@ -1417,6 +1417,11 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
}
}

colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}

e := &MergeJoinExec{
stmtCtx: b.ctx.GetSessionVars().StmtCtx,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec),
Expand All @@ -1429,7 +1434,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
v.OtherConditions,
retTypes(leftExec),
retTypes(rightExec),
markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()),
markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema()),
false,
),
isOuterJoin: v.JoinType.IsOuterJoin(),
Expand Down Expand Up @@ -1558,7 +1563,11 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
probeNAKeColIdx[i] = probeNAKeys[i].Index
}
isNAJoin := len(v.LeftNAJoinKeys) > 0
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i] = &probeWorker{
hashJoinCtx: e.hashJoinCtx,
Expand Down Expand Up @@ -3039,10 +3048,22 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan

// markChildrenUsedCols compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expression.Schema) (childrenUsed [][]bool) {
for _, child := range childSchema {
used := expression.GetUsedList(outputSchema.Columns, child)
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]bool) {
childrenUsed = make([][]bool, 0, len(childSchemas))
markedOffsets := make(map[int]struct{})
for _, col := range outputCols {
markedOffsets[col.Index] = struct{}{}
}
prefixLen := 0
for _, childSchema := range childSchemas {
used := make([]bool, len(childSchema.Columns))
for i := range childSchema.Columns {
if _, ok := markedOffsets[prefixLen+i]; ok {
used[i] = true
}
}
childrenUsed = append(childrenUsed, used)
prefixLen += childSchema.Len()
}
return
}
Expand Down Expand Up @@ -3220,7 +3241,11 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
lastColHelper: v.CompareFilters,
finished: &atomic.Value{},
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
e.joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema, false)
outerKeyCols := make([]int, len(v.OuterJoinKeys))
for i := 0; i < len(v.OuterJoinKeys); i++ {
Expand Down Expand Up @@ -3342,7 +3367,11 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex
keyOff2IdxOff: v.KeyOff2IdxOff,
lastColHelper: v.CompareFilters,
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
joiners := make([]joiner, e.ctx.GetSessionVars().IndexLookupJoinConcurrency())
for i := 0; i < len(joiners); i++ {
joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema, false)
Expand Down
2 changes: 2 additions & 0 deletions planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func (*ImplLimit) OnImplement(expr *memo.GroupExpr, _ *property.PhysicalProperty
Offset: logicalLimit.Offset,
Count: logicalLimit.Count,
}.Init(logicalLimit.SCtx(), expr.Group.Prop.Stats, logicalLimit.SelectBlockOffset(), newProp)
physicalLimit.SetSchema(expr.Group.Prop.Schema.Clone())
return []memo.Implementation{impl.NewLimitImpl(physicalLimit)}, nil
}

Expand Down Expand Up @@ -420,6 +421,7 @@ func (*ImplTopNAsLimit) OnImplement(expr *memo.GroupExpr, _ *property.PhysicalPr
Offset: lt.Offset,
Count: lt.Count,
}.Init(lt.SCtx(), expr.Group.Prop.Stats, lt.SelectBlockOffset(), newProp)
physicalLimit.SetSchema(expr.Group.Prop.Schema.Clone())
return []memo.Implementation{impl.NewLimitImpl(physicalLimit)}, nil
}

Expand Down
18 changes: 13 additions & 5 deletions planner/core/pb_to_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,17 @@ func NewPBPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, ranges
func (b *PBPlanBuilder) Build(executors []*tipb.Executor) (p PhysicalPlan, err error) {
var src PhysicalPlan
for i := 0; i < len(executors); i++ {
curr, err := b.pbToPhysicalPlan(executors[i])
curr, err := b.pbToPhysicalPlan(executors[i], src)
if err != nil {
return nil, errors.Trace(err)
}
if src != nil {
curr.SetChildren(src)
}
src = curr
}
_, src = b.predicatePushDown(src, nil)
return src, nil
}

func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor) (p PhysicalPlan, err error) {
func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor, subPlan PhysicalPlan) (p PhysicalPlan, err error) {
switch e.Tp {
case tipb.ExecType_TypeTableScan:
p, err = b.pbToTableScan(e)
Expand All @@ -81,6 +78,17 @@ func (b *PBPlanBuilder) pbToPhysicalPlan(e *tipb.Executor) (p PhysicalPlan, err
// TODO: Support other types.
err = errors.Errorf("this exec type %v doesn't support yet", e.GetTp())
}
if subPlan != nil {
p.SetChildren(subPlan)
}
// The limit missed its output cols via the protobuf.
// We need to add it back and do a ResolveIndicies for the later inline projection.
if limit, ok := p.(*PhysicalLimit); ok {
limit.SetSchema(p.Children()[0].Schema().Clone())
for i, col := range limit.Schema().Columns {
col.Index = i
}
}
return p, err
}

Expand Down
74 changes: 72 additions & 2 deletions planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,33 @@ func (p *PhysicalHashJoin) ResolveIndicesItself() (err error) {
return err
}
}

mergedSchema := expression.MergeSchema(lSchema, rSchema)

for i, expr := range p.OtherConditions {
p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema))
p.OtherConditions[i], err = expr.ResolveIndices(mergedSchema)
if err != nil {
return err
}
}

colsNeedResolving := p.schema.Len()
// The last output column of this two join is the generated column to indicate whether the row is matched or not.
if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
colsNeedResolving--
}
// To avoid that two plan shares the same column slice.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}

return
}

Expand Down Expand Up @@ -173,12 +194,32 @@ func (p *PhysicalMergeJoin) ResolveIndices() (err error) {
return err
}
}

mergedSchema := expression.MergeSchema(lSchema, rSchema)

for i, expr := range p.OtherConditions {
p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema))
p.OtherConditions[i], err = expr.ResolveIndices(mergedSchema)
if err != nil {
return err
}
}

colsNeedResolving := p.schema.Len()
// The last output column of this two join is the generated column to indicate whether the row is matched or not.
if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
colsNeedResolving--
}
// To avoid that two plan shares the same column slice.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}
return
}

Expand Down Expand Up @@ -245,6 +286,24 @@ func (p *PhysicalIndexJoin) ResolveIndices() (err error) {
}
p.OuterHashKeys[i], p.InnerHashKeys[i] = outerKey.(*expression.Column), innerKey.(*expression.Column)
}

colsNeedResolving := p.schema.Len()
// The last output column of this two join is the generated column to indicate whether the row is matched or not.
if p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
colsNeedResolving--
}
// To avoid that two plan shares the same column slice.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i := 0; i < colsNeedResolving; i++ {
newCol, err := p.schema.Columns[i].ResolveIndices(mergedSchema)
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}

return
}

Expand Down Expand Up @@ -607,6 +666,17 @@ func (p *PhysicalLimit) ResolveIndices() (err error) {
}
p.PartitionBy[i].Col = newCol.(*expression.Column)
}
// To avoid that two plan shares the same column slice.
shallowColSlice := make([]*expression.Column, p.schema.Len())
copy(shallowColSlice, p.schema.Columns)
p.schema = expression.NewSchema(shallowColSlice...)
for i, col := range p.schema.Columns {
newCol, err := col.ResolveIndices(p.children[0].Schema())
if err != nil {
return err
}
p.schema.Columns[i] = newCol.(*expression.Column)
}
return
}

Expand Down

0 comments on commit 0fa3186

Please sign in to comment.