-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19122][SQL] Unnecessary shuffle+sort added if join predicates ordering differ from bucketing and sorting order #16985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ok to test |
|
Test build #73112 has finished for PR 16985 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems ugly but I can't think of a better way. The problem is: I want to mutate this ordering at some point in the query planning. I cannot do that when SortMergeJoinExec object is generated because there wont be ample information available at that time.
I tried to add class attributes which would be altered and don't mutate this. Doing that, I saw that that tasks on executor do not see the updated values of the local class attributes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What information are you missing? The SortMergeExec is replaced after each planning iteration.
I would prefer that we use a lazy val here instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell : I had tried that but for some class of queries that didn't work. When I try to get the outputPartitioning for a SMB node, in case of inner-join it is PartioniningCollection. Now one of its children can have a ReusedExchange which is yet to be resolved but if the other child is resolved, then this check fails.
Example query:
SELECT a.i, b.i, c.i
FROM mytable a, mytable b, mytable c
where a.i = b.i and a.i = c.i
Example query plan:
:- *SortMergeJoin [i#8], [i#9], Inner
: :- *Sort [i#8 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(i#8, 200)
: : +- *Project [i#8]
: : +- *Filter isnotnull(i#8)
: : +- *FileScan orc default.one_column[i#8] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/one_column], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>
: +- *Sort [i#9 ASC NULLS FIRST], false, 0
: +- ReusedExchange [i#9], Exchange hashpartitioning(i#8, 200)
Just to be on the same page, sharing what I had tried (will update the PR with the change anyways. I know that there would be some unit tests which would fail):
case class SortMergeJoinExec(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
....) {
lazy val (reorderedLeftKeys, reorderedRightKeys) = {
def reorder(
expectedOrderOfKeys: Seq[Expression],
currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
val leftKeysBuffer = ArrayBuffer[Expression]()
val rightKeysBuffer = ArrayBuffer[Expression]()
expectedOrderOfKeys.foreach(expression => {
val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
leftKeysBuffer.append(leftKeys(index))
rightKeysBuffer.append(rightKeys(index))
})
(leftKeysBuffer, rightKeysBuffer)
}
left.outputPartitioning match {
case HashPartitioning(leftExpressions, _)
if leftExpressions.length == leftKeys.length &&
leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
reorder(leftExpressions, leftKeys)
case _ => right.outputPartitioning match {
case HashPartitioning(rightExpressions, _)
if rightExpressions.length == rightKeys.length &&
rightKeys.forall(x => rightExpressions.exists(_.semanticEquals(x))) =>
reorder(rightExpressions, rightKeys)
case _ => (leftKeys, rightKeys)
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell : ping !!! As expected, that doesn't work in all cases and few unit test are failing. I would go back to my original version if you have don't any other idea(s).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tejasapatil I see your point. Let me think of another way.
|
@hvanhovell : This is as per our discussion in the jira : https://issues.apache.org/jira/browse/SPARK-19122 |
|
Test build #73119 has finished for PR 16985 at commit
|
|
Test build #73121 has finished for PR 16985 at commit
|
7f3ea36 to
41aa767
Compare
|
Test build #73221 has finished for PR 16985 at commit
|
41aa767 to
ba5119b
Compare
|
Jenkins test this please |
|
Test build #74556 has finished for PR 16985 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell : continuing our discussion at #16985 (comment) :
I found that ReusedExchangeExec does not use child's partitioning which was causing the tests to fail. This version of the PR is with your suggestion + works in all cases (all unit tests are passing).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!
ba5119b to
7cbf0d0
Compare
|
Test build #75319 has finished for PR 16985 at commit
|
|
@hvanhovell @cloud-fan @gatorsmile can anyone please review this PR ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it fixed by #17339 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at #17339 and its doing something orthogonal to whats done here.
#17339 is ensuring that the join outputs' sort ordering has attributes from both relations.
This PR is ensuring that the order of join kets (in both distribution and sort order) is not blindly picked from the order of occurrence in the query string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is test, I think it's ok to always return the result
7cbf0d0 to
b2a6f1c
Compare
|
Jenkins test this please |
|
Test build #76593 has finished for PR 16985 at commit
|
|
Jenkins test this please |
|
Test build #76601 has finished for PR 16985 at commit
|
b2a6f1c to
e202ac1
Compare
|
shall we introduce a physical optimizer rule which reorders join predicates based on |
|
Test build #76614 has finished for PR 16985 at commit
|
e202ac1 to
bd1aa6d
Compare
|
Test build #76685 has finished for PR 16985 at commit
|
4f76bd0 to
72325b8
Compare
|
Test build #77836 has started for PR 16985 at commit |
|
Jenkins test this please |
|
Test build #77845 has finished for PR 16985 at commit
|
|
@cloud-fan : ping |
|
Sorry for the delay, pretty busy this month... So a summary of this patch: order does matter for hash partitioning keys, but doesn't matter for join keys. So we can reorder the join keys to match the child hash partitioning to avoid shuffle. My last concern is #16985 (comment) . Ideally it is a valid case I think, but we may have some problems in the implementation. Can we investigate it? |
…ordering differ from bucketing and sorting order
… on child.outputOrdering and outputPartitioning
72325b8 to
b295093
Compare
|
@cloud-fan : I was on a long vacation for a quite sometime so couldn't get to this. Wrt to the concern you had, I have replied to that discussion in the PR : https://github.com/apache/spark/pull/16985/files#r132620278 along with a test case which covers the scenario you had mentioned. |
|
BTW: The "summary of this patch" in your comment accurately captures what this PR is doing. |
|
retest this please |
| } | ||
|
|
||
| // Irrespective of the ordering of keys in the join predicate, the query plan and | ||
| // query results should always be the same |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need to test this here, this is an existing property of join implementation in Spark SQL, and should have already been well tested. Then we don't need to change testBucketing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed validation of query result
|
LGTM except one comment for the test |
|
jenkins test this please |
|
Test build #80541 has finished for PR 16985 at commit
|
|
Thanks! Merging to master. |
What changes were proposed in this pull request?
Jira : https://issues.apache.org/jira/browse/SPARK-19122
leftKeysandrightKeysinSortMergeJoinExecare altered based on the ordering of join keys in the child'soutputPartitioning. This is done everytimerequiredChildDistributionis invoked during query planning.How was this patch tested?