Support group-limit optimization for ROW_NUMBER
#11886
Fixes NVIDIA#10505.

This is a follow-up to NVIDIA#10500, which added support for WindowGroupLimit optimizations for the `RANK` and `DENSE_RANK` window functions. The same optimization was not extended to `ROW_NUMBER` at that time.

This commit allows the output from `ROW_NUMBER` to be filtered map-side when there is a `<` predicate on its return value. The following is an example of the kind of query affected by this change:

```sql
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY p ORDER BY o) AS rn
  FROM mytable
) WHERE rn < 10;
```

This follows the optimization in [SPARK-37099](https://issues.apache.org/jira/browse/SPARK-37099) in Apache Spark. With it, the output from the window function could be drastically smaller than its input, which saves shuffle traffic.

Note that this optimization does not kick in on Apache Spark, or in `spark-rapids`, if the `ROW_NUMBER` window specification does not include a `PARTITION BY` clause. `spark-rapids` remains consistent with Apache Spark in this regard.
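To illustrate why filtering map-side is safe, here is a minimal plain-Scala sketch (no Spark, no GPU) that models a `ROW_NUMBER` group limit on in-memory `(p, o)` pairs. It assumes a predicate of the form `rn < k`, so each map task may keep at most `k - 1` rows per partition key before the shuffle. The object and method names (`RowNumberGroupLimitSketch`, `groupLimit`, `rowNumberFilter`, `withMapSideLimit`) are hypothetical and are not the `spark-rapids` or Apache Spark implementation, which works on columnar batches.

```scala
// Hypothetical, Spark-free model of a ROW_NUMBER window group limit.
// Rows are (p, o) pairs: p = PARTITION BY key, o = ORDER BY key.
object RowNumberGroupLimitSketch {
  type Row = (Int, Int)

  // Keep at most `limit` rows per partition key, choosing the rows with the
  // smallest order keys. For a predicate `rn < k`, use limit = k - 1.
  def groupLimit(rows: Seq[Row], limit: Int): Seq[Row] =
    rows
      .groupBy(_._1)
      .values
      .flatMap(group => group.sortBy(_._2).take(limit))
      .toSeq

  // Reference semantics: ROW_NUMBER() OVER (PARTITION BY p ORDER BY o)
  // over the whole input, followed by the filter rn < k.
  def rowNumberFilter(rows: Seq[Row], k: Int): Seq[Row] =
    rows
      .groupBy(_._1)
      .values
      .flatMap { group =>
        group.sortBy(_._2).zipWithIndex.collect {
          case (row, idx) if idx + 1 < k => row // idx + 1 is the row number
        }
      }
      .toSeq

  // Map-side plan: each input split (a stand-in for a map task) is
  // pre-limited independently, the survivors are concatenated (the
  // "shuffle"), and the full window + filter runs afterwards.
  def withMapSideLimit(splits: Seq[Seq[Row]], k: Int): Seq[Row] =
    rowNumberFilter(splits.flatMap(split => groupLimit(split, k - 1)), k)

  def main(args: Array[String]): Unit = {
    val rng = new scala.util.Random(42)
    val input: Seq[Row] = Seq.fill(10000)((rng.nextInt(5), rng.nextInt(100)))
    val splits = input.grouped(1000).toSeq // ten hypothetical map tasks
    val k = 4                              // WHERE rn < 4

    val expected = rowNumberFilter(input, k).sorted
    val actual = withMapSideLimit(splits, k).sorted
    val shuffled = splits.map(s => groupLimit(s, k - 1).size).sum

    println(s"results match: ${expected == actual}")
    println(s"rows shuffled: ${input.size} without limit, $shuffled with limit")
  }
}
```

Intuitively, a row that already has `k - 1` rows ahead of it within its own map task has at least that many ahead of it after the shuffle, so dropping it early cannot change the filtered result. The sketch carries only `(p, o)`, so tied rows are identical; with extra columns, which tied rows `ROW_NUMBER` keeps is not deterministic in the first place.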
Signed-off-by: MithunR <mithunr@nvidia.com>
Note: I had initially thought that we'd need to route … It turns out that the former isn't required, thanks to @revans2's earlier windowing work in …
Just curious if you ran any performance numbers on this?
I only ran a couple of tests. Before the optimization:

```
scala> spark.time( sql(" select * from ( select *, row_number() over (partition by gby order by oby) rn from parquet.`/home/mithunr/workspace/Demos/rownumber-grouplimits/test_repart_10/` order by 1,2,3 ) where rn < 4 ").show )
...
!Exec <WindowGroupLimitExec> cannot run on GPU because Only Rank() and DenseRank() are currently supported for window group limits
...
+---+---+---+
|gby|oby| rn|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
|  2|  1|  1|
|  2|  1|  2|
|  2|  1|  3|
|  5|  1|  1|
|  5|  1|  2|
|  5|  1|  3|
+---+---+---+
Time taken: 6098 ms
```

After this change:

```
scala> spark.time( sql(" select * from ( select *, row_number() over (partition by gby order by oby) rn from parquet.`/home/mithunr/workspace/Demos/rownumber-grouplimits/test_repart_10/` order by 1,2,3 ) where rn < 4 ").show )
...
*Expression <Alias> row_number() windowspecdefinition(gby#55, oby#56 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#53 will run on GPU
...
*Expression <WindowSpecDefinition> windowspecdefinition(gby#55, oby#56 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) will run on GPU
...
+---+---+---+
|gby|oby| rn|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
|  2|  1|  1|
|  2|  1|  2|
|  2|  1|  3|
|  5|  1|  1|
|  5|  1|  2|
|  5|  1|  3|
+---+---+---+
Time taken: 2959 ms
```

So, about 2x faster for …
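For reference, the "about 2x" figure follows directly from the two `Time taken` values reported for the same query; each comes from a single `spark.time` call, so the ratio is indicative rather than a benchmark result:

```math
\text{speedup} = \frac{6098\ \text{ms}}{2959\ \text{ms}} \approx 2.06
```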
P.S. Thank you for the review, @revans2. I've merged this change.