-
Notifications
You must be signed in to change notification settings - Fork 916
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Switch fully unbounded window functions to use aggregations #13727
Switch fully unbounded window functions to use aggregations #13727
Conversation
A fully unbounded window function (i.e. [unbounded_preceding, unbounded_following]) need not go through the window function machinery for execution. E.g. Consider the following: ```c++ auto grps = { 0, 0, 0, 0, 1, 1, 1, 1, 2, 2 }; auto vals = { 3, 1, 4, 2, 6, 7, 8, 5, 9, 0 }; ``` Running the `MIN` window function on the groups, over an `[UNBOUNDED, UNBOUNDED]` window should produce: ```c++ auto res = { 1, 1, 1, 1, 5, 5, 5, 5, 0, 0 }; ``` This result could more easily be achieved using a grouped `MIN` aggregation, and replicating each group's result for every entry in the group. This commit adds logic to detect fully unbounded windows, and use `groupby::aggregate()` (when one or more grouping keys are specified), or `reduce()` (when there are no grouping keys).
…y-unbounded-windows
A couple of tangential changes were necessitated for this one:
|
These failures aren't pertinent to this change. I'll work on the JNI piece of this, and come back to the CI failure. |
It turns out that the JNI side of unbounded row-based window functions wasn't wired properly: Unbounded windows were treated as "very large, finite windows". This was corrected, and an appropriate test was added. |
Running correctness tests in Spark right now, but it looks like there's reasonable speedup. A simulation of the user query ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java code looks good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nice optimization. I have a few comments on its implementation, some of which might require a bit of separate work to prepare the pieces used by this PR.
@@ -33,7 +34,8 @@ void test_single_agg(cudf::column_view const& keys, | |||
cudf::sorted keys_are_sorted = cudf::sorted::NO, | |||
std::vector<cudf::order> const& column_order = {}, | |||
std::vector<cudf::null_order> const& null_precedence = {}, | |||
cudf::sorted reference_keys_are_sorted = cudf::sorted::NO); | |||
cudf::sorted reference_keys_are_sorted = cudf::sorted::NO, | |||
rmm::cuda_stream_view test_stream = cudf::get_default_stream()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the other tests in https://github.com/rapidsai/cudf/tree/branch-23.08/cpp/tests/streams are using cudf::test::get_default_stream()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line#166 does use cudf::test::get_default_stream()
.
That said, I have moved the streams test to a separate file under tests/streams
. This test uses cudf::test::get_default_stream()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially the requirement for streams testing is that in every code path it must be possible to pass cudf::test::get_default_stream()
through. #13506 contains a number of changes to existing cudf test utilities where there is no input stream so the code had to be changed to always use cudf::test::get_default_stream
. Since this util is now supporting passing a stream it's fine for this to be cudf::get_default_stream()
.
That said, it would probably be easier to remove the stream parameter here and just always use cudf::test::get_default_stream()
internally in this util. cudf::test::get_default_stream()
is just an alias for cudf::get_default_stream()
under normal circumstances. It exists to provide a symbol that can be overridden to use a different stream when STREAM_MODE testing
is set in the CMake configuration for a particular test. Using cudf::test::get_default_stream()
will therefore "just work" in all cases.
1. New header for optimized_unbounded_window. 2. Removed static_cast from CUDF_FAIL. 3. [[fallthrough]] is switch case.
1. Moved utility functions to separate header. 2. Changed window_bounds::is_unbounded, and value to member functions.
Moved groupby stream tests into its own translation unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a good optimization here. Would love to see more perf numbers if you have them (I see one comment indicating that one particular case went from 730 to 53 seconds?). A few minor suggestions for improvement, but in general the stream piece has been handled correctly now (looks like some changes were made after our last discussion). Regarding the detail reduction header I don't think that's critical, but it's a nice-to-have now that it's there.
@@ -33,7 +34,8 @@ void test_single_agg(cudf::column_view const& keys, | |||
cudf::sorted keys_are_sorted = cudf::sorted::NO, | |||
std::vector<cudf::order> const& column_order = {}, | |||
std::vector<cudf::null_order> const& null_precedence = {}, | |||
cudf::sorted reference_keys_are_sorted = cudf::sorted::NO); | |||
cudf::sorted reference_keys_are_sorted = cudf::sorted::NO, | |||
rmm::cuda_stream_view test_stream = cudf::get_default_stream()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially the requirement for streams testing is that in every code path it must be possible to pass cudf::test::get_default_stream()
through. #13506 contains a number of changes to existing cudf test utilities where there is no input stream so the code had to be changed to always use cudf::test::get_default_stream
. Since this util is now supporting passing a stream it's fine for this to be cudf::get_default_stream()
.
That said, it would probably be easier to remove the stream parameter here and just always use cudf::test::get_default_stream()
internally in this util. cudf::test::get_default_stream()
is just an alias for cudf::get_default_stream()
under normal circumstances. It exists to provide a symbol that can be overridden to use a different stream when STREAM_MODE testing
is set in the CMake configuration for a particular test. Using cudf::test::get_default_stream()
will therefore "just work" in all cases.
1. Removed unnecessary headers. 2. Adjusted the use of cudf::test::get_default_stream() in groupby_test_util, and streams/groupby_test. 3. Documented aggregation_based_rolling_window() and reduction_based_rolling_window().
…y-unbounded-windows
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mythrocks. Two comments for forward-looking changes, otherwise approving.
Follow-up to rapidsai/cudf#13727. This change addresses the slowness in window aggregations for windows defined as `[UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING]`. Before this change, unbounded row window bounds were interpreted as finite values, e.g. `[MAX_INT, MAX_INT]`. While this might be technically indistinguishable from a fully unbounded window, it causes the optimization in rapidsai/cudf/pull/13727 not to be triggered, because the window bounds are still finite. The change in this PR allows the plugin to detect unbounded windows, and mark them as such for `libcudf`. The `libcudf` window function primitives can then detect fully unbounded windows, and use a faster/optimized path for execution. Preliminary test results indicate that `[UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING]` window function computations over 1B rows and thousands of groups are sped up by a factor of 10-14x over the previous/naive GPU implementation. Signed-off-by: MithunR <mythrocks@gmail.com>
/merge |
This change has been merged. Thank you all for your reviews and advice. |
Follow-up to rapidsai/cudf#13727. This change addresses the slowness in window aggregations for windows defined as `[UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING]`. Before this change, unbounded row window bounds were interpreted as finite values, e.g. `[MAX_INT, MAX_INT]`. While this might be technically indistinguishable from a fully unbounded window, it causes the optimization in rapidsai/cudf/pull/13727 not to be triggered, because the window bounds are still finite. The change in this PR allows the plugin to detect unbounded windows, and mark them as such for `libcudf`. The `libcudf` window function primitives can then detect fully unbounded windows, and use a faster/optimized path for execution. Preliminary test results indicate that `[UNBOUNDED PRECEDING, UNBOUNDED FOLLOWING]` window function computations over 1B rows and thousands of groups are sped up by a factor of 10-14x over the previous/naive GPU implementation. Signed-off-by: MithunR <mythrocks@gmail.com>
Description
A fully unbounded window function (i.e. [unbounded_preceding, unbounded_following]) need not go through the window function machinery for execution. E.g. Consider the following:
Running the
MIN
window function on the groups, over an[UNBOUNDED, UNBOUNDED]
window should produce:This result could more easily be achieved using a grouped
MIN
aggregation, and replicating each group's result for every entry in the group.This commit adds logic to detect fully unbounded windows, and use
groupby::aggregate()
(when one or more grouping keys are specified), orreduce()
(when there are no grouping keys).Tangentially, this change also adds the following:
cudf::groupby::groupby::aggregate()
that takes astream
parameter.detail
header to declare the (pre-existing)cudf::reduction::detail::reduce()
function.Checklist