[FEA][AUDIT][SPARK-37099][SQL] Introduce the group limit of Window for rank-based filter to optimize top-k computation #8208
I agree this looks like a big performance win. They end up doing more sorting of the data in earlier stages, but the GPU is really good at that. This does not look too difficult. It looks like we are going to do a regular window operation followed by a filter on the partial result, which then throws away the rank. We might be able to generalize this pattern for any running window aggregation that follows the same shape: compute a window, then filter rows based on the result.
I had initially considered that this would need a CUDF component to process the filter. I'm now thinking this might be doable directly in the plugin. We would need to hook up a … I'll file a separate bug with details. In the first pass, I'm inclined to address the case where the entire group fits in memory. I'll consider larger groups in a follow-on.
I see a few things that we can do here, each with a different level of performance improvement. In the simplest case, we do a regular window operation like we do today and then add in a filter, like @mythrocks suggested. Longer term, it might be nice to see if we could combine the sort and do something like we do with TopN (`TakeOrderedAndProject`). In fact, `WindowGroupLimitExec` on `RowNumber`, without any partitions, *is* TopN, with the sort order the same as the `requiredChildOrdering`. But that assumes K is small enough that keeping all of it in memory is going to work out.
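To see why the unpartitioned `RowNumber` case reduces to TopN: keeping rows where `ROW_NUMBER() < k` over a global ordering is exactly keeping the `k-1` smallest rows under that ordering. A minimal Python sketch (the function name `row_number_limit` is hypothetical, chosen just for illustration):

```python
import heapq

def row_number_limit(rows, k, key=None):
    """Rows satisfying ROW_NUMBER() < k over a global ORDER BY are
    precisely the k-1 smallest rows under that ordering, i.e. a
    TopN / TakeOrderedAndProject rather than a full sort."""
    return heapq.nsmallest(k - 1, rows, key=key)
```

This holds only without a `PARTITION BY` clause, and only while `k` is small enough that the retained rows fit in memory, which matches the caveat above.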
Fixes #8208.

This commit adds support for `WindowGroupLimitExec` to run on the GPU. This optimization was added in Apache Spark 3.5 to reduce the number of rows that participate in shuffles, for queries that contain filters on the result of ranking functions. For example:

```sql
SELECT foo, bar FROM (
  SELECT foo, bar,
         RANK() OVER (PARTITION BY foo ORDER BY bar) AS rnk
  FROM mytable
) WHERE rnk < 10
```

Such a query would require a shuffle to bring all rows in a window group into the same task. In Spark 3.5, an optimization was added in [SPARK-37099](https://issues.apache.org/jira/browse/SPARK-37099) to take advantage of the `rnk < 10` predicate to reduce shuffle load. Specifically, since only the first 9 (i.e. 10-1) ranks participate in the window function, only that many rows need be shuffled into the task, per input batch. By pre-filtering rows that can't possibly satisfy the condition, the number of shuffled records can be reduced.

The GPU implementation (i.e. `GpuWindowGroupLimitExec`) differs slightly from the CPU implementation, because it needs to execute on the entire input column batch. As a result, `GpuWindowGroupLimitExec` runs the rank scan on each input batch, and then filters out ranks that exceed the limit specified in the predicate (`rnk < 10`). After the shuffle, the `RANK()` is calculated again by `GpuRunningWindowExec` to produce the final result.

The current implementation addresses the `RANK()` and `DENSE_RANK()` window functions. Other ranking functions (like `ROW_NUMBER()`) can be added at a later date.

Signed-off-by: MithunR <mythrocks@gmail.com>
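The per-batch pre-filter described above can be sketched in plain Python. This is illustrative only: `rank` and `group_limit_filter` are hypothetical names, and the real plugin implements the equivalent logic in Scala over cuDF column batches rather than Python lists.

```python
from itertools import groupby
from operator import itemgetter

def rank(values):
    """Standard RANK() semantics: ties share a rank, and the next
    distinct value's rank skips ahead by the size of the tie group."""
    ranks = []
    prev = object()  # sentinel that compares unequal to any value
    current = 0
    for i, v in enumerate(values, start=1):
        if v != prev:
            current = i
            prev = v
        ranks.append(current)
    return ranks

def group_limit_filter(batch, limit):
    """Keep only rows whose RANK() within their partition is < limit.

    `batch` is a list of (partition_key, order_key) rows, assumed to be
    sorted by partition_key and then order_key, as the group-limit exec
    requires of its input. Rows that cannot satisfy `rnk < limit` are
    dropped before the shuffle; the rank itself is discarded and
    recomputed after the shuffle.
    """
    kept = []
    for _, rows in groupby(batch, key=itemgetter(0)):
        rows = list(rows)
        ranks = rank([r[1] for r in rows])
        kept.extend(r for r, rk in zip(rows, ranks) if rk < limit)
    return kept
```

Note that this filter is applied independently to each input batch, so it is a conservative pre-filter: it never drops a row that could satisfy the predicate, but the final `RANK()` must still be computed over the full group after the shuffle.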
This is a new window exec added in Spark 3.5: `WindowGroupLimitExec`. It improves some TPC-DS queries, specifically q67, when using rank-like functions, where the goal is to reduce skew and shuffle writes.
apache/spark@0e8a20e6da
We should look at implementing something similar for the GPU.