Skip to content

Commit c5ff632

Browse files
committed
support non-local predicates and bug fix.
1 parent 0bb07cb commit c5ff632

File tree

1 file changed

+32
-33
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer

1 file changed

+32
-33
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -770,54 +770,52 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
770770
}
771771

772772
/**
773-
* Elimination of outer joins, if the local predicates can restrict the result sets so that
773+
* Eliminates outer joins when the filter predicates restrict the result set so that
774774
* all null-supplying rows are eliminated
775775
*
776-
* - full outer -> inner if both sides have such local predicates
777-
* - left outer -> inner if the right side has such local predicates
778-
* - right outer -> inner if the left side has such local predicates
779-
* - full outer -> left outer if only the left side has such local predicates
780-
* - full outer -> right outer if only the right side has such local predicates
776+
* - full outer -> inner if both sides have such predicates
777+
* - left outer -> inner if the right side has such predicates
778+
* - right outer -> inner if the left side has such predicates
779+
* - full outer -> left outer if only the left side has such predicates
780+
* - full outer -> right outer if only the right side has such predicates
781781
*
782782
* This rule should be executed before pushing down the Filter
783783
*/
784784
object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
785+
786+
/** Checks whether `plan` outputs an attribute that is semantically equal to `attr`. */
private def containsAttr(plan: LogicalPlan, attr: Attribute): Boolean = {
  val produced = plan.outputSet
  produced.exists(candidate => candidate.semanticEquals(attr))
}
785788

786-
private def isNullFilteringPredicate(predicate: Expression): Boolean = {
789+
/**
 * Returns true if `predicate` is guaranteed to reject every row in which the attributes
 * produced by `plan` are NULL, i.e. for such rows it evaluates to false or NULL.
 *
 * The check is syntactic and conservative: any shape that is not recognised yields false,
 * which only means the outer join is left untouched.
 *
 * @param predicate the filter condition (or a sub-expression of it) to inspect
 * @param plan      the join child whose output attributes are tested for null filtering
 */
private def hasNullFilteringPredicate(predicate: Expression, plan: LogicalPlan): Boolean = {
  // True when `e` is itself an attribute that `plan` outputs.
  def fromPlan(e: Expression): Boolean = e match {
    case ar: AttributeReference => containsAttr(plan, ar)
    case _ => false
  }

  // True for comparisons that evaluate to NULL as soon as an attribute of `plan` is NULL.
  def nullOnNullInput(e: Expression): Boolean = e match {
    case EqualTo(l, r) => fromPlan(l) || fromPlan(r)
    case GreaterThan(l, r) => fromPlan(l) || fromPlan(r)
    case GreaterThanOrEqual(l, r) => fromPlan(l) || fromPlan(r)
    case LessThan(l, r) => fromPlan(l) || fromPlan(r)
    case LessThanOrEqual(l, r) => fromPlan(l) || fromPlan(r)
    case In(value, _) => fromPlan(value)
    case _ => false
  }

  predicate match {
    case IsNotNull(e) => fromPlan(e)
    // `attr <=> x` is false (not NULL) for a NULL attr only when `x` can never be NULL.
    case EqualNullSafe(l, r) =>
      (fromPlan(l) && !r.nullable) || (fromPlan(r) && !l.nullable)
    // A conjunction rejects the row if either side does; a disjunction needs both sides.
    case And(l, r) => hasNullFilteringPredicate(l, plan) || hasNullFilteringPredicate(r, plan)
    case Or(l, r) => hasNullFilteringPredicate(l, plan) && hasNullFilteringPredicate(r, plan)
    // Negation cannot simply flip the answer: NOT(NULL) is still NULL, and a predicate that
    // never mentions `plan` must not become null-filtering for it. Push the negation inwards
    // (De Morgan) and accept only the shapes that are provably false or NULL.
    case Not(Not(e)) => hasNullFilteringPredicate(e, plan)
    case Not(And(l, r)) =>
      hasNullFilteringPredicate(Not(l), plan) && hasNullFilteringPredicate(Not(r), plan)
    case Not(Or(l, r)) =>
      hasNullFilteringPredicate(Not(l), plan) || hasNullFilteringPredicate(Not(r), plan)
    case Not(IsNull(e)) => fromPlan(e)
    // NOT over a NULL-propagating comparison stays NULL, so the row is still rejected.
    case Not(e) => nullOnNullInput(e)
    case other => nullOnNullInput(other)
  }
}
808813

809-
private def hasNullFilteringLocalPredicate(
810-
condition: Expression, child: LogicalPlan): Boolean = {
811-
val localPredicates = splitConjunctivePredicates(condition)
812-
.filter(_.references subsetOf child.outputSet)
813-
localPredicates.exists(isNullFilteringPredicate)
814-
}
815-
816814
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
817815
// Only three outer join types are eligible: RightOuter|LeftOuter|FullOuter
818816
case f @ Filter(filterCond, j @ Join(left, right, RightOuter|LeftOuter|FullOuter, joinCond)) =>
819-
val leftHasNonNullPredicate = hasNullFilteringLocalPredicate(filterCond, left)
820-
val rightHasNonNullPredicate = hasNullFilteringLocalPredicate(filterCond, right)
817+
val leftHasNonNullPredicate = hasNullFilteringPredicate(filterCond, left)
818+
val rightHasNonNullPredicate = hasNullFilteringPredicate(filterCond, right)
821819

822820
j.joinType match {
823821
case RightOuter if leftHasNonNullPredicate =>
@@ -830,6 +828,7 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
830828
Filter(filterCond, Join(left, right, LeftOuter, joinCond))
831829
case FullOuter if rightHasNonNullPredicate =>
832830
Filter(filterCond, Join(left, right, RightOuter, joinCond))
831+
case _ => f
833832
}
834833
}
835834
}

0 commit comments

Comments
 (0)