
Commit 1b7443b

allisonwang-db authored and cloud-fan committed
[SPARK-32216][SQL] Remove redundant ProjectExec
### What changes were proposed in this pull request?

This PR added a physical rule to remove redundant project nodes. A `ProjectExec` is redundant when:

1. It has the same output attributes and order as its child's output, when ordering of these attributes is required.
2. It has the same output attributes as its child's output, when attribute output ordering is not required.

For example:

After Filter:
```
== Physical Plan ==
*(1) Project [a#14L, b#15L, c#16, key#17]
+- *(1) Filter (isnotnull(a#14L) AND (a#14L > 5))
   +- *(1) ColumnarToRow
      +- FileScan parquet [a#14L,b#15L,c#16,key#17]
```
The `Project [a#14L, b#15L, c#16, key#17]` is redundant because its output is exactly the same as the filter's output.

Before Aggregate:
```
== Physical Plan ==
*(2) HashAggregate(keys=[key#17], functions=[sum(a#14L), last(b#15L, false)], output=[sum_a#39L, key#17, last_b#41L])
+- Exchange hashpartitioning(key#17, 5), true, [id=#77]
   +- *(1) HashAggregate(keys=[key#17], functions=[partial_sum(a#14L), partial_last(b#15L, false)], output=[key#17, sum#49L, last#50L, valueSet#51])
      +- *(1) Project [key#17, a#14L, b#15L]
         +- *(1) Filter (isnotnull(a#14L) AND (a#14L > 100))
            +- *(1) ColumnarToRow
               +- FileScan parquet [a#14L,b#15L,key#17]
```
The `Project [key#17, a#14L, b#15L]` is redundant because the hash aggregate does not require its child plan's output to be in a specific order.

### Why are the changes needed?

It removes unnecessary query nodes and makes the query plan cleaner.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #29031 from allisonwang-db/remove-project.

Lead-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Co-authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
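As a hedged illustration, the "After Filter" case above can typically be reproduced along these lines; the parquet path and column names are made up for the example, and the exact plan shape depends on the Spark version and file source:

```scala
// Illustrative setup (assumed): a small parquet table with columns a, b, c, key.
spark.range(100)
  .selectExpr("id as a", "id as b", "cast(id as string) as c", "id % 10 as key")
  .write.mode("overwrite").parquet("/tmp/spark32216_demo")

val df = spark.read.parquet("/tmp/spark32216_demo")

// The file source strategy may place a Project [a, b, c, key] directly above the
// Filter even though the Filter already outputs exactly those columns; with
// spark.sql.execution.removeRedundantProjects=true (the default), it is removed.
df.filter("a > 5").explain()
```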
1 parent 8659ec5 · commit 1b7443b

File tree

8 files changed: +534 −418 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions

```
@@ -1211,6 +1211,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val REMOVE_REDUNDANT_PROJECTS_ENABLED = buildConf("spark.sql.execution.removeRedundantProjects")
+    .internal()
+    .doc("Whether to remove redundant project exec node based on children's output and " +
+      "ordering requirement.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val STATE_STORE_PROVIDER_CLASS =
     buildConf("spark.sql.streaming.stateStore.providerClass")
       .internal()
```
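For reference, a minimal sketch of toggling the new flag at runtime to compare plans with and without the rule, reusing the illustrative `df` from the sketch above (the conf is internal but still settable):

```scala
// Disable the rule: the redundant ProjectExec should reappear in explain() output.
spark.conf.set("spark.sql.execution.removeRedundantProjects", "false")
df.filter("a > 5").explain()

// Restore the default.
spark.conf.set("spark.sql.execution.removeRedundantProjects", "true")
```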

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 0 deletions

```
@@ -342,6 +342,7 @@ object QueryExecution {
       CoalesceBucketsInJoin(sparkSession.sessionState.conf),
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
+      RemoveRedundantProjects(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
```
sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala

Lines changed: 99 additions & 0 deletions

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, PartialMerge}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.internal.SQLConf

/**
 * Remove redundant ProjectExec node from the spark plan. A ProjectExec node is redundant when:
 * - It has the same output attributes and order as its child's output and the ordering of
 *   the attributes is required.
 * - It has the same output attributes as its child's output when attribute output ordering
 *   is not required.
 * This rule needs to be a physical rule because project nodes are useful during logical
 * optimization to prune data. During physical planning, redundant project nodes can be removed
 * to simplify the query plan.
 */
case class RemoveRedundantProjects(conf: SQLConf) extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED)) {
      plan
    } else {
      removeProject(plan, true)
    }
  }

  private def removeProject(plan: SparkPlan, requireOrdering: Boolean): SparkPlan = {
    plan match {
      case p @ ProjectExec(_, child) =>
        if (isRedundant(p, child, requireOrdering)) {
          val newPlan = removeProject(child, requireOrdering)
          newPlan.setLogicalLink(child.logicalLink.get)
          newPlan
        } else {
          p.mapChildren(removeProject(_, false))
        }
      case op: TakeOrderedAndProjectExec =>
        op.mapChildren(removeProject(_, false))
      case a: BaseAggregateExec =>
        // BaseAggregateExec requires a specific column ordering when the mode is Final or
        // PartialMerge. See the comments in BaseAggregateExec's inputAttributes method.
        val keepOrdering = a.aggregateExpressions
          .exists(ae => ae.mode.equals(Final) || ae.mode.equals(PartialMerge))
        a.mapChildren(removeProject(_, keepOrdering))
      case g: GenerateExec => g.mapChildren(removeProject(_, false))
      // A join's ordering requirement is inherited from its parent. If there is no ProjectExec
      // in its ancestors, the join should require its output columns to be ordered.
      case o => o.mapChildren(removeProject(_, requireOrdering))
    }
  }

  /**
   * Check if the nullability change is positive. It catches the case when the project output
   * attribute is not nullable, but the child output attribute is nullable.
   */
  private def checkNullability(output: Seq[Attribute], childOutput: Seq[Attribute]): Boolean =
    output.zip(childOutput).forall { case (attr1, attr2) => attr1.nullable || !attr2.nullable }

  private def isRedundant(
      project: ProjectExec,
      child: SparkPlan,
      requireOrdering: Boolean): Boolean = {
    child match {
      // If a DataSourceV2ScanExec node does not support columnar, a ProjectExec node is required
      // to convert the rows to UnsafeRow. See DataSourceV2Strategy for more details.
      case d: DataSourceV2ScanExecBase if !d.supportsColumnar => false
      case _ =>
        if (requireOrdering) {
          project.output.map(_.exprId.id) == child.output.map(_.exprId.id) &&
            checkNullability(project.output, child.output)
        } else {
          val orderedProjectOutput = project.output.sortBy(_.exprId.id)
          val orderedChildOutput = child.output.sortBy(_.exprId.id)
          orderedProjectOutput.map(_.exprId.id) == orderedChildOutput.map(_.exprId.id) &&
            checkNullability(orderedProjectOutput, orderedChildOutput)
        }
    }
  }
}
```
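A minimal sketch of exercising the rule by hand, assuming an active `SparkSession` on a build containing this patch; in normal use the rule is applied for you during physical plan preparation, so this is purely illustrative:

```scala
import org.apache.spark.sql.execution.RemoveRedundantProjects

// Apply the rule directly to a freshly planned physical plan; on a plan with no
// redundant ProjectExec nodes it simply returns an equivalent plan.
val plan = spark.range(10).filter("id > 5").queryExecution.sparkPlan
val simplified = RemoveRedundantProjects(spark.sessionState.conf)(plan)
println(simplified)
```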

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 2 additions & 0 deletions

```
@@ -83,12 +83,14 @@ case class AdaptiveSparkPlanExec(
     )
   }
 
+  @transient private val removeRedundantProjects = RemoveRedundantProjects(conf)
   @transient private val ensureRequirements = EnsureRequirements(conf)
 
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
+    removeRedundantProjects,
     ensureRequirements
   ) ++ context.session.sessionState.queryStagePrepRules
```

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala

Lines changed: 4 additions & 4 deletions

```
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.dynamicpruning
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
@@ -40,8 +40,8 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
   /**
    * Identify the shape in which keys of a given plan are broadcasted.
    */
-  private def broadcastMode(keys: Seq[Expression], plan: LogicalPlan): BroadcastMode = {
-    val packedKeys = BindReferences.bindReferences(HashJoin.rewriteKeyExpr(keys), plan.output)
+  private def broadcastMode(keys: Seq[Expression], output: AttributeSeq): BroadcastMode = {
+    val packedKeys = BindReferences.bindReferences(HashJoin.rewriteKeyExpr(keys), output)
     HashedRelationBroadcastMode(packedKeys)
   }
@@ -67,8 +67,8 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession)
        }.isDefined
 
        if (canReuseExchange) {
-         val mode = broadcastMode(buildKeys, buildPlan)
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
+         val mode = broadcastMode(buildKeys, executedPlan.output)
          // plan a broadcast exchange of the build side of the join
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
```
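The rewrite above binds the broadcast keys against the output of the prepared executed plan, i.e. the plan actually handed to `BroadcastExchangeExec`, rather than the logical plan's output. A hedged illustration of why the binding input matters; attribute names and types are made up:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.LongType

val a = AttributeReference("a", LongType)()
val b = AttributeReference("b", LongType)()

// Binding `b` against the input [a, b] produces a BoundReference with ordinal 1:
// ordinals follow the attribute order of the binding input, so keys must be bound
// against the output of the plan that is actually broadcast.
val bound = BindReferences.bindReference[Expression](b, Seq(a, b))
println(bound)  // input[1, bigint, true]
```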
