
Commit 1e14765

wakun authored and GitHub Enterprise committed
[CARMEL-6055] Backport [SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse (#993)
* [CARMEL-6055] Backport [SPARK-29375][SPARK-35855][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse
* fix ut
* fix ut
* fix ut
* fix ut
* fix ut
1 parent 25d0233 commit 1e14765
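
In short, this backport replaces the separate ReuseExchange and ReuseSubquery preparation rules with a single whole-plan rule, ReuseExchangeAndSubquery, appended once at the end of physical planning, and reworks ExplainUtils so operator IDs are assigned across the main plan and all of its subqueries before anything is printed.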

File tree

12 files changed: +444 -173 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

Lines changed: 86 additions & 72 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
@@ -28,35 +27,22 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS
 object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
    * Given a input physical plan, performs the following tasks.
-   *   1. Computes the operator id for current operator and records it in the operaror
-   *      by setting a tag.
-   *   2. Computes the whole stage codegen id for current operator and records it in the
+   *   1. Computes the whole stage codegen id for current operator and records it in the
    *      operator by setting a tag.
-   *   3. Generate the two part explain output for this plan.
+   *   2. Generate the two part explain output for this plan.
    *      1. First part explains the operator tree with each operator tagged with an unique
    *         identifier.
-   *      2. Second part explans each operator in a verbose manner.
+   *      2. Second part explains each operator in a verbose manner.
    *
    * Note : This function skips over subqueries. They are handled by its caller.
    *
    * @param plan Input query plan to process
    * @param append function used to append the explain output
-   * @param startOperatorID The start value of operation id. The subsequent operations will
-   *                        be assigned higher value.
-   *
-   * @return The last generated operation id for this input plan. This is to ensure we
-   *         always assign incrementing unique id to each operator.
-   *
    */
   private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
-      plan: => QueryPlan[T],
-      append: String => Unit,
-      startOperatorID: Int): Int = {
-
-    val operationIDs = new mutable.ArrayBuffer[(Int, QueryPlan[_])]()
-    var currentOperatorID = startOperatorID
+      plan: T,
+      append: String => Unit): Unit = {
     try {
-      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, operationIDs)
       generateWholeStageCodegenIds(plan)
 
       QueryPlan.append(
@@ -67,31 +53,36 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
         printOperatorId = true)
 
       append("\n")
-      var i: Integer = 0
-      for ((opId, curPlan) <- operationIDs) {
-        append(curPlan.verboseStringWithOperatorId())
-      }
+
+      val operationsWithID = ArrayBuffer.empty[QueryPlan[_]]
+      collectOperatorsWithID(plan, operationsWithID)
+      operationsWithID.foreach(p => append(p.verboseStringWithOperatorId()))
+
     } catch {
       case e: AnalysisException => append(e.toString)
     }
-    currentOperatorID
   }
 
   /**
    * Given a input physical plan, performs the following tasks.
    *   1. Generates the explain output for the input plan excluding the subquery plans.
    *   2. Generates the explain output for each subquery referenced in the plan.
    */
-  def processPlan[T <: QueryPlan[T]](
-      plan: => QueryPlan[T],
-      append: String => Unit): Unit = {
+  def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
-      val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
       var currentOperatorID = 0
-      currentOperatorID = processPlanSkippingSubqueries(plan, append, currentOperatorID)
+      currentOperatorID = generateOperatorIDs(plan, currentOperatorID)
+
+      val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
       getSubqueries(plan, subqueries)
-      var i = 0
 
+      subqueries.foldLeft(currentOperatorID) {
+        (curId, plan) => generateOperatorIDs(plan._3.child, curId)
+      }
+
+      processPlanSkippingSubqueries(plan, append)
+
+      var i = 0
       for (sub <- subqueries) {
         if (i == 0) {
           append("\n===== Subqueries =====\n\n")
@@ -104,10 +95,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       // the explain output. In case of subquery reuse, we don't print subquery plan more
       // than once. So we skip [[ReusedSubqueryExec]] here.
       if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
-        currentOperatorID = processPlanSkippingSubqueries(
-          sub._3.child,
-          append,
-          currentOperatorID)
+        processPlanSkippingSubqueries(sub._3.child, append)
       }
       append("\n")
     }
@@ -117,59 +105,85 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
   }
 
   /**
-   * Traverses the supplied input plan in a bottem-up fashion does the following :
-   *    1. produces a map : operator identifier -> operator
-   *    2. Records the operator id via setting a tag in the operator.
+   * Traverses the supplied input plan in a bottom-up fashion and records the operator id via
+   * setting a tag in the operator.
    * Note :
-   *    1. Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't
-   *       appear in the explain output.
-   *    2. operator identifier starts at startOperatorID + 1
+   * - Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't
+   *   appear in the explain output.
+   * - Operator identifier starts at startOperatorID + 1
+   *
    * @param plan Input query plan to process
-   * @param startOperatorID The start value of operation id. The subsequent operations will
-   *                        be assigned higher value.
-   * @param operatorIDs A output parameter that contains a map of operator id and query plan. This
-   *                    is used by caller to print the detail portion of the plan.
-   * @return The last generated operation id for this input plan. This is to ensure we
-   *         always assign incrementing unique id to each operator.
+   * @param startOperatorID The start value of operation id. The subsequent operations will be
+   *                        assigned higher value.
+   * @return The last generated operation id for this input plan. This is to ensure we always
+   *         assign incrementing unique id to each operator.
    */
-  private def generateOperatorIDs(
-      plan: QueryPlan[_],
-      startOperatorID: Int,
-      operatorIDs: mutable.ArrayBuffer[(Int, QueryPlan[_])]): Int = {
+  private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
     var currentOperationID = startOperatorID
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return currentOperationID
     }
-    plan.foreachUp {
-      case p: WholeStageCodegenExec =>
-      case p: InputAdapter =>
-      case other: QueryPlan[_] =>
 
-        def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
-          currentOperationID += 1
-          other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
-          operatorIDs += ((currentOperationID, other))
-        }
+    def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
+      currentOperationID += 1
+      plan.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
+    }
 
-        other match {
-          case p: AdaptiveSparkPlanExec =>
-            currentOperationID =
-              generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
-            setOpId()
-          case p: QueryStageExec =>
-            currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
-            setOpId()
-          case _ =>
-            setOpId()
-            other.innerChildren.foldLeft(currentOperationID) {
-              (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
-            }
+    plan.foreachUp {
+      case _: WholeStageCodegenExec =>
+      case _: InputAdapter =>
+      case p: AdaptiveSparkPlanExec =>
+        currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID)
+        setOpId(p)
+      case p: QueryStageExec =>
+        currentOperationID = generateOperatorIDs(p.plan, currentOperationID)
+        setOpId(p)
+      case other: QueryPlan[_] =>
+        setOpId(other)
+        other.innerChildren.foldLeft(currentOperationID) {
+          (curId, plan) => generateOperatorIDs(plan, curId)
        }
     }
    currentOperationID
  }
 
+  /**
+   * Traverses the supplied input plan in a bottom-up fashion and collects operators with assigned
+   * ids.
+   *
+   * @param plan Input query plan to process
+   * @param operators An output parameter that contains the operators.
+   */
+  private def collectOperatorsWithID(
+      plan: QueryPlan[_],
+      operators: ArrayBuffer[QueryPlan[_]]): Unit = {
+    // Skip the subqueries as they are not printed as part of main query block.
+    if (plan.isInstanceOf[BaseSubqueryExec]) {
+      return
+    }
+
+    def collectOperatorWithID(plan: QueryPlan[_]): Unit = {
+      if (plan.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
+        operators += plan
+      }
+    }
+
+    plan.foreachUp {
+      case _: WholeStageCodegenExec =>
+      case _: InputAdapter =>
+      case p: AdaptiveSparkPlanExec =>
+        collectOperatorsWithID(p.executedPlan, operators)
+        collectOperatorWithID(p)
+      case p: QueryStageExec =>
+        collectOperatorsWithID(p.plan, operators)
+        collectOperatorWithID(p)
+      case other: QueryPlan[_] =>
+        collectOperatorWithID(other)
+        other.innerChildren.foreach(collectOperatorsWithID(_, operators))
+    }
+  }
+
   /**
    * Traverses the supplied input plan in a top-down fashion and records the
    * whole stage code gen id in the plan via setting a tag.
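
The net effect in ExplainUtils is that ID assignment (generateOperatorIDs) is now separated from collecting what gets printed (collectOperatorsWithID), so processPlan can number the main plan and every subquery before either is rendered. For orientation, a minimal caller sketch, not part of this diff (the helper name and StringBuilder wiring are illustrative):

  import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}

  // Hypothetical helper: render the two-part formatted explain of a physical plan.
  def formattedExplain(plan: SparkPlan): String = {
    val sb = new StringBuilder()
    // processPlan assigns operator IDs to the main plan and to all subquery plans
    // first, then prints the operator tree followed by the per-operator details.
    ExplainUtils.processPlan(plan, (s: String) => { sb.append(s); () })
    sb.toString
  }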

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 13 additions & 8 deletions
@@ -35,7 +35,9 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, EnsureRepartitionForWriting, InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
-import org.apache.spark.sql.execution.exchange.{EliminateShuffleExec, EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.execution.exchange.EliminateShuffleExec
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.streaming.OutputMode
@@ -147,7 +149,7 @@ class QueryExecution(
 
   protected def preparations: Seq[Rule[SparkPlan]] = {
     QueryExecution.preparations(sparkSession,
-      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
+      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)
   }
 
   private def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
@@ -323,7 +325,8 @@ object QueryExecution {
   */
  private[execution] def preparations(
      sparkSession: SparkSession,
-      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
+      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
+      subquery: Boolean): Seq[Rule[SparkPlan]] = {
     // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
     // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
     adaptiveExecutionRule.toSeq ++
@@ -339,10 +342,12 @@ object QueryExecution {
       EliminateShuffleExec,
       DisableUnnecessaryBucketedScan,
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
-      CollapseCodegenStages(),
-      ReuseExchange,
-      ReuseSubquery
-    )
+      CollapseCodegenStages()) ++
+      (if (subquery) {
+        Nil
+      } else {
+        Seq(ReuseExchangeAndSubquery)
+      })
   }
 
   /**
@@ -373,7 +378,7 @@ object QueryExecution {
    * Prepare the [[SparkPlan]] for execution.
    */
   def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {
-    prepareForExecution(preparations(spark), plan)
+    prepareForExecution(preparations(spark, subquery = true), plan)
   }
 
   /**
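
The new subquery parameter exists so that plans prepared for a stand-alone subquery skip the whole-plan reuse rule, which only makes sense when applied to the complete query. Roughly, the two call sites in this diff behave as annotated below (a restatement for orientation, not code from the commit; `this` refers to the enclosing QueryExecution):

  // Inside QueryExecution#preparations (main query): subquery = false, so the
  // rule list ends with ... ++ Seq(ReuseExchangeAndSubquery).
  QueryExecution.preparations(sparkSession,
    Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)

  // Inside QueryExecution.prepareExecutedPlan (stand-alone subquery plans):
  // subquery = true, so reuse is skipped here, because ReuseExchangeAndSubquery
  // has already handled it at the whole-plan level.
  QueryExecution.preparations(spark, subquery = true)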

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 3 additions & 1 deletion
@@ -72,7 +72,9 @@ case class HashAggregateExec(
   // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
   // map and/or the sort-based aggregation once it has processed a given number of input rows.
   private val testFallbackStartsAt: Option[(Int, Int)] = {
-    sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
+    Option(sqlContext).map { s =>
+      s.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null)
+    }.orNull match {
       case null | "" => None
       case fallbackStartsAt =>
         val splits = fallbackStartsAt.split(",").map(_.trim)
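
The HashAggregateExec change guards against sqlContext being null (for example on a plan copy that is not attached to an active session) while preserving the old behaviour where null or an empty string means the conf is unset. A self-contained sketch of the same pattern, with a hypothetical stand-in for sqlContext:

  // Hypothetical stand-in for sqlContext; callers may legitimately pass null.
  final class FakeContext(settings: Map[String, String]) {
    def getConf(key: String, default: String): String = settings.getOrElse(key, default)
  }

  def testFallbackStartsAt(ctx: FakeContext): Option[String] = {
    // Option(ctx) absorbs a null context instead of throwing a NullPointerException,
    // and .orNull restores the plain (possibly null) String expected by the match.
    Option(ctx)
      .map(_.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null))
      .orNull match {
      case null | "" => None
      case value => Some(value)
    }
  }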

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala

Lines changed: 0 additions & 48 deletions
@@ -95,51 +95,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      |""".stripMargin
   }
 }
-
-/**
- * Find out duplicated exchanges in the spark plan, then use the same exchange for all the
- * references.
- */
-object ReuseExchange extends Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
-      return plan
-    }
-    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
-    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
-
-    // Replace a Exchange duplicate with a ReusedExchange
-    def reuse: PartialFunction[Exchange, SparkPlan] = {
-      case exchange: Exchange =>
-        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
-        val samePlan = sameSchema.find { e =>
-          exchange.sameResult(e)
-        }
-        if (samePlan.isDefined) {
-          // Keep the output of this exchange, the following plans require that to resolve
-          // attributes.
-          ReusedExchangeExec(exchange.output, samePlan.get)
-        } else {
-          sameSchema += exchange
-          exchange
-        }
-    }
-
-    plan transformUp {
-      case exchange: Exchange => reuse(exchange)
-    } transformAllExpressions {
-      // Lookup inside subqueries for duplicate exchanges
-      case in: InSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-      case in: InBloomFilterSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-    }
-  }
-}
}
