Skip to content

Commit debb094

Browse files
committed
fix
1 parent 25689da commit debb094

File tree

2 files changed

+10
-34
lines changed

2 files changed

+10
-34
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import org.apache.spark.sql.AnalysisException
2323
import org.apache.spark.sql.catalyst.analysis._
2424
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
2525
import org.apache.spark.sql.catalyst.expressions._
26-
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
2726
import org.apache.spark.sql.catalyst.expressions.aggregate._
2827
import org.apache.spark.sql.catalyst.plans._
2928
import org.apache.spark.sql.catalyst.plans.logical._
@@ -1458,8 +1457,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
14581457
val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
14591458
val (leftEvaluateCondition, rest) =
14601459
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
1461-
val rightEvaluateCondition = pushDownCandidates.filter(_.references.subsetOf(right.outputSet))
1462-
val commonCondition = rest.filterNot(_.references.subsetOf(right.outputSet))
1460+
val (rightEvaluateCondition, commonCondition) =
1461+
rest.partition(expr => expr.references.subsetOf(right.outputSet))
14631462

14641463
(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
14651464
}
@@ -1469,12 +1468,6 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
14691468
case _ => false
14701469
}
14711470

1472-
private def pushDownJoinConditions(conditions: Seq[Expression], plan: LogicalPlan) = {
1473-
conditions
1474-
.filterNot(_.semanticEquals(TrueLiteral)) // Push down true condition is useless.
1475-
.reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan)
1476-
}
1477-
14781471
def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
14791472

14801473
val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
@@ -1533,22 +1526,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
15331526
joinType match {
15341527
case _: InnerLike | LeftSemi =>
15351528
// push down the single side only join filter for both sides sub queries
1536-
val newLeft = pushDownJoinConditions(leftJoinConditions, left)
1537-
val newRight = pushDownJoinConditions(rightJoinConditions, right)
1529+
val newLeft = leftJoinConditions.
1530+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
1531+
val newRight = rightJoinConditions.
1532+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
15381533
val newJoinCond = commonJoinCondition.reduceLeftOption(And)
15391534

15401535
Join(newLeft, newRight, joinType, newJoinCond, hint)
15411536
case RightOuter =>
15421537
// push down the left side only join filter for left side sub query
1543-
val newLeft = pushDownJoinConditions(leftJoinConditions, left)
1538+
val newLeft = leftJoinConditions.
1539+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
15441540
val newRight = right
15451541
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
15461542

15471543
Join(newLeft, newRight, RightOuter, newJoinCond, hint)
15481544
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
15491545
// push down the right side only join filter for right sub query
15501546
val newLeft = left
1551-
val newRight = pushDownJoinConditions(rightJoinConditions, right)
1547+
val newRight = rightJoinConditions.
1548+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
15521549
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
15531550

15541551
Join(newLeft, newRight, joinType, newJoinCond, hint)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,25 +1384,4 @@ class FilterPushdownSuite extends PlanTest {
13841384
condition = Some("x.a".attr === "z.a".attr)).analyze
13851385
comparePlans(optimized, correctAnswer)
13861386
}
1387-
1388-
test("SPARK-28220: Push down true join condition for inner join") {
1389-
val x = testRelation.subquery('x)
1390-
val y = testRelation.subquery('y)
1391-
val originalQuery = x.join(y, condition = Some(true))
1392-
1393-
val optimized = Optimize.execute(originalQuery.analyze)
1394-
val correctAnswer = x.join(y, condition = None).analyze
1395-
comparePlans(optimized, correctAnswer)
1396-
}
1397-
1398-
test("SPARK-28220: Should not push down true join condition for left/right join") {
1399-
Seq(LeftOuter, RightOuter).foreach { joinType =>
1400-
val x = testRelation.subquery('x)
1401-
val y = testRelation.subquery('y)
1402-
val originalQuery = x.join(y, joinType = joinType, condition = Some(true))
1403-
1404-
val optimized = Optimize.execute(originalQuery.analyze)
1405-
comparePlans(optimized, originalQuery.analyze)
1406-
}
1407-
}
14081387
}

0 commit comments

Comments
 (0)