Support group-limit optimization for ROW_NUMBER #11886

Merged: 2 commits merged into NVIDIA:branch-25.02 on Dec 19, 2024

Conversation

mythrocks (Collaborator)

Fixes #10505.

This is a follow-up to #10500, which added support for WindowGroupLimit optimizations for the `RANK` and `DENSE_RANK` window functions. The same optimization was not extended to `ROW_NUMBER` at that time.

This commit allows the output of `ROW_NUMBER` to be filtered map-side when there is a `<` predicate on its return value. The following is an example of the kind of query affected by this change:

```sql
SELECT * FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY p ORDER BY o) AS rn
  FROM mytable )
WHERE rn < 10;
```

This follows the optimization in [SPARK-37099](https://issues.apache.org/jira/browse/SPARK-37099) in Apache Spark. With it, the output of the window function can be drastically smaller than the input, saving shuffle traffic.

Note that this optimization does not kick in on Apache Spark or in spark-rapids if the `ROW_NUMBER` expression does not include a `PARTITION BY` clause; spark-rapids remains consistent with Apache Spark in this regard.
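For intuition, here is a minimal sketch of the map-side filtering semantics this enables, in plain Scala over in-memory data. This is illustrative only, not the spark-rapids implementation: it assumes each task's input is already sorted by the partition and order keys (as in Spark's sorted group-limit path), and `groupLimitFilter` is a hypothetical name.

```scala
// Hypothetical sketch: before shuffling, each map task can drop any row whose
// per-partition running count has already reached the limit, because
// ROW_NUMBER() could only ever assign it a value above the limit.
// For the `rn < 10` predicate above, the effective limit is 9.
def groupLimitFilter[K, R](rows: Iterator[(K, R)], limit: Int): Iterator[(K, R)] = {
  val counts = scala.collection.mutable.Map.empty[K, Int]
  rows.filter { case (partKey, _) =>
    val n = counts.getOrElse(partKey, 0) // rows already kept for this key
    counts(partKey) = n + 1
    n < limit                            // keep only the first `limit` rows per key
  }
}
```

Because the exact row numbers are still computed after the shuffle and the final filter is still applied, dropping these rows early only shrinks shuffle traffic; it never changes the result.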

@mythrocks mythrocks self-assigned this Dec 18, 2024
@mythrocks (Collaborator, Author)

Note: I had initially thought we'd need to route `row_number` to the window-function implementation in libcudf (since libcudf doesn't have a scan-based implementation for it). I had also thought we'd eventually need to provide a `cudf::scan_aggregation` implementation for `row_number` in `cudf::rolling_window`.

It turns out the former isn't required, thanks to @revans2's earlier windowing work in spark-rapids: he simulates `row_number` by using a SUM `scan_aggregation` on a column of 1s. The latter can also wait, since this PR doesn't strictly require it; I'll take that up separately in libcudf.
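To illustrate the trick in plain Scala (a sketch only; the real code issues a cudf SUM scan aggregation on the GPU, grouped by the partition keys, and `rowNumberViaSumScan` is a hypothetical name):

```scala
// Sketch: ROW_NUMBER() within each partition group equals the inclusive
// prefix sum (scan) of a column of ones, restarted at each group boundary.
// `groupSizes` stands in for pre-grouped partition sizes; illustrative only.
def rowNumberViaSumScan(groupSizes: Seq[Int]): Seq[Int] =
  groupSizes.flatMap { size =>
    Seq.fill(size)(1)          // the column of 1s
      .scanLeft(0)(_ + _).tail // inclusive prefix sum: 1, 2, ..., size
  }

// Two groups of sizes 3 and 2 yield row numbers 1,2,3 then 1,2:
assert(rowNumberViaSumScan(Seq(3, 2)) == Seq(1, 2, 3, 1, 2))
```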

@mythrocks (Collaborator, Author)

Build

@revans2 (Collaborator) left a comment:


Just curious if you ran any performance numbers on this?

@mythrocks (Collaborator, Author)

> Just curious if you ran any performance numbers on this?

I did run a couple of quick tests.

Before the optimization:

```
scala> spark.time( sql(" select * from ( select *, row_number() over (partition by gby order by oby) rn from parquet.`/home/mithunr/workspace/Demos/rownumber-grouplimits/test_repart_10/` order by 1,2,3 ) where rn < 4 ").show )
...
      !Exec <WindowGroupLimitExec> cannot run on GPU because Only Rank() and DenseRank() are currently supported for window group limits
...
+---+---+---+
|gby|oby| rn|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
|  2|  1|  1|
|  2|  1|  2|
|  2|  1|  3|
|  5|  1|  1|
|  5|  1|  2|
|  5|  1|  3|
+---+---+---+
Time taken: 6098 ms
```

After this change:

```
scala> spark.time( sql(" select * from ( select *, row_number() over (partition by gby order by oby) rn from parquet.`/home/mithunr/workspace/Demos/rownumber-grouplimits/test_repart_10/` order by 1,2,3 ) where rn < 4 ").show )
...
      *Expression <Alias> row_number() windowspecdefinition(gby#55, oby#56 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#53 will run on GPU
...
          *Expression <WindowSpecDefinition> windowspecdefinition(gby#55, oby#56 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) will run on GPU
...
+---+---+---+
|gby|oby| rn|
+---+---+---+
|  1|  1|  1|
|  1|  1|  2|
|  1|  1|  3|
|  2|  1|  1|
|  2|  1|  2|
|  2|  1|  3|
|  5|  1|  1|
|  5|  1|  2|
|  5|  1|  3|
+---+---+---+
Time taken: 2959 ms
```

So: about 2x faster for `row_number`, on a 200M-row dataset with only 3 distinct group-by keys (i.e. `gby`). Not a comprehensive benchmark, but I'm cautiously optimistic.
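For anyone wanting to reproduce this, one quick way to check whether the optimization applies is to inspect the physical plan. This uses only standard Spark API; the parquet path is a placeholder, and the column names match the example above:

```scala
// Standard Spark API; the path is a placeholder for your own test data.
val df = spark.sql("""
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY gby ORDER BY oby) AS rn
    FROM parquet.`/path/to/test_data`
  ) WHERE rn < 4
""")
df.explain() // before this change: WindowGroupLimitExec stays on the CPU;
             // after it: the group limit is replaced by its GPU equivalent
```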

@mythrocks mythrocks merged commit 6244613 into NVIDIA:branch-25.02 Dec 19, 2024
50 checks passed
@mythrocks (Collaborator, Author)

P.S. Thank you for the review, @revans2. I've merged this change.

@mythrocks mythrocks deleted the row-number-filter-pushdown branch December 19, 2024 22:42
Linked issue: [FOLLOW UP] Support row_number() filters for GpuWindowGroupLimitExec (#10505)