Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
* constraints. These filters are currently inserted to the existing conditions in the Filter
* operators and on either side of Join operators.
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
* In addition, for left/right outer joins, infer predicate from the preserved side of the Join
* operator and push the inferred filter over to the null-supplying side. For example, if the
* preserved side has constraints of the form 'a > 5' and the join condition is 'a = b', in
* which 'b' is an attribute from the null-supplying side, a [[Filter]] operator of 'b > 5' will
* be applied to the null-supplying side.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {

Expand Down Expand Up @@ -671,11 +674,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
val newConditionOpt = conditionOpt match {
case Some(condition) =>
val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep this unchanged? We just conditionOpt in line 681, 685, 693, and 697?

Copy link
Contributor Author

@maryannxue maryannxue Mar 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what would be the benefit of keeping that unchanged? To me, it would make the code look confusing. And in theory the two parts (1. infer newConditionOpt; 2. infer newLeftOp or newRightOpt) of this optimization rule will be unsynchronized, leaving part 2 always one iteration behind part 1.

case None =>
additionalConstraints.reduceOption(And)
}
if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
// Infer filter for left/right outer joins
val newLeftOpt = joinType match {
case RightOuter if newConditionOpt.isDefined =>
val inferredConstraints = left.getRelevantConstraints(
left.constraints
.union(right.constraints)
.union(splitConjunctivePredicates(newConditionOpt.get).toSet))
val newFilters = inferredConstraints
.filterNot(left.constraints.contains)
.reduceLeftOption(And)
newFilters.map(Filter(_, left))
case _ => None
}
val newRightOpt = joinType match {
case LeftOuter if newConditionOpt.isDefined =>
val inferredConstraints = right.getRelevantConstraints(
right.constraints
.union(left.constraints)
.union(splitConjunctivePredicates(newConditionOpt.get).toSet))
val newFilters = inferredConstraints
.filterNot(right.constraints.contains)
.reduceLeftOption(And)
newFilters.map(Filter(_, right))
case _ => None
}

if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a test case in which newConditionOpt ne conditionOpt and newConditionOpt.isDefined are not true at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

      val newConditionOpt = conditionOpt match {
        case Some(condition) =>
          val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
        case None =>
          additionalConstraints.reduceOption(And)
      }

So here, if conditionOpt is matched "None" and meanwhile additionalConstraints is empty, I assume newConditionOpt and conditionOpt will both be an empty Opt, but reference comparison ne will return false.
Since this is part of the original InferFilterFromConstraints logic, and I only modified it so as to make newConditionOpt work for the rest of the function (the new logic added), I assume it has already been covered by the existing tests.

Copy link
Member

@gatorsmile gatorsmile Mar 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change this to if (newConditionOpt.isDefined || newLeftOpt.isDefined || newRightOpt.isDefined), do all the tests can pass?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess they do. Only that when conditionOpt is not empty and additionalConstraints is empty, there will be unnecessary operations.

|| newLeftOpt.isDefined || newRightOpt.isDefined) {
Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
} else {
join
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
* example, if this set contains the expression `a = 2` then that expression is guaranteed to
* evaluate to `true` for all rows produced.
*/
lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
})
lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly))

/**
* This method can be overridden by any child class of QueryPlan to specify a set of constraints
Expand All @@ -55,6 +53,23 @@ trait QueryPlanConstraints { self: LogicalPlan =>
*/
protected def validConstraints: Set[Expression] = Set.empty

/**
* Returns an [[ExpressionSet]] that contains an additional set of constraints, such as
* equality constraints and `isNotNull` constraints, etc., and that only contains references
* to this [[LogicalPlan]] node.
*/
def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
val allRelevantConstraints =
if (conf.constraintPropagationEnabled) {
constraints
.union(inferAdditionalConstraints(constraints))
.union(constructIsNotNullConstraints(constraints))
} else {
constraints
}
ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
}

/**
* Infers a set of `isNotNull` constraints from null intolerant expressions as well as
* non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
Expand Down Expand Up @@ -120,4 +135,8 @@ trait QueryPlanConstraints { self: LogicalPlan =>
destination: Attribute): Set[Expression] = constraints.map(_ transform {
case e: Expression if e.semanticEquals(source) => destination
})

private def selfReferenceOnly(e: Expression): Boolean = {
e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("SPARK-21479: Outer join after-join filters push down to null-supplying side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val condition = Some("x.a".attr === "y.a".attr)
val originalQuery = x.join(y, LeftOuter, condition).where("x.a".attr === 2).analyze
val left = x.where(IsNotNull('a) && 'a === 2)
val right = y.where(IsNotNull('a) && 'a === 2)
val correctAnswer = left.join(right, LeftOuter, condition).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("SPARK-21479: Outer join pre-existing filters push down to null-supplying side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val condition = Some("x.a".attr === "y.a".attr)
val originalQuery = x.join(y.where("y.a".attr > 5), RightOuter, condition).analyze
val left = x.where(IsNotNull('a) && 'a > 5)
val right = y.where(IsNotNull('a) && 'a > 5)
val correctAnswer = left.join(right, RightOuter, condition).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("SPARK-21479: Outer join no filter push down to preserved side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val condition = Some("x.a".attr === "y.a".attr)
val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, condition).analyze
val left = x
val right = y.where(IsNotNull('a) && 'a === 1)
val correctAnswer = left.join(right, LeftOuter, condition).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the test cases can pass without this new rule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please verify again. The first two could not pass without this rule, the last one could since it's a counter case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Could you change the InferFiltersFromConstraints directly?

}