Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
PushPredicateThroughJoin,
PushDownPredicate,
PushDownLeftSemiAntiJoin,
PushLeftSemiLeftAntiThroughJoin,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,107 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}
}

/**
* This rule is a variant of [[PushPredicateThroughJoin]] which can handle
* pushing down Left semi and Left Anti joins below a join operator. The
* allowable join types are:
* 1) Inner
* 2) Cross
* 3) LeftOuter
* 4) RightOuter
*
* TODO:
* Currently this rule can push down the left semi or left anti joins to either
* left or right leg of the child join. This matches the behaviour of `PushPredicateThroughJoin`
* when the lefi semi or left anti join is in expression form. We need to explore the possibility
* to push the left semi/anti joins to both legs of join if the join condition refers to
* both left and right legs of the child join.
*/
object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Define an enumeration to identify whether a LeftSemi/LeftAnti join can be pushed down to
* the left leg or the right leg of the join.
*/
object PushdownDirection extends Enumeration {
val TO_LEFT_BRANCH, TO_RIGHT_BRANCH, NONE = Value
}

object AllowedJoin {
def unapply(join: Join): Option[Join] = join.joinType match {
case Inner | Cross | LeftOuter | RightOuter => Some(join)
case _ => None
}
}

/**
* Determine which side of the join a LeftSemi/LeftAnti join can be pushed to.
*/
private def pushTo(leftChild: Join, rightChild: LogicalPlan, joinCond: Option[Expression]) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we can only push down left semi/anti join to either left or right side, there is no need to return the new join condition in this method, because the join condition won't change.

val left = leftChild.left
val right = leftChild.right
val joinType = leftChild.joinType
val rightOutput = rightChild.outputSet

if (joinCond.nonEmpty) {
val conditions = splitConjunctivePredicates(joinCond.get)
val (leftConditions, rest) =
conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput))
val (rightConditions, commonConditions) =
rest.partition(_.references.subsetOf(right.outputSet ++ rightOutput))

if (rest.isEmpty && leftConditions.nonEmpty) {
// When the join conditions can be computed based on the left leg of
// leftsemi/anti join then push the leftsemi/anti join to the left side.
PushdownDirection.TO_LEFT_BRANCH
} else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if commonConditions is not empty? can we add a filter at top?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Please refer to my answer above.

// When the join conditions can be computed based on the attributes from right leg of
// leftsemi/anti join then push the leftsemi/anti join to the right side.
PushdownDirection.TO_RIGHT_BRANCH
} else {
PushdownDirection.NONE
}
} else {
/**
* When the join condition is empty,
* 1) if this is a left outer join or inner join, push leftsemi/anti join down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we push to both legs if it's inner join?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Perhaps its possible. In this PR, i was focusing on what is happening today in PushPredicateThroughJoin and keep the behaviour same. We can look into improving both this rule and PushPredicateThroughJoin as follow-up. The reason i say it is, probably we need to test more and prove that pushdown to both sides don't create any side effects or can cause wrong results ?

* to the left leg of join.
* 2) if a right outer join, to the right leg of join,
*/
joinType match {
case _: InnerLike | LeftOuter =>
PushdownDirection.TO_LEFT_BRANCH
case RightOuter =>
PushdownDirection.TO_RIGHT_BRANCH
case _ =>
PushdownDirection.NONE
}
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// push LeftSemi/LeftAnti down into the join below
case j @ Join(AllowedJoin(left), right, LeftSemiOrAnti(joinType), joinCond, parentHint) =>
val (childJoinType, childLeft, childRight, childCondition, childHint) =
(left.joinType, left.left, left.right, left.condition, left.hint)
val action = pushTo(left, right, joinCond)

action match {
case PushdownDirection.TO_LEFT_BRANCH
if (childJoinType == LeftOuter || childJoinType.isInstanceOf[InnerLike]) =>
// push down leftsemi/anti join to the left table
val newLeft = Join(childLeft, right, joinType, joinCond, parentHint)
Join(newLeft, childRight, childJoinType, childCondition, childHint)
case PushdownDirection.TO_RIGHT_BRANCH
if (childJoinType == RightOuter || childJoinType.isInstanceOf[InnerLike]) =>
// push down leftsemi/anti join to the right table
val newRight = Join(childRight, right, joinType, joinCond, parentHint)
Join(childLeft, newRight, childJoinType, childCondition, childHint)
case _ =>
// Do nothing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when can this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan When we decide that we can't pushdown the parent join. For example this test should exercise the default case.

j
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ class LeftSemiPushdownSuite extends PlanTest {
CombineFilters,
PushDownPredicate,
PushDownLeftSemiAntiJoin,
PushLeftSemiLeftAntiThroughJoin,
BooleanSimplification,
CollapseProject) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

val testRelation1 = LocalRelation('d.int)
val testRelation2 = LocalRelation('e.int)

test("Project: LeftSemiAnti join pushdown") {
val originalQuery = testRelation
Expand Down Expand Up @@ -314,4 +315,118 @@ class LeftSemiPushdownSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}

Seq(Some('d === 'e), None).foreach { case innerJoinCond =>
Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, Cross, RightOuter).foreach { case innerJT =>
test(s"$outerJT pushdown empty join cond join type $innerJT join cond $innerJoinCond") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond)
val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None)
val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = if (innerJT == RightOuter) {
val pushedDownJoin = testRelation2.join(testRelation, joinType = outerJT, None)
testRelation1.join(pushedDownJoin, joinType = innerJT, innerJoinCond).analyze
} else {
val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None)
pushedDownJoin.join(testRelation2, joinType = innerJT, innerJoinCond).analyze
}
comparePlans(optimized, correctAnswer)
}
}
}
}

Seq(Some('d === 'e), None).foreach { case innerJoinCond =>
Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, Cross).foreach { case innerJT =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test cases almost cover the above one, except the RightOuter join type. Can we merge them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I was wrong

test(s"$outerJT pushdown to left of join type: $innerJT join condition $innerJoinCond") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond)
val originalQuery =
joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd))
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin =
testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd))
val correctAnswer =
pushedDownJoin.join(testRelation2, joinType = innerJT, innerJoinCond).analyze
comparePlans(optimized, correctAnswer)
}
}
}
}

Seq(Some('e === 'd), None).foreach { case innerJoinCond =>
Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT pushdown to right of join type: $innerJT join condition $innerJoinCond") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond)
val originalQuery =
joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin =
testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e))
val correctAnswer =
testRelation1.join(pushedDownJoin, joinType = innerJT, innerJoinCond).analyze
comparePlans(optimized, correctAnswer)
}
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt no pushdown - join condition refers left leg - join type for RightOuter") {
val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'd))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt no pushdown - join condition refers right leg - join type for LeftOuter") {
val joinedRelation = testRelation1.join(testRelation2, joinType = LeftOuter, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT no pushdown - join condition refers both leg - join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
val originalQuery = joinedRelation
.join(testRelation, joinType = outerJT, condition = Some('a === 'd && 'a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT no pushdown - join condition refers none of the leg - join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
val originalQuery = joinedRelation
.join(testRelation, joinType = outerJT, condition = Some('d + 'e === 'a))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt no pushdown when child join type is FullOuter") {
val joinedRelation = testRelation1.join(testRelation2, joinType = FullOuter, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}

}