Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1385,18 +1385,25 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
*/
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Splits join condition expressions into three categories based on the attributes required
* to evaluate them.
* Splits join condition expressions or filter predicates (on a given join's output) into three
* categories based on the attributes required to evaluate them. Note that we explicitly exclude
* non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
Copy link
Contributor

@cloud-fan cloud-fan Aug 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: non-deterministic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good eye! can you please fold this change into one of your open PRs :) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I will :)

* canEvaluateInRight to prevent pushing these predicates on either side of the join.
*
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
// Note: In order to ensure correctness, it's important to not change the relative ordering of
// any deterministic expression that follows a non-deterministic expression. To achieve this,
// we only consider pushing down those expressions that precede the first non-deterministic
// expression in the condition.
val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
val (leftEvaluateCondition, rest) =
condition.partition(_.references subsetOf left.outputSet)
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
rest.partition(_.references subsetOf right.outputSet)
rest.partition(expr => expr.references.subsetOf(right.outputSet))

(leftEvaluateCondition, rightEvaluateCondition, commonCondition)
(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Expand Down Expand Up @@ -1447,7 +1454,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}

// push down the join filter into sub query scanning if applicable
case f @ Join(left, right, joinType, joinCondition) =>
case j @ Join(left, right, joinType, joinCondition) =>
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

Expand Down Expand Up @@ -1477,7 +1484,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, LeftOuter, newJoinCond)
case FullOuter => f
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,4 +987,18 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
}

test("join condition pushdown: deterministic and non-deterministic") {
  val leftRel = testRelation.subquery('x)
  val rightRel = testRelation.subquery('y)

  // The four conjuncts of the join condition, in the order they appear in it. They are
  // `def`s rather than `val`s so that every use builds a fresh expression tree, exactly as
  // writing the expression inline at each use site would.
  def leftAIsFive = "x.a".attr === 5
  def rightAIsFive = "y.a".attr === 5
  def leftAIsRand = "x.a".attr === Rand(10)
  def rightBIsFive = "y.b".attr === 5

  // Input plan: two deterministic conjuncts, then a non-deterministic one (Rand), then a
  // trailing deterministic conjunct.
  val originalQuery = leftRel.join(
    rightRel,
    condition = Some(leftAIsFive && rightAIsFive && leftAIsRand && rightBIsFive))

  // Expected plan: only the conjuncts that precede the first non-deterministic one are pushed
  // below the join (each to the side it references); the Rand conjunct and everything after
  // it stay in the join condition.
  val correctAnswer = leftRel
    .where(leftAIsFive)
    .join(rightRel.where(rightAIsFive), condition = Some(leftAIsRand && rightBIsFive))

  comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
}
}