diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5fb59ef350b8..913354e4df0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -637,8 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
  * constraints. These filters are currently inserted to the existing conditions in the Filter
  * operators and on either side of Join operators.
  *
- * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
- * LeftSemi joins.
+ * In addition, for left/right outer joins, infer predicates from the preserved side of the
+ * Join operator and push the inferred filters over to the null-supplying side. For example,
+ * if the preserved side has a constraint of the form 'a > 5' and the join condition is
+ * 'a = b', where 'b' is an attribute from the null-supplying side, a [[Filter]] operator of
+ * 'b > 5' will be applied to the null-supplying side.
  */
 object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
@@ -671,11 +674,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
       val newConditionOpt = conditionOpt match {
         case Some(condition) =>
           val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
         case None =>
           additionalConstraints.reduceOption(And)
       }
-      if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+      // Infer filters for left/right outer joins and push them to the null-supplying side.
+      val newLeftOpt = joinType match {
+        case RightOuter if newConditionOpt.isDefined =>
+          val inferredConstraints = left.getRelevantConstraints(
+            left.constraints
+              .union(right.constraints)
+              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+          val newFilters = inferredConstraints
+            .filterNot(left.constraints.contains)
+            .reduceLeftOption(And)
+          newFilters.map(Filter(_, left))
+        case _ => None
+      }
+      val newRightOpt = joinType match {
+        case LeftOuter if newConditionOpt.isDefined =>
+          val inferredConstraints = right.getRelevantConstraints(
+            right.constraints
+              .union(left.constraints)
+              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+          val newFilters = inferredConstraints
+            .filterNot(right.constraints.contains)
+            .reduceLeftOption(And)
+          newFilters.map(Filter(_, right))
+        case _ => None
+      }
+
+      if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
+          || newLeftOpt.isDefined || newRightOpt.isDefined) {
+        Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
+      } else {
+        join
+      }
   }
 }
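To make the optimizer change concrete before the supporting changes below, here is a minimal end-to-end sketch, not part of the patch; the table and column names are made up, and it assumes a local build that includes this change. The preserved side carries 'a > 5' and the join condition is 'a = b', so the optimized plan should now show a Filter('b > 5) on the null-supplying side.

```scala
// Sketch only: names are illustrative, assumes a build with this patch applied.
import org.apache.spark.sql.SparkSession

object OuterJoinInferenceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    val preserved = Seq((1, "p1"), (6, "p6")).toDF("a", "px")
    val nullSupplying = Seq((6, "n6"), (9, "n9")).toDF("b", "nx")

    // 'a > 5' holds on the preserved side and the join condition is 'a = b',
    // so the rule should place a Filter('b > 5) on the null-supplying side
    // before the join, shrinking its input.
    preserved.where($"a" > 5)
      .join(nullSupplying, $"a" === $"b", "left_outer")
      .explain(true)

    spark.stop()
  }
}
```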
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index 046848875548..a29f3d29236c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
    * example, if this set contains the expression `a = 2` then that expression is guaranteed to
    * evaluate to `true` for all rows produced.
    */
-  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
-    c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
-  })
+  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly))
 
   /**
    * This method can be overridden by any child class of QueryPlan to specify a set of constraints
@@ -55,6 +53,23 @@ trait QueryPlanConstraints { self: LogicalPlan =>
    */
   protected def validConstraints: Set[Expression] = Set.empty
 
+  /**
+   * Returns an [[ExpressionSet]] that augments the given constraints with additional inferred
+   * constraints, such as equality constraints and `isNotNull` constraints, keeping only those
+   * expressions whose references belong to this [[LogicalPlan]] node's output.
+   */
+  def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
+    val allRelevantConstraints =
+      if (conf.constraintPropagationEnabled) {
+        constraints
+          .union(inferAdditionalConstraints(constraints))
+          .union(constructIsNotNullConstraints(constraints))
+      } else {
+        constraints
+      }
+    ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
+  }
+
   /**
    * Infers a set of `isNotNull` constraints from null intolerant expressions as well as
    * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
@@ -120,4 +135,8 @@ trait QueryPlanConstraints { self: LogicalPlan =>
       destination: Attribute): Set[Expression] = constraints.map(_ transform {
     case e: Expression if e.semanticEquals(source) => destination
   })
+
+  private def selfReferenceOnly(e: Expression): Boolean = {
+    e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic
+  }
 }
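To pin down what the new method is meant to return, here is a hedged sketch, not part of the patch, written with the catalyst DSL the same way the test suite below uses it. The expected result is noted in comments rather than asserted.

```scala
// Sketch of the getRelevantConstraints contract the optimizer now relies on.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object GetRelevantConstraintsSketch {
  // 'b' belongs to the null-supplying plan; 'a' is a preserved-side attribute.
  val a = 'a.int
  val nullSupplying = LocalRelation('b.int)
  val b = nullSupplying.output.head

  // Pass in a preserved-side constraint plus the join equality. With constraint
  // propagation enabled, the equality 'a = b' lets 'a > 5' transfer to 'b', an
  // IsNotNull('b) constraint is constructed, and anything still referencing 'a'
  // is dropped by selfReferenceOnly. Expected: { b > 5, IsNotNull(b) }.
  val relevant = nullSupplying.getRelevantConstraints(Set[Expression](a > 5, a === b))
}
```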
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index f78c2356e35a..e068f5104458 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -204,4 +204,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery)
     comparePlans(optimized, correctAnswer)
   }
+
+  test("SPARK-21479: Outer join after-join filters push down to null-supplying side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, LeftOuter, condition).where("x.a".attr === 2).analyze
+    val left = x.where(IsNotNull('a) && 'a === 2)
+    val right = y.where(IsNotNull('a) && 'a === 2)
+    val correctAnswer = left.join(right, LeftOuter, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-21479: Outer join pre-existing filters push down to null-supplying side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y.where("y.a".attr > 5), RightOuter, condition).analyze
+    val left = x.where(IsNotNull('a) && 'a > 5)
+    val right = y.where(IsNotNull('a) && 'a > 5)
+    val correctAnswer = left.join(right, RightOuter, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("SPARK-21479: Outer join no filter push down to preserved side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, condition).analyze
+    val left = x
+    val right = y.where(IsNotNull('a) && 'a === 1)
+    val correctAnswer = left.join(right, LeftOuter, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }
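One subtlety the last test pins down: predicates may only migrate toward the null-supplying side. Pushing 'y.a = 1' back to the preserved side x would drop preserved rows that have no match and change outer-join semantics, so the preserved side must stay unfiltered. To exercise just this suite locally, the usual Spark dev invocation should work: `build/sbt "catalyst/testOnly *InferFiltersFromConstraintsSuite"`.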