-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-16475][SQL] Broadcast Hint for SQL Queries #14426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #63048 has finished for PR 14426 at commit
|
|
Why creating a new pull request? All the discussions were in the other pull request. |
|
Oh, it's just because the previous PR page with 363 comments becomes too slow to view in my laptop. Since the most recent discussion was ended two days ago with implementations. I think it's safe and better for being review again here. I can move some summary of the previous discussion decision into here, too. |
|
Test build #63348 has finished for PR 14426 at commit
|
|
Test build #63639 has finished for PR 14426 at commit
|
|
Test build #63756 has finished for PR 14426 at commit
|
|
Hi, @cloud-fan . |
|
Test build #64042 has finished for PR 14426 at commit
|
|
Test build #64108 has finished for PR 14426 at commit
|
|
Resolve conflicts. |
|
Test build #64132 has finished for PR 14426 at commit
|
|
Can this PR support multiple JOINs( |
|
Oh, sorry for late response, @watermen . I missed your message. Yes. This supports multiple joins. |
|
Hi, @watermen . You can try that like this. scala> spark.range(1000000000).createOrReplaceTempView("t1")
scala> spark.range(1000000000).createOrReplaceTempView("t2")
scala> spark.range(1000000000).createOrReplaceTempView("t3")
scala> sql("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t1.id = t3.id").explain
== Physical Plan ==
*SortMergeJoin [id#0L], [id#8L], Inner
:- *SortMergeJoin [id#0L], [id#4L], Inner
: :- *Sort [id#0L ASC], false, 0
: : +- Exchange hashpartitioning(id#0L, 200)
: : +- *Range (0, 1000000000, splits=8)
: +- *Sort [id#4L ASC], false, 0
: +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200)
+- *Sort [id#8L ASC], false, 0
+- ReusedExchange [id#8L], Exchange hashpartitioning(id#0L, 200)
scala> sql("SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t1.id = t3.id").explain
== Physical Plan ==
*SortMergeJoin [id#0L], [id#8L], Inner
:- *Sort [id#0L ASC], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *BroadcastHashJoin [id#0L], [id#4L], Inner, BuildLeft
: :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: : +- *Range (0, 1000000000, splits=8)
: +- *Range (0, 1000000000, splits=8)
+- *Sort [id#8L ASC], false, 0
+- Exchange hashpartitioning(id#8L, 200)
+- *Range (0, 1000000000, splits=8)
scala> sql("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t1.id = t3.id").explain
== Physical Plan ==
*SortMergeJoin [id#0L], [id#8L], Inner
:- *Sort [id#0L ASC], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
: :- *Range (0, 1000000000, splits=8)
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
: +- *Range (0, 1000000000, splits=8)
+- *Sort [id#8L ASC], false, 0
+- Exchange hashpartitioning(id#8L, 200)
+- *Range (0, 1000000000, splits=8)
scala> sql("SELECT /*+ MAPJOIN(t3) */ * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t1.id = t3.id").explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#8L], Inner, BuildRight
:- *SortMergeJoin [id#0L], [id#4L], Inner
: :- *Sort [id#0L ASC], false, 0
: : +- Exchange hashpartitioning(id#0L, 200)
: : +- *Range (0, 1000000000, splits=8)
: +- *Sort [id#4L ASC], false, 0
: +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1000000000, splits=8) |
|
Test build #64360 has finished for PR 14426 at commit
|
|
Test build #64529 has finished for PR 14426 at commit
|
|
Test build #64574 has finished for PR 14426 at commit
|
|
Test build #64875 has finished for PR 14426 at commit
|
|
Test build #64966 has finished for PR 14426 at commit
|
|
Test build #65023 has finished for PR 14426 at commit
|
|
Test build #65142 has finished for PR 14426 at commit
|
|
Test build #65249 has finished for PR 14426 at commit
|
|
Rebased to resolve conflicts. |
|
Test build #65432 has finished for PR 14426 at commit
|
|
Test build #65663 has finished for PR 14426 at commit
|
|
Test build #65927 has finished for PR 14426 at commit
|
|
Test build #66220 has finished for PR 14426 at commit
|
|
Hi, @rxin . |
|
Hi, @gatorsmile . |
|
Test build #66515 has finished for PR 14426 at commit
|
|
Rebased to resolve the conflicts. |
|
Test build #66546 has finished for PR 14426 at commit
|
|
Test build #66619 has finished for PR 14426 at commit
|
|
Test build #67189 has finished for PR 14426 at commit
|
|
Test build #67425 has finished for PR 14426 at commit
|
|
Resolve the conflicts. |
|
Test build #68088 has finished for PR 14426 at commit
|
|
Retest this please |
|
Test build #68092 has finished for PR 14426 at commit
|
|
Mind closing this for now? Let's open it once we've done the view canonicalization work. This pr will be much simpler. We should do both in 2.2. |
|
Thank you for guide. |
|
@dongjoon-hyun do you have time to update the pull request now the view canonicalization work is done? Basically we can remove all the SQL generation stuff. |
[SPARK-16475][SQL] Broadcast Hint for SQL Queries
|
Actually I have some time. I will submit a pr based on this. |
|
Oh. |
## What changes were proposed in this pull request? This pull request introduces a simple hint infrastructure to SQL and implements broadcast join hint using the infrastructure. The hint syntax looks like the following: ``` SELECT /*+ BROADCAST(t) */ * FROM t ``` For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of relation aliases can be specified in the hint. A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that match the specified name. The hint resolution works by recursively traversing down the query plan to find a relation or subquery that matches one of the specified broadcast aliases. The traversal does not go past beyond any existing broadcast hints, subquery aliases. This rule happens before common table expressions. Note that there was an earlier patch in apache#14426. This is a rewrite of that patch, with different semantics and simpler test cases. ## How was this patch tested? Added a new unit test suite for the broadcast hint rule (SubstituteHintsSuite) and new test cases for parser change (in PlanParserSuite). Also added end-to-end test case in BroadcastSuite. Author: Reynold Xin <rxin@databricks.com> Author: Dongjoon Hyun <dongjoon@apache.org> Closes apache#16925 from rxin/SPARK-16475-broadcast-hint.
## What changes were proposed in this pull request? This pull request introduces a simple hint infrastructure to SQL and implements broadcast join hint using the infrastructure. The hint syntax looks like the following: ``` SELECT /*+ BROADCAST(t) */ * FROM t ``` For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of relation aliases can be specified in the hint. A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that match the specified name. The hint resolution works by recursively traversing down the query plan to find a relation or subquery that matches one of the specified broadcast aliases. The traversal does not go past beyond any existing broadcast hints, subquery aliases. This rule happens before common table expressions. Note that there was an earlier patch in apache#14426. This is a rewrite of that patch, with different semantics and simpler test cases. ## How was this patch tested? Added a new unit test suite for the broadcast hint rule (SubstituteHintsSuite) and new test cases for parser change (in PlanParserSuite). Also added end-to-end test case in BroadcastSuite. Author: Reynold Xin <rxin@databricks.com> Author: Dongjoon Hyun <dongjoon@apache.org> Closes apache#16925 from rxin/SPARK-16475-broadcast-hint.
What changes were proposed in this pull request?
This PR aims to achieve the following two goals in Spark SQL.
1. Generic Hint Syntax
The generic hints are parsed and transformed into concrete hints by
SubstituteHintsof Analyzer. The unknown hints are removed, too. For example,Hint("MAPJOIN")is transformed intoBroadcastJoinand other hints are removed currently.Unlink Hive,
NEWMAPJOIN(t)is allowed for accepting new Spark Hints.2. Broadcast Hints
The followings are recognized. Technically, broadcast hints are matched
UnresolvedRelationto support HiveMetastoreRelation. The style ofdatabase_name.table_nameis not allowed in this PR.Examples
The many previous discussions on this issue are at #14132 .
How was this patch tested?
Pass the Jenkins tests with new testcases.