Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Add special handling for consecutive SMJ
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE committed Mar 28, 2022
1 parent 7343012 commit c56ef48
Showing 1 changed file with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ case class ColumnarCollapseCodegenStages(
p.right,
plan.projectList)
case p: ColumnarSortMergeJoinExec
if !skip_smj && plan.condition == null && !containsExpression(plan.projectList) =>
if !skip_smj && plan.condition == null && !containsExpression(plan.projectList)
&& !isConsecutiveSMJ(p) =>
ColumnarSortMergeJoinExec(
p.leftKeys,
p.rightKeys,
Expand All @@ -214,11 +215,27 @@ case class ColumnarCollapseCodegenStages(
case other => plan
}

/**
* To filter the case that a opeeration is SMJ and its children are also SMJ (TPC-DS q23b).
*/
def isConsecutiveSMJ(plan: SparkPlan): Boolean = {
plan match {
case p: ColumnarSortMergeJoinExec if p.left.isInstanceOf[ColumnarSortMergeJoinExec]
&& p.right.isInstanceOf[ColumnarSortMergeJoinExec] =>
true
case _ =>
false
}
}

/**
* Inserts an InputAdapter on top of those that do not support codegen.
*/
private def insertInputAdapter(plan: SparkPlan): SparkPlan = {
plan match {
case p if isConsecutiveSMJ(p) =>
new ColumnarInputAdapter(p.withNewChildren(p.children.map(c =>
insertWholeStageCodegen(c))))
case p if !supportCodegen(p) =>
new ColumnarInputAdapter(insertWholeStageCodegen(p))
case p: ColumnarConditionProjectExec
Expand Down

0 comments on commit c56ef48

Please sign in to comment.