-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19712][SQL] Pushdown LeftSemi/LeftAnti below join #24331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19712][SQL] Pushdown LeftSemi/LeftAnti below join #24331
Conversation
|
Test build #104453 has finished for PR 24331 at commit
|
|
Test build #104456 has finished for PR 24331 at commit
|
|
cc @cloud-fan |
| case pushdownDirection.toLeftBranch | ||
| if (belowJoinType == LeftOuter || belowJoinType.isInstanceOf[InnerLike]) => | ||
| // push down leftsemi/anti join to the left table | ||
| val newLeft = Join(gLeft, right, joinType, newJoinCond, parentHint) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I have a question here. Will it be safe to propagate the hints here ? I am inclined to only do this optimization if no join hints are specified in either parent and child joins. Currently i am propagating them as is but thinking of changing it. Wanted to check your opinion before i made the change.
| * Define an enumeration to identify whether a LeftSemi/LeftAnti join can be pushed down to | ||
| * the left leg or the right leg of the join. | ||
| */ | ||
| object pushdownDirection extends Enumeration { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we upper case the first letter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Sure.
|
|
||
| def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
| // push LeftSemi/LeftAnti down into the join below | ||
| case j @ Join(left @ Join(gLeft, gRight, AllowedJoinTypes(_), belowJoinCond, childHint), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is hard to read when 2 joins are extracted together. How about
case j: Join(AllowedJoin(left), right, ...) =>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Good idea wenchen.
| val newRight = Join(gRight, right, joinType, newJoinCond, parentHint) | ||
| Join(gLeft, newRight, belowJoinType, belowJoinCond, childHint) | ||
| case _ => | ||
| // Do nothing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when can this happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan When we decide that we can't pushdown the parent join. For example this test should exercise the default case.
| // leftsemi/anti join then push the leftsemi/anti join to the right side. | ||
| (pushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And)) | ||
| } else { | ||
| noPushdown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PushPredicateThroughJoin may pushdown to both sides, do we have such a case here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan To the best of my knowledge, we don't have this case. I actually tried to get a subquery to push down to both legs of the join but couldn't. Normal filter conditions can trigger pushing down to both legs currently though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a TODO and say we will revisit it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Sure. I will add a TODO
|
Test build #104530 has finished for PR 24331 at commit
|
| * the left leg or the right leg of the join. | ||
| */ | ||
| object PushdownDirection extends Enumeration { | ||
| val toRightBranch, toLeftBranch, none = Value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
upper-case the first letter for them too. I think this is the policy when defining enums.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, the policy is upper-case all the letters, e.g.
object PushdownDirection extends Enumeration {
val TO_RIGHT_BRANCH, ...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Will change.
| val toRightBranch, toLeftBranch, none = Value | ||
| } | ||
|
|
||
| object AllowedJoins { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it matches a single join, so no s at the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan ok.. will make a change.
| } else { | ||
| /** | ||
| * When the join condition is empty, | ||
| * 1) if this is a left outer join or inner join, push leftsemi/anti join down |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why can't we push to both legs if it's inner join?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Perhaps its possible. In this PR, i was focusing on what is happening today in PushPredicateThroughJoin and keep the behaviour same. We can look into improving both this rule and PushPredicateThroughJoin as follow-up. The reason i say it is, probably we need to test more and prove that pushdown to both sides don't create any side effects or can cause wrong results ?
| // When the join conditions can be computed based on the left leg of | ||
| // leftsemi/anti join then push the leftsemi/anti join to the left side. | ||
| (PushdownDirection.TO_LEFT_BRANCH, leftConditions.reduceLeftOption(And)) | ||
| } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if commonConditions is not empty? can we add a filter at top?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Please refer to my answer above.
|
Test build #104593 has finished for PR 24331 at commit
|
|
Test build #104612 has finished for PR 24331 at commit
|
|
retest this please |
| /** | ||
| * Determine which side of the join a LeftSemi/LeftAnti join can be pushed to. | ||
| */ | ||
| private def pushTo(leftChild: Join, rightChild: LogicalPlan, joinCond: Option[Expression]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we can only push down left semi/anti join to either left or right side, there is no need to return the new join condition in this method, because the join condition won't change.
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, LeftOuter, Cross).foreach { case innerJT => | ||
| test(s"$outerJT pushdown with empty join condition join type $innerJT") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the inner join can have a join condition, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it supports, we should add Seq(Some('d === 'e), None).foreach at the begining too
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case jt => | ||
| test(s"$jt pushdown with empty join condition join type RightOuter") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we merge the test case with above? something like
val correctAnswer = if (innerJT = RightOuter) {
...
} else {
...
}
|
Test build #104619 has finished for PR 24331 at commit
|
|
Test build #104627 has finished for PR 24331 at commit
|
| val rightOutput = rightChild.outputSet | ||
|
|
||
| if (joinCond.nonEmpty) { | ||
| val noPushdown = PushdownDirection.NONE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use PushdownDirection.NONE directly.
| if (rest.isEmpty && leftConditions.nonEmpty) { | ||
| // When the join conditions can be computed based on the left leg of | ||
| // leftsemi/anti join then push the leftsemi/anti join to the left side. | ||
| (PushdownDirection.TO_LEFT_BRANCH) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to add (...)
| } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { | ||
| // When the join conditions can be computed based on the attributes from right leg of | ||
| // leftsemi/anti join then push the leftsemi/anti join to the right side. | ||
| (PushdownDirection.TO_RIGHT_BRANCH) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
|
||
| Seq(Some('d === 'e), None).foreach { case innerJoinCond => | ||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, LeftOuter, Cross).foreach { case innerJT => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These test cases almost cover the above one, except the RightOuter join type. Can we merge them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, I was wrong
|
Test build #104642 has finished for PR 24331 at commit
|
|
retest this please |
|
Test build #104650 has finished for PR 24331 at commit
|
|
thanks, merging to master! |
|
@cloud-fan Thank you very much. |
What changes were proposed in this pull request?
This PR adds support for pushing down LeftSemi and LeftAnti joins below the Join operator.
This is a prerequisite work thats needed for the subsequent task of moving the subquery rewrites to the beginning of optimization phase.
The larger PR is here . This PR addresses the comment at link.
How was this patch tested?
Added tests under LeftSemiAntiJoinPushDownSuite.