From a5f1f504118cdf57da0db92a46e13b9d87403080 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 9 Apr 2019 14:29:41 -0700 Subject: [PATCH 1/7] [SPARK-19712] Pushdown LeftSemi/LeftAnti through join --- .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../optimizer/PushDownLeftSemiAntiJoin.scala | 103 ++++++++++++ .../LeftSemiAntiJoinPushDownSuite.scala | 154 +++++++++++++++++- 3 files changed, 257 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d0368becaeff..afdf61ef322d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -66,6 +66,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) PushPredicateThroughJoin, PushDownPredicate, PushDownLeftSemiAntiJoin, + PushLeftSemiLeftAntiThroughJoin, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index d91f262d75e8..c7cc8420c397 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -159,3 +159,106 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { } } } + +/** + * This rule is a variant of [[PushPredicateThroughJoin]] which can handle + * pushing down Left semi and Left Anti joins below a join operator. The + * allowable join types are: + * 1) Inner + * 2) Cross + * 3) LeftOuter + * 4) RightOuter + */ +object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper { + /** + * Define an enumeration to identify whether a LeftSemi/LeftAnti join can be pushed down to + * the left leg or the right leg of the join. + */ + object pushdownDirection extends Enumeration { + val toRightBranch, toLeftBranch, none = Value + } + + /** + * LeftSemi/LeftAnti joins are pushed down when its left child is a join operator + * with a join type that is in the AllowedJoinTypes. + */ + object AllowedJoinTypes { + def unapply(joinType: JoinType): Option[JoinType] = joinType match { + case Inner | Cross | LeftOuter | RightOuter => Some(joinType) + case _ => None + } + } + + /** + * Determine which side of the join a LeftSemi/LeftAnti join can be pushed to. + */ + private def pushTo(leftChild: Join, rightChild: LogicalPlan, joinCond: Option[Expression]) = { + val left = leftChild.left + val right = leftChild.right + val joinType = leftChild.joinType + val rightOutput = rightChild.outputSet + + if (joinCond.nonEmpty) { + val noPushdown = (pushdownDirection.none, None) + val conditions = splitConjunctivePredicates(joinCond.get) + val (leftConditions, rest) = + conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput)) + val (rightConditions, commonConditions) = + rest.partition(_.references.subsetOf(right.outputSet ++ rightOutput)) + + if (rest.isEmpty && leftConditions.nonEmpty) { + // When the join conditions can be computed based on the left leg of + // leftsemi/anti join then push the leftsemi/anti join to the left side. + (pushdownDirection.toLeftBranch, leftConditions.reduceLeftOption(And)) + } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { + // When the join conditions can be computed based on the attributes from right leg of + // leftsemi/anti join then push the leftsemi/anti join to the right side. + (pushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And)) + } else { + noPushdown + } + } else { + /** + * When the join condition is empty, + * 1) if this is a left outer join or inner join, push leftsemi/anti join down + * to the left leg of join. + * 2) if a right outer join, to the right leg of join, + */ + val action = joinType match { + case RightOuter => + pushdownDirection.toRightBranch + case _: InnerLike | LeftOuter => + pushdownDirection.toLeftBranch + case _ => + pushdownDirection.none + } + (action, None) + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // push LeftSemi/LeftAnti down into the join below + case j @ Join(left @ Join(gLeft, gRight, AllowedJoinTypes(_), belowJoinCond, childHint), + right, LeftSemiOrAnti(joinType), joinCond, parentHint) => + val belowJoinType = left.joinType + val (action, newJoinCond) = pushTo(left, right, joinCond) + + action match { + case pushdownDirection.toLeftBranch + if (belowJoinType == LeftOuter || belowJoinType.isInstanceOf[InnerLike]) => + // push down leftsemi/anti join to the left table + val newLeft = Join(gLeft, right, joinType, newJoinCond, parentHint) + Join(newLeft, gRight, belowJoinType, belowJoinCond, childHint) + case pushdownDirection.toRightBranch + if (belowJoinType == RightOuter || belowJoinType.isInstanceOf[InnerLike]) => + // push down leftsemi/anti join to the right table + val newRight = Join(gRight, right, joinType, newJoinCond, parentHint) + Join(gLeft, newRight, belowJoinType, belowJoinCond, childHint) + case _ => + // Do nothing + j + } + } +} + + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index 185568d334ce..a4879c57f384 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -37,13 +37,14 @@ class LeftSemiPushdownSuite extends PlanTest { CombineFilters, PushDownPredicate, PushDownLeftSemiAntiJoin, + PushLeftSemiLeftAntiThroughJoin, BooleanSimplification, CollapseProject) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - val testRelation1 = LocalRelation('d.int) + val testRelation2 = LocalRelation('e.int) test("Project: LeftSemiAnti join pushdown") { val originalQuery = testRelation @@ -314,4 +315,155 @@ class LeftSemiPushdownSuite extends PlanTest { val optimized = Optimize.execute(originalQuery.analyze) comparePlans(optimized, originalQuery.analyze) } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown with empty join condition join type $innerJT") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) + val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) + val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None) + comparePlans(optimized, correctAnswer) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case jt => + test(s"$jt pushdown with empty join condition join type RightOuter") { + val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None) + val originalQuery = joinedRelation.join(testRelation, joinType = jt, None) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = testRelation2.join(testRelation, joinType = jt, None) + val correctAnswer = testRelation1.join(pushedDownJoin, joinType = RightOuter, None) + comparePlans(optimized, correctAnswer) + } + } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown with join condition referring to left leg of join type $innerJT") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) + val originalQuery = + joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = + testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) + val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None).analyze + comparePlans(optimized, correctAnswer) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown with outer and inner join condition for join type $innerJT") { + val joinedRelation = + testRelation1.join(testRelation2, joinType = innerJT, condition = Some('d === 'e)) + val originalQuery = + joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = + testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) + val correctAnswer = pushedDownJoin + .join(testRelation2, joinType = innerJT, condition = Some('d === 'e)) + .analyze + comparePlans(optimized, correctAnswer) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case jt => + test(s"$jt no pushdown - join condition refers left leg - join type for RightOuter") { + val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None) + val originalQuery = + joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'd)) + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, RightOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown with join condition referring to right leg - join type $innerJT") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) + val originalQuery = + joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = + testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) + val correctAnswer = testRelation1.join(pushedDownJoin, joinType = innerJT, None).analyze + comparePlans(optimized, correctAnswer) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, RightOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown with outer and inner join conditions for join type $innerJT") { + val joinedRelation = testRelation1. + join(testRelation2, joinType = innerJT, condition = Some('e === 'd)) + val originalQuery = + joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = + testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) + val correctAnswer = testRelation1. + join(pushedDownJoin, joinType = innerJT, condition = Some('e === 'd)) + .analyze + comparePlans(optimized, correctAnswer) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case jt => + test(s"$jt no pushdown - join condition refers right leg - join type for LeftOuter") { + val joinedRelation = testRelation1.join(testRelation2, joinType = LeftOuter, None) + val originalQuery = + joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'e)) + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, RightOuter, Cross).foreach { case innerJT => + test(s"$outerJT no pushdown - join condition refers both leg - join type $innerJT") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) + val originalQuery = joinedRelation + .join(testRelation, joinType = outerJT, condition = Some('a === 'd && 'a === 'e)) + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, RightOuter, Cross).foreach { case innerJT => + test(s"$outerJT no pushdown - join condition refers none of the leg - join type $innerJT") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) + val originalQuery = joinedRelation + .join(testRelation, joinType = outerJT, condition = Some('d + 'e === 'a)) + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + } + } + + Seq(LeftSemi, LeftAnti).foreach { case jt => + test(s"$jt no pushdown when child join type is FullOuter") { + val joinedRelation = testRelation1.join(testRelation2, joinType = FullOuter, None) + val originalQuery = + joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'e)) + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + } + } From 14ed943d4e5dac537154e61d0cd94e7644858266 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 9 Apr 2019 14:40:59 -0700 Subject: [PATCH 2/7] style --- .../sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index a4879c57f384..b43a89f58af2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -357,7 +357,7 @@ class LeftSemiPushdownSuite extends PlanTest { } } } - + Seq(LeftSemi, LeftAnti).foreach { case outerJT => Seq(Inner, LeftOuter, Cross).foreach { case innerJT => test(s"$outerJT pushdown with outer and inner join condition for join type $innerJT") { From 813075fbefb891d02bde7f87b0557d8ca415acdd Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Thu, 11 Apr 2019 16:40:09 -0700 Subject: [PATCH 3/7] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 46 +++++++++---------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index c7cc8420c397..b0064223bb20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -174,17 +174,13 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH * Define an enumeration to identify whether a LeftSemi/LeftAnti join can be pushed down to * the left leg or the right leg of the join. */ - object pushdownDirection extends Enumeration { + object PushdownDirection extends Enumeration { val toRightBranch, toLeftBranch, none = Value } - /** - * LeftSemi/LeftAnti joins are pushed down when its left child is a join operator - * with a join type that is in the AllowedJoinTypes. - */ - object AllowedJoinTypes { - def unapply(joinType: JoinType): Option[JoinType] = joinType match { - case Inner | Cross | LeftOuter | RightOuter => Some(joinType) + object AllowedJoins { + def unapply(join: Join): Option[Join] = join.joinType match { + case Inner | Cross | LeftOuter | RightOuter => Some(join) case _ => None } } @@ -199,7 +195,7 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH val rightOutput = rightChild.outputSet if (joinCond.nonEmpty) { - val noPushdown = (pushdownDirection.none, None) + val noPushdown = (PushdownDirection.none, None) val conditions = splitConjunctivePredicates(joinCond.get) val (leftConditions, rest) = conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput)) @@ -209,11 +205,11 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH if (rest.isEmpty && leftConditions.nonEmpty) { // When the join conditions can be computed based on the left leg of // leftsemi/anti join then push the leftsemi/anti join to the left side. - (pushdownDirection.toLeftBranch, leftConditions.reduceLeftOption(And)) + (PushdownDirection.toLeftBranch, leftConditions.reduceLeftOption(And)) } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { // When the join conditions can be computed based on the attributes from right leg of // leftsemi/anti join then push the leftsemi/anti join to the right side. - (pushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And)) + (PushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And)) } else { noPushdown } @@ -226,11 +222,11 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH */ val action = joinType match { case RightOuter => - pushdownDirection.toRightBranch + PushdownDirection.toRightBranch case _: InnerLike | LeftOuter => - pushdownDirection.toLeftBranch + PushdownDirection.toLeftBranch case _ => - pushdownDirection.none + PushdownDirection.none } (action, None) } @@ -238,22 +234,22 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH def apply(plan: LogicalPlan): LogicalPlan = plan transform { // push LeftSemi/LeftAnti down into the join below - case j @ Join(left @ Join(gLeft, gRight, AllowedJoinTypes(_), belowJoinCond, childHint), - right, LeftSemiOrAnti(joinType), joinCond, parentHint) => - val belowJoinType = left.joinType + case j @ Join(AllowedJoins(left), right, LeftSemiOrAnti(joinType), joinCond, parentHint) => + val (childJoinType, childLeft, childRight, childCondition, childHint) = + (left.joinType, left.left, left.right, left.condition, left.hint) val (action, newJoinCond) = pushTo(left, right, joinCond) action match { - case pushdownDirection.toLeftBranch - if (belowJoinType == LeftOuter || belowJoinType.isInstanceOf[InnerLike]) => + case PushdownDirection.toLeftBranch + if (childJoinType == LeftOuter || childJoinType.isInstanceOf[InnerLike]) => // push down leftsemi/anti join to the left table - val newLeft = Join(gLeft, right, joinType, newJoinCond, parentHint) - Join(newLeft, gRight, belowJoinType, belowJoinCond, childHint) - case pushdownDirection.toRightBranch - if (belowJoinType == RightOuter || belowJoinType.isInstanceOf[InnerLike]) => + val newLeft = Join(childLeft, right, joinType, newJoinCond, parentHint) + Join(newLeft, childRight, childJoinType, childCondition, childHint) + case PushdownDirection.toRightBranch + if (childJoinType == RightOuter || childJoinType.isInstanceOf[InnerLike]) => // push down leftsemi/anti join to the right table - val newRight = Join(gRight, right, joinType, newJoinCond, parentHint) - Join(gLeft, newRight, belowJoinType, belowJoinCond, childHint) + val newRight = Join(childRight, right, joinType, newJoinCond, parentHint) + Join(childLeft, newRight, childJoinType, childCondition, childHint) case _ => // Do nothing j From 9945c28927ba11b5c233020d2baa635f8e0b9c72 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 15 Apr 2019 09:09:53 -0700 Subject: [PATCH 4/7] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index b0064223bb20..638bb6923a3a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -175,10 +175,10 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH * the left leg or the right leg of the join. */ object PushdownDirection extends Enumeration { - val toRightBranch, toLeftBranch, none = Value + val TO_LEFT_BRANCH, TO_RIGHT_BRANCH, NONE = Value } - object AllowedJoins { + object AllowedJoin { def unapply(join: Join): Option[Join] = join.joinType match { case Inner | Cross | LeftOuter | RightOuter => Some(join) case _ => None @@ -195,7 +195,7 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH val rightOutput = rightChild.outputSet if (joinCond.nonEmpty) { - val noPushdown = (PushdownDirection.none, None) + val noPushdown = (PushdownDirection.NONE, None) val conditions = splitConjunctivePredicates(joinCond.get) val (leftConditions, rest) = conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput)) @@ -205,11 +205,11 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH if (rest.isEmpty && leftConditions.nonEmpty) { // When the join conditions can be computed based on the left leg of // leftsemi/anti join then push the leftsemi/anti join to the left side. - (PushdownDirection.toLeftBranch, leftConditions.reduceLeftOption(And)) + (PushdownDirection.TO_LEFT_BRANCH, leftConditions.reduceLeftOption(And)) } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { // When the join conditions can be computed based on the attributes from right leg of // leftsemi/anti join then push the leftsemi/anti join to the right side. - (PushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And)) + (PushdownDirection.TO_RIGHT_BRANCH, rightConditions.reduceLeftOption(And)) } else { noPushdown } @@ -221,12 +221,12 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH * 2) if a right outer join, to the right leg of join, */ val action = joinType match { - case RightOuter => - PushdownDirection.toRightBranch case _: InnerLike | LeftOuter => - PushdownDirection.toLeftBranch + PushdownDirection.TO_LEFT_BRANCH + case RightOuter => + PushdownDirection.TO_RIGHT_BRANCH case _ => - PushdownDirection.none + PushdownDirection.NONE } (action, None) } @@ -234,18 +234,18 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH def apply(plan: LogicalPlan): LogicalPlan = plan transform { // push LeftSemi/LeftAnti down into the join below - case j @ Join(AllowedJoins(left), right, LeftSemiOrAnti(joinType), joinCond, parentHint) => + case j @ Join(AllowedJoin(left), right, LeftSemiOrAnti(joinType), joinCond, parentHint) => val (childJoinType, childLeft, childRight, childCondition, childHint) = (left.joinType, left.left, left.right, left.condition, left.hint) val (action, newJoinCond) = pushTo(left, right, joinCond) action match { - case PushdownDirection.toLeftBranch + case PushdownDirection.TO_LEFT_BRANCH if (childJoinType == LeftOuter || childJoinType.isInstanceOf[InnerLike]) => // push down leftsemi/anti join to the left table val newLeft = Join(childLeft, right, joinType, newJoinCond, parentHint) Join(newLeft, childRight, childJoinType, childCondition, childHint) - case PushdownDirection.toRightBranch + case PushdownDirection.TO_RIGHT_BRANCH if (childJoinType == RightOuter || childJoinType.isInstanceOf[InnerLike]) => // push down leftsemi/anti join to the right table val newRight = Join(childRight, right, joinType, newJoinCond, parentHint) From 4a5b4cac835a99e6a697f2dcb8e6aa33f34865c8 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 15 Apr 2019 19:58:22 -0700 Subject: [PATCH 5/7] Code review - add todo --- .../sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index 638bb6923a3a..0327867f838a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -168,6 +168,13 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { * 2) Cross * 3) LeftOuter * 4) RightOuter + * + * TODO: + * Currently this rule can push down the left semi or left anti joins to either + * left or right leg of the child join. This matches the behaviour of `PushPredicateThroughJoin` + * when the lefi semi or left anti join is in expression form. We need to explore the possibility + * to push the left semi/anti joins to both legs of join if the join condition refers to + * both left and right legs of the child join. */ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper { /** From fe3e168ecdcbe201dab832f111df6c3d8c6a8241 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 16 Apr 2019 09:05:20 -0700 Subject: [PATCH 6/7] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 15 ++- .../LeftSemiAntiJoinPushDownSuite.scala | 115 ++++++------------ 2 files changed, 45 insertions(+), 85 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index 0327867f838a..8a03d1458979 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -202,7 +202,7 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH val rightOutput = rightChild.outputSet if (joinCond.nonEmpty) { - val noPushdown = (PushdownDirection.NONE, None) + val noPushdown = PushdownDirection.NONE val conditions = splitConjunctivePredicates(joinCond.get) val (leftConditions, rest) = conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput)) @@ -212,11 +212,11 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH if (rest.isEmpty && leftConditions.nonEmpty) { // When the join conditions can be computed based on the left leg of // leftsemi/anti join then push the leftsemi/anti join to the left side. - (PushdownDirection.TO_LEFT_BRANCH, leftConditions.reduceLeftOption(And)) + (PushdownDirection.TO_LEFT_BRANCH) } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { // When the join conditions can be computed based on the attributes from right leg of // leftsemi/anti join then push the leftsemi/anti join to the right side. - (PushdownDirection.TO_RIGHT_BRANCH, rightConditions.reduceLeftOption(And)) + (PushdownDirection.TO_RIGHT_BRANCH) } else { noPushdown } @@ -227,7 +227,7 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH * to the left leg of join. * 2) if a right outer join, to the right leg of join, */ - val action = joinType match { + joinType match { case _: InnerLike | LeftOuter => PushdownDirection.TO_LEFT_BRANCH case RightOuter => @@ -235,7 +235,6 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH case _ => PushdownDirection.NONE } - (action, None) } } @@ -244,18 +243,18 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH case j @ Join(AllowedJoin(left), right, LeftSemiOrAnti(joinType), joinCond, parentHint) => val (childJoinType, childLeft, childRight, childCondition, childHint) = (left.joinType, left.left, left.right, left.condition, left.hint) - val (action, newJoinCond) = pushTo(left, right, joinCond) + val action = pushTo(left, right, joinCond) action match { case PushdownDirection.TO_LEFT_BRANCH if (childJoinType == LeftOuter || childJoinType.isInstanceOf[InnerLike]) => // push down leftsemi/anti join to the left table - val newLeft = Join(childLeft, right, joinType, newJoinCond, parentHint) + val newLeft = Join(childLeft, right, joinType, joinCond, parentHint) Join(newLeft, childRight, childJoinType, childCondition, childHint) case PushdownDirection.TO_RIGHT_BRANCH if (childJoinType == RightOuter || childJoinType.isInstanceOf[InnerLike]) => // push down leftsemi/anti join to the right table - val newRight = Join(childRight, right, joinType, newJoinCond, parentHint) + val newRight = Join(childRight, right, joinType, joinCond, parentHint) Join(childLeft, newRight, childJoinType, childCondition, childHint) case _ => // Do nothing diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index b43a89f58af2..35638e084ba0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -317,62 +317,58 @@ class LeftSemiPushdownSuite extends PlanTest { } Seq(LeftSemi, LeftAnti).foreach { case outerJT => - Seq(Inner, LeftOuter, Cross).foreach { case innerJT => + Seq(Inner, LeftOuter, Cross, RightOuter).foreach { case innerJT => test(s"$outerJT pushdown with empty join condition join type $innerJT") { val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None) val optimized = Optimize.execute(originalQuery.analyze) - val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) - val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None) + val correctAnswer = if (innerJT == RightOuter) { + val pushedDownJoin = testRelation2.join(testRelation, joinType = outerJT, None) + testRelation1.join(pushedDownJoin, joinType = innerJT, None) + } else { + val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) + pushedDownJoin.join(testRelation2, joinType = innerJT, None) + } comparePlans(optimized, correctAnswer) } } } - Seq(LeftSemi, LeftAnti).foreach { case jt => - test(s"$jt pushdown with empty join condition join type RightOuter") { - val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None) - val originalQuery = joinedRelation.join(testRelation, joinType = jt, None) - val optimized = Optimize.execute(originalQuery.analyze) - - val pushedDownJoin = testRelation2.join(testRelation, joinType = jt, None) - val correctAnswer = testRelation1.join(pushedDownJoin, joinType = RightOuter, None) - comparePlans(optimized, correctAnswer) - } - } - - Seq(LeftSemi, LeftAnti).foreach { case outerJT => - Seq(Inner, LeftOuter, Cross).foreach { case innerJT => - test(s"$outerJT pushdown with join condition referring to left leg of join type $innerJT") { - val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) - val originalQuery = - joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) - val optimized = Optimize.execute(originalQuery.analyze) - - val pushedDownJoin = - testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) - val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None).analyze - comparePlans(optimized, correctAnswer) + Seq(Some('d === 'e), None).foreach { case innerJoinCond => + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown to left of join type: $innerJT join condition $innerJoinCond") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond) + val originalQuery = + joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = + testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) + val correctAnswer = + pushedDownJoin.join(testRelation2, joinType = innerJT, innerJoinCond).analyze + comparePlans(optimized, correctAnswer) + } } } } - Seq(LeftSemi, LeftAnti).foreach { case outerJT => - Seq(Inner, LeftOuter, Cross).foreach { case innerJT => - test(s"$outerJT pushdown with outer and inner join condition for join type $innerJT") { - val joinedRelation = - testRelation1.join(testRelation2, joinType = innerJT, condition = Some('d === 'e)) - val originalQuery = - joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) - val optimized = Optimize.execute(originalQuery.analyze) - - val pushedDownJoin = - testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) - val correctAnswer = pushedDownJoin - .join(testRelation2, joinType = innerJT, condition = Some('d === 'e)) - .analyze - comparePlans(optimized, correctAnswer) + Seq(Some('e === 'd), None).foreach { case innerJoinCond => + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, RightOuter, Cross).foreach { case innerJT => + test(s"$outerJT pushdown to right of join type: $innerJT join condition $innerJoinCond") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond) + val originalQuery = + joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) + val optimized = Optimize.execute(originalQuery.analyze) + + val pushedDownJoin = + testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) + val correctAnswer = + testRelation1.join(pushedDownJoin, joinType = innerJT, innerJoinCond).analyze + comparePlans(optimized, correctAnswer) + } } } } @@ -387,41 +383,6 @@ class LeftSemiPushdownSuite extends PlanTest { } } - Seq(LeftSemi, LeftAnti).foreach { case outerJT => - Seq(Inner, RightOuter, Cross).foreach { case innerJT => - test(s"$outerJT pushdown with join condition referring to right leg - join type $innerJT") { - val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) - val originalQuery = - joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) - val optimized = Optimize.execute(originalQuery.analyze) - - val pushedDownJoin = - testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) - val correctAnswer = testRelation1.join(pushedDownJoin, joinType = innerJT, None).analyze - comparePlans(optimized, correctAnswer) - } - } - } - - Seq(LeftSemi, LeftAnti).foreach { case outerJT => - Seq(Inner, RightOuter, Cross).foreach { case innerJT => - test(s"$outerJT pushdown with outer and inner join conditions for join type $innerJT") { - val joinedRelation = testRelation1. - join(testRelation2, joinType = innerJT, condition = Some('e === 'd)) - val originalQuery = - joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) - val optimized = Optimize.execute(originalQuery.analyze) - - val pushedDownJoin = - testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) - val correctAnswer = testRelation1. - join(pushedDownJoin, joinType = innerJT, condition = Some('e === 'd)) - .analyze - comparePlans(optimized, correctAnswer) - } - } - } - Seq(LeftSemi, LeftAnti).foreach { case jt => test(s"$jt no pushdown - join condition refers right leg - join type for LeftOuter") { val joinedRelation = testRelation1.join(testRelation2, joinType = LeftOuter, None) From a5ed3d9f9fbe4081566a1b20c02a433c75edf2b9 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 16 Apr 2019 21:22:47 -0700 Subject: [PATCH 7/7] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 7 ++--- .../LeftSemiAntiJoinPushDownSuite.scala | 28 ++++++++++--------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index 8a03d1458979..0c389000dd32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -202,7 +202,6 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH val rightOutput = rightChild.outputSet if (joinCond.nonEmpty) { - val noPushdown = PushdownDirection.NONE val conditions = splitConjunctivePredicates(joinCond.get) val (leftConditions, rest) = conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput)) @@ -212,13 +211,13 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH if (rest.isEmpty && leftConditions.nonEmpty) { // When the join conditions can be computed based on the left leg of // leftsemi/anti join then push the leftsemi/anti join to the left side. - (PushdownDirection.TO_LEFT_BRANCH) + PushdownDirection.TO_LEFT_BRANCH } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { // When the join conditions can be computed based on the attributes from right leg of // leftsemi/anti join then push the leftsemi/anti join to the right side. - (PushdownDirection.TO_RIGHT_BRANCH) + PushdownDirection.TO_RIGHT_BRANCH } else { - noPushdown + PushdownDirection.NONE } } else { /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index 35638e084ba0..00709adaa7c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -316,21 +316,23 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } - Seq(LeftSemi, LeftAnti).foreach { case outerJT => - Seq(Inner, LeftOuter, Cross, RightOuter).foreach { case innerJT => - test(s"$outerJT pushdown with empty join condition join type $innerJT") { - val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) - val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None) - val optimized = Optimize.execute(originalQuery.analyze) + Seq(Some('d === 'e), None).foreach { case innerJoinCond => + Seq(LeftSemi, LeftAnti).foreach { case outerJT => + Seq(Inner, LeftOuter, Cross, RightOuter).foreach { case innerJT => + test(s"$outerJT pushdown empty join cond join type $innerJT join cond $innerJoinCond") { + val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond) + val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None) + val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = if (innerJT == RightOuter) { - val pushedDownJoin = testRelation2.join(testRelation, joinType = outerJT, None) - testRelation1.join(pushedDownJoin, joinType = innerJT, None) - } else { - val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) - pushedDownJoin.join(testRelation2, joinType = innerJT, None) + val correctAnswer = if (innerJT == RightOuter) { + val pushedDownJoin = testRelation2.join(testRelation, joinType = outerJT, None) + testRelation1.join(pushedDownJoin, joinType = innerJT, innerJoinCond).analyze + } else { + val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) + pushedDownJoin.join(testRelation2, joinType = innerJT, innerJoinCond).analyze + } + comparePlans(optimized, correctAnswer) } - comparePlans(optimized, correctAnswer) } } }