
[NSE-27] Reuse exchange to optimize DPP performance (#28)
* Reuse exchange to optimize DPP performance

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

* Since we use case classes now, remove the unnecessary 'new' keyword

1. Besides, also changed ColumnarUnionExec to a case class

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
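For context on why the case-class change matters for reuse: Spark's ReuseExchange rule deduplicates exchanges via sameResult, which ultimately compares canonicalized plan nodes with equals. The columnar operators previously defeated that comparison with hand-written equals/canEqual overrides (removed in the diffs below), so two identical exchanges never matched and DPP could not share one exchange. A minimal, hypothetical sketch with stand-in types (not this repo's operators):

// Hypothetical stand-ins, simplified to show what the case-class form changes.
class PlainExchange(val id: Int) {
  // Mimics the old hand-written override that made equality always fail.
  override def equals(other: Any): Boolean = false
}
case class CaseExchange(id: Int)

object ReuseDemo extends App {
  println(new PlainExchange(1) == new PlainExchange(1)) // false: never reused
  println(CaseExchange(1) == CaseExchange(1)) // true: a reuse rule can match it
}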

* Fix issue where case-class Exchange may break AQE

When AQE is enabled, it only accepts ShuffleExchangeExec and BroadcastExchangeExec, so if we use case classes for ColumnarShuffleExchangeExec and ColumnarBroadcastExchangeExec, AQE will hit an exception. To fix this, we add shadow classes that extend ShuffleExchangeExec and BroadcastExchangeExec while actually calling the ColumnarShuffleExchangeExec and ColumnarBroadcastExchangeExec implementations.

We then use a supportAdaptive check to decide whether to instantiate the shadow class or the case class, so that both DPP and AQE are supported.

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
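A minimal sketch of the shadow-class idea described above, assuming Spark 3.0-era constructor signatures; the adaptor name matches this commit, but the body here is illustrative only, and the real implementation overrides more members (metrics, row execution, and so on):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.vectorized.ColumnarBatch

// Shadow class: AQE pattern-matches on ShuffleExchangeExec, so this node
// passes its type checks, while the columnar work is delegated to the
// plugin's case-class operator (ColumnarShuffleExchangeExec, import omitted).
class ColumnarShuffleExchangeAdaptor(
    outputPartitioning: Partitioning,
    child: SparkPlan,
    canChangeNumPartitions: Boolean = true)
    extends ShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions) {

  private val delegate =
    ColumnarShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions)

  override def supportsColumnar: Boolean = true

  // executeColumnar() is SparkPlan's public entry point to doExecuteColumnar.
  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    delegate.executeColumnar()
}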

* Also check whether SQLConf has AQE enabled in the supportAdaptive function

Signed-off-by: Chendi Xue <chendi.xue@intel.com>
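Condensed from the ColumnarPlugin.scala diff below: adaptive support is gated on the SQLConf flag and on the absence of DPP subqueries in the plan, which is what lets the rule pick the shadow class under AQE and the reusable case class under DPP.

import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf

object AdaptiveSupport {
  // Condensed sketch of supportAdaptive (full version in the diff below).
  // A leaf Exchange means we are already inside a query stage; otherwise AQE
  // must be enabled and no DynamicPruningSubquery may appear, since DPP here
  // still relies on equality-based exchange reuse.
  def supportAdaptive(plan: SparkPlan): Boolean =
    plan.isInstanceOf[Exchange] ||
      (SQLConf.get.adaptiveExecutionEnabled &&
        plan.logicalLink.isDefined &&
        !plan.logicalLink.exists(_.isStreaming) &&
        !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
        plan.children.forall(supportAdaptive))
}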
xuechendi authored Jan 14, 2021
1 parent 8f01e2e commit 6a8e413
Showing 13 changed files with 470 additions and 130 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala
@@ -92,7 +92,7 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] {
         plan.left,
         plan.right)
     case plan: BroadcastExchangeExec =>
-      new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
+      ColumnarBroadcastExchangeExec(plan.mode, plan.child)
     case plan: BroadcastHashJoinExec =>
       ColumnarBroadcastHashJoinExec(
         plan.leftKeys,
79 changes: 52 additions & 27 deletions core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.internal.SQLConf
 
 case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
   val columnarConf = ColumnarPluginConfig.getConf(conf)
+  var isSupportAdaptive: Boolean = true
 
   def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
     case RowGuard(child: CustomShuffleReaderExec) =>
@@ -62,18 +63,18 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
if (columnarPlan.isInstanceOf[ColumnarConditionProjectExec]) {
val cur_plan = columnarPlan.asInstanceOf[ColumnarConditionProjectExec]
new ColumnarConditionProjectExec(cur_plan.condition, plan.projectList, cur_plan.child)
ColumnarConditionProjectExec(cur_plan.condition, plan.projectList, cur_plan.child)
} else {
new ColumnarConditionProjectExec(null, plan.projectList, columnarPlan)
ColumnarConditionProjectExec(null, plan.projectList, columnarPlan)
}
case plan: FilterExec =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarConditionProjectExec(plan.condition, null, child)
ColumnarConditionProjectExec(plan.condition, null, child)
case plan: HashAggregateExec =>
val child = replaceWithColumnarPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarHashAggregateExec(
ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
plan.aggregateExpressions,
Expand All @@ -84,32 +85,32 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
     case plan: UnionExec =>
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-      new ColumnarUnionExec(children)
+      ColumnarUnionExec(children)
     case plan: ExpandExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-      new ColumnarExpandExec(plan.projections, plan.output, child)
+      ColumnarExpandExec(plan.projections, plan.output, child)
     case plan: SortExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       child match {
         case CoalesceBatchesExec(fwdChild: SparkPlan) =>
-          new ColumnarSortExec(plan.sortOrder, plan.global, fwdChild, plan.testSpillFrequency)
+          ColumnarSortExec(plan.sortOrder, plan.global, fwdChild, plan.testSpillFrequency)
         case _ =>
-          new ColumnarSortExec(plan.sortOrder, plan.global, child, plan.testSpillFrequency)
+          ColumnarSortExec(plan.sortOrder, plan.global, child, plan.testSpillFrequency)
       }
     case plan: ShuffleExchangeExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
       if ((child.supportsColumnar || columnarConf.enablePreferColumnar) && columnarConf.enableColumnarShuffle) {
-        if (SQLConf.get.adaptiveExecutionEnabled) {
-          new ColumnarShuffleExchangeExec(
+        if (isSupportAdaptive) {
+          new ColumnarShuffleExchangeAdaptor(
             plan.outputPartitioning,
             child,
             plan.canChangeNumPartitions)
         } else {
           CoalesceBatchesExec(
-            new ColumnarShuffleExchangeExec(
+            ColumnarShuffleExchangeExec(
               plan.outputPartitioning,
               child,
               plan.canChangeNumPartitions))
@@ -136,7 +137,10 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
     case plan: BroadcastExchangeExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
-      new ColumnarBroadcastExchangeExec(plan.mode, child)
+      if (isSupportAdaptive)
+        new ColumnarBroadcastExchangeAdaptor(plan.mode, child)
+      else
+        ColumnarBroadcastExchangeExec(plan.mode, child)
     case plan: BroadcastHashJoinExec =>
       if (columnarConf.enableColumnarBroadcastJoin) {
         val left = replaceWithColumnarPlan(plan.left)
@@ -162,7 +166,7 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
         val right = replaceWithColumnarPlan(plan.right)
         logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
 
-        new ColumnarSortMergeJoinExec(
+        ColumnarSortMergeJoinExec(
           plan.leftKeys,
           plan.rightKeys,
           plan.joinType,
@@ -182,17 +186,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {

     case plan: CustomShuffleReaderExec if columnarConf.enableColumnarShuffle =>
       plan.child match {
-        case shuffle: ColumnarShuffleExchangeExec =>
+        case shuffle: ColumnarShuffleExchangeAdaptor =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
           CoalesceBatchesExec(
             ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs, plan.description))
-        case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec) =>
+        case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeAdaptor) =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
           CoalesceBatchesExec(
             ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs, plan.description))
         case ShuffleQueryStageExec(_, reused: ReusedExchangeExec) =>
           reused match {
-            case ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) =>
+            case ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeAdaptor) =>
               logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
               CoalesceBatchesExec(
                 ColumnarCustomShuffleReaderExec(
@@ -231,22 +235,20 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
       plan.withNewChildren(children)
 
     case p =>
-      // Here we need to make an exception for operators who use BroadcastExchange
-      // as one side child, while it is not BroadcastHashedJoin
       val children = plan.children.map(replaceWithColumnarPlan)
       logDebug(s"Columnar Processing for ${p.getClass} is currently not supported.")
       p.withNewChildren(children.map(fallBackBroadcastExchangeOrNot))
 
   def fallBackBroadcastQueryStage(curPlan: BroadcastQueryStageExec): BroadcastQueryStageExec = {
     curPlan.plan match {
-      case originalBroadcastPlan: ColumnarBroadcastExchangeExec =>
+      case originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor =>
         BroadcastQueryStageExec(
           curPlan.id,
           BroadcastExchangeExec(
             originalBroadcastPlan.mode,
             DataToArrowColumnarExec(originalBroadcastPlan, 1)))
-      case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeExec) =>
+      case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) =>
         BroadcastQueryStageExec(
           curPlan.id,
           BroadcastExchangeExec(
@@ -261,11 +263,15 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {
     case p: ColumnarBroadcastExchangeExec =>
       // aqe is disabled
       BroadcastExchangeExec(p.mode, DataToArrowColumnarExec(p, 1))
+    case p: ColumnarBroadcastExchangeAdaptor =>
+      // aqe is disabled
+      BroadcastExchangeExec(p.mode, DataToArrowColumnarExec(p, 1))
     case p: BroadcastQueryStageExec =>
       // aqe is enabled
       fallBackBroadcastQueryStage(p)
     case other => other
   }
+  def setAdaptiveSupport(enable: Boolean): Unit = { isSupportAdaptive = enable }
 
   def apply(plan: SparkPlan): SparkPlan = {
     replaceWithColumnarPlan(plan)
@@ -275,18 +281,16 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {

 case class ColumnarPostOverrides(conf: SparkConf) extends Rule[SparkPlan] {
   val columnarConf = ColumnarPluginConfig.getConf(conf)
+  var isSupportAdaptive: Boolean = true
 
   def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
     case plan: RowToColumnarExec =>
       val child = replaceWithColumnarPlan(plan.child)
       logDebug(s"ColumnarPostOverrides RowToArrowColumnarExec(${child.getClass})")
       RowToArrowColumnarExec(child)
-    case ColumnarToRowExec(child: ColumnarShuffleExchangeExec)
-        if SQLConf.get.adaptiveExecutionEnabled && columnarConf.enableColumnarShuffle =>
-      // When AQE enabled, we need to discard ColumnarToRowExec to avoid extra transactions
-      // if ColumnarShuffleExchangeExec is the last plan of the query stage.
+    case ColumnarToRowExec(child: ColumnarShuffleExchangeAdaptor) =>
       replaceWithColumnarPlan(child)
-    case ColumnarToRowExec(child: ColumnarBroadcastExchangeExec) =>
+    case ColumnarToRowExec(child: ColumnarBroadcastExchangeAdaptor) =>
       replaceWithColumnarPlan(child)
     case ColumnarToRowExec(child: CoalesceBatchesExec) =>
       plan.withNewChildren(Seq(replaceWithColumnarPlan(child.child)))
@@ -295,6 +299,8 @@ case class ColumnarPostOverrides(conf: SparkConf) extends Rule[SparkPlan] {
       p.withNewChildren(children)
   }
 
+  def setAdaptiveSupport(enable: Boolean): Unit = { isSupportAdaptive = enable }
+
   def apply(plan: SparkPlan): SparkPlan = {
     replaceWithColumnarPlan(plan)
   }
@@ -309,18 +315,37 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule with
   val preOverrides = ColumnarPreOverrides(conf)
   val postOverrides = ColumnarPostOverrides(conf)
   val collapseOverrides = ColumnarCollapseCodegenStages(conf)
+  var isSupportAdaptive: Boolean = true
+
+  private def supportAdaptive(plan: SparkPlan): Boolean = {
+    // TODO migrate dynamic-partition-pruning onto adaptive execution.
+    // Only QueryStage will have Exchange as Leaf Plan
+    val isLeafPlanExchange = plan match {
+      case e: Exchange => true
+      case other => false
+    }
+    isLeafPlanExchange || (SQLConf.get.adaptiveExecutionEnabled && (sanityCheck(plan) &&
+      !plan.logicalLink.exists(_.isStreaming) &&
+      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
+      plan.children.forall(supportAdaptive)))
+  }
+
+  private def sanityCheck(plan: SparkPlan): Boolean =
+    plan.logicalLink.isDefined
+
   override def preColumnarTransitions: Rule[SparkPlan] = plan => {
     if (columnarEnabled) {
-      val tmpPlan = rowGuardOverrides(plan)
-      preOverrides(tmpPlan)
+      isSupportAdaptive = supportAdaptive(plan)
+      preOverrides.setAdaptiveSupport(isSupportAdaptive)
+      preOverrides(rowGuardOverrides(plan))
     } else {
       plan
     }
   }
 
   override def postColumnarTransitions: Rule[SparkPlan] = plan => {
     if (columnarEnabled) {
+      postOverrides.setAdaptiveSupport(isSupportAdaptive)
       val tmpPlan = postOverrides(plan)
       collapseOverrides(tmpPlan)
     } else {
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ExecutorManager
+import org.apache.spark.sql.util.StructTypeFWD
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.arrow.gandiva.expression._
 import org.apache.arrow.vector.types.pojo.ArrowType
@@ -125,7 +126,7 @@ case class ColumnarConditionProjectExec(
 
   override def supportColumnarCodegen: Boolean = true
 
-  override def canEqual(that: Any): Boolean = false
+  // override def canEqual(that: Any): Boolean = false
 
   def getKernelFunction(childTreeNode: TreeNode): TreeNode = {
     val (filterNode, projectNode) =
@@ -226,22 +227,30 @@ case class ColumnarConditionProjectExec(
     }
   }
 
-  // We have to override equals because subclassing a case class like ProjectExec is not that clean
-  // One of the issues is that the generated equals will see ColumnarProjectExec and ProjectExec
-  // as being equal and this can result in the withNewChildren method not actually replacing
-  // anything
-  override def equals(other: Any): Boolean = {
-    if (!super.equals(other)) {
-      return false
-    }
-    return other.isInstanceOf[ColumnarConditionProjectExec]
-  }
 }
 
-class ColumnarUnionExec(children: Seq[SparkPlan]) extends UnionExec(children) {
+case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
+  // updating nullability to make all the children consistent
+
   override def supportsColumnar = true
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] =
     sparkContext.union(children.map(_.executeColumnar()))
+  override def output: Seq[Attribute] = {
+    children.map(_.output).transpose.map { attrs =>
+      val firstAttr = attrs.head
+      val nullable = attrs.exists(_.nullable)
+      val newDt = attrs.map(_.dataType).reduce(StructTypeFWD.merge)
+      if (firstAttr.dataType == newDt) {
+        firstAttr.withNullability(nullable)
+      } else {
+        AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
+          firstAttr.exprId,
+          firstAttr.qualifier)
+      }
+    }
+  }
+  protected override def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  }
 }
@@ -333,14 +333,6 @@ case class ColumnarBroadcastHashJoinExec(
     }
   }
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ColumnarBroadcastHashJoinExec]
-
-  override def equals(other: Any): Boolean = other match {
-    case that: ColumnarBroadcastHashJoinExec =>
-      (that canEqual this) && super.equals(that)
-    case _ => false
-  }
-
   //////////////////////////////////////////////////////////////////////////////////////////////////////
   def getResultSchema = {
     val attributes =