Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement grouped product scan #15254

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ add_library(
src/groupby/sort/group_count_scan.cu
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
src/groupby/sort/group_product_scan.cu
src/groupby/sort/group_rank_scan.cu
src/groupby/sort/group_replace_nulls.cu
src/groupby/sort/group_sum_scan.cu
Expand Down
1 change: 1 addition & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class sum_aggregation final : public rolling_aggregation,
* @brief Derived class for specifying a product aggregation
*/
class product_aggregation final : public groupby_aggregation,
public groupby_scan_aggregation,
public reduce_aggregation,
public scan_aggregation,
public segmented_reduce_aggregation {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -429,6 +429,8 @@ std::unique_ptr<Base> make_product_aggregation()
}
template std::unique_ptr<aggregation> make_product_aggregation<aggregation>();
template std::unique_ptr<groupby_aggregation> make_product_aggregation<groupby_aggregation>();
template std::unique_ptr<groupby_scan_aggregation>
make_product_aggregation<groupby_scan_aggregation>();
template std::unique_ptr<reduce_aggregation> make_product_aggregation<reduce_aggregation>();
template std::unique_ptr<scan_aggregation> make_product_aggregation<scan_aggregation>();
template std::unique_ptr<segmented_reduce_aggregation>
Expand Down
41 changes: 41 additions & 0 deletions cpp/src/groupby/sort/group_product_scan.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "groupby/sort/group_scan_util.cuh"

#include <rmm/cuda_stream_view.hpp>

namespace cudf {
namespace groupby {
namespace detail {
std::unique_ptr<column> product_scan(column_view const& values,
size_type num_groups,
cudf::device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return type_dispatcher(values.type(),
group_scan_dispatcher<aggregation::PRODUCT>{},
values,
num_groups,
group_labels,
stream,
mr);
}

} // namespace detail
} // namespace groupby
} // namespace cudf
19 changes: 18 additions & 1 deletion cpp/src/groupby/sort/group_scan.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,23 @@ std::unique_ptr<column> sum_scan(column_view const& values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Internal API to calculate groupwise cumulative product
*
* Behaviour is undefined for signed integral types if any groupwise product overflows the type.
*
* @param values Grouped values to get product of
* @param num_groups Number of groups
* @param group_labels ID of group that the corresponding value belongs to
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
*/
std::unique_ptr<column> product_scan(column_view const& values,
size_type num_groups,
device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Internal API to calculate groupwise cumulative minimum value
*
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/groupby/sort/group_scan_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ static constexpr bool is_group_scan_supported()
{
if (K == aggregation::SUM)
return cudf::is_numeric<T>() || cudf::is_duration<T>() || cudf::is_fixed_point<T>();
else if (K == aggregation::PRODUCT)
return cudf::is_numeric<T>();
else if (K == aggregation::MIN or K == aggregation::MAX)
return not cudf::is_dictionary<T>() and
(is_relationally_comparable<T, T>() or std::is_same_v<T, cudf::struct_view>);
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ void scan_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
void scan_result_functor::operator()<aggregation::PRODUCT>(aggregation const& agg)
{
if (cache.has_result(values, agg)) return;

cache.add_result(
values,
agg,
detail::product_scan(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
void scan_result_functor::operator()<aggregation::MIN>(aggregation const& agg)
{
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ ConfigureTest(
groupby/min_scan_tests.cpp
groupby/nth_element_tests.cpp
groupby/nunique_tests.cpp
groupby/product_scan_tests.cpp
groupby/product_tests.cpp
groupby/quantile_tests.cpp
groupby/rank_scan_tests.cpp
Expand Down
144 changes: 144 additions & 0 deletions cpp/tests/groupby/product_scan_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <tests/groupby/groupby_test_util.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/detail/aggregation/aggregation.hpp>

using key_wrapper = cudf::test::fixed_width_column_wrapper<int32_t>;

template <typename T>
struct groupby_product_scan_test : public cudf::test::BaseFixture {
using V = T;
using R = cudf::detail::target_type_t<V, cudf::aggregation::PRODUCT>;
using value_wrapper = cudf::test::fixed_width_column_wrapper<V, int32_t>;
using result_wrapper = cudf::test::fixed_width_column_wrapper<R, int32_t>;
};

using supported_types =
cudf::test::Concat<cudf::test::Types<int8_t, int16_t, int32_t, int64_t, float, double>>;

TYPED_TEST_SUITE(groupby_product_scan_test, supported_types);

TYPED_TEST(groupby_product_scan_test, basic)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys {1, 2, 3, 1, 2, 2, 1, 3, 3, 2};
value_wrapper vals{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

key_wrapper expect_keys {1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
// {0, 3, 6, 1, 4, 5, 9, 2, 7, 8}
result_wrapper expect_vals{0, 0, 0, 1, 4, 20, 180, 2, 14, 112};
// clang-format on
auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, pre_sorted)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys {1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
value_wrapper vals{0, 3, 6, 1, 4, 5, 9, 2, 7, 8};

key_wrapper expect_keys {1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
result_wrapper expect_vals{0, 0, 0, 1, 4, 20, 180, 2, 14, 112};
// clang-format on

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys,
vals,
expect_keys,
expect_vals,
std::move(agg),
cudf::null_policy::EXCLUDE,
cudf::sorted::YES);
}

TYPED_TEST(groupby_product_scan_test, empty_cols)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys{};
value_wrapper vals{};

key_wrapper expect_keys{};
result_wrapper expect_vals{};
// clang-format on
wence- marked this conversation as resolved.
Show resolved Hide resolved

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, zero_valid_keys)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

key_wrapper keys({1, 2, 3}, cudf::test::iterators::all_nulls());
value_wrapper vals{3, 4, 5};
key_wrapper expect_keys{};
result_wrapper expect_vals{};

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, zero_valid_values)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

key_wrapper keys{1, 1, 1};
value_wrapper vals({3, 4, 5}, cudf::test::iterators::all_nulls());
key_wrapper expect_keys{1, 1, 1};
result_wrapper expect_vals({3, 4, 5}, cudf::test::iterators::all_nulls());

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, null_keys_and_values)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys( {1, 2, 3, 1, 2, 2, 1, 3, 3, 2, 4}, {1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1});
value_wrapper vals({0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 4}, {0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0});

// { 1, 1, 1, 2, 2, 2, 2, 3, *, 3, 4};
key_wrapper expect_keys( { 1, 1, 1, 2, 2, 2, 2, 3, 3, 4}, cudf::test::iterators::no_nulls());
// { -, 3, 6, 1, 4, -, 9, 2, _, 8, -}
result_wrapper expect_vals({-1, 3, 18, 1, 4, -1, 36, 2, 16, -1},
{ 0, 1, 1, 1, 1, 0, 1, 1, 1, 0});
// clang-format on

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class Aggregation:
cumsum = sum
cummin = min
cummax = max
cumprod = product

@classmethod
def rank(cls, method, ascending, na_option, pct):
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ cdef class GroupBy:
return columns_from_pylibcudf_table(replaced)


_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "rank"}
_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "cumprod", "rank"}


def _is_all_scan_aggregate(all_aggs):
Expand Down
4 changes: 3 additions & 1 deletion python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -2319,7 +2319,9 @@ def test_groupby_unique(by, data, dtype):


@pytest.mark.parametrize("nelem", [2, 3, 100, 1000])
@pytest.mark.parametrize("func", ["cummin", "cummax", "cumcount", "cumsum"])
@pytest.mark.parametrize(
"func", ["cummin", "cummax", "cumcount", "cumsum", "cumprod"]
)
def test_groupby_2keys_scan(nelem, func):
pdf = make_frame(pd.DataFrame, nelem=nelem)
expect_df = pdf.groupby(["x", "y"], sort=True).agg(func)
Expand Down
Loading