From f55d2e5f56d81ee56dcad4292cc57d3638b5353d Mon Sep 17 00:00:00 2001 From: Seunghwa Kang <45857425+seunghwak@users.noreply.github.com> Date: Tue, 24 May 2022 17:27:46 -0700 Subject: [PATCH] Split update_v_frontier_from_outgoing_e to two simpler primitives (#2290) Close https://github.com/rapidsai/cugraph/issues/2003 - `Split update_v_frontier_from_outgoing_e` to `transform_reduce_v_frontier_outgoing_e_by_dst` and `update_v_frontier` - Previously `per_v` was used to specify both vertex-centric primitives (e.g. for each vertex, iterate over all the incoming or outgoing edges) AND reduction granularity of edge operation outputs (e.g. reduce edge operation outputs by keys associated with destinations). Updated to use `by_...` to specify reduction granularity. - Bug fix in `update_v_frontier_from_outgoing_e` when key_t == vertex_t && payload_t = void. - Other code-style updates. Authors: - Seunghwa Kang (https://github.com/seunghwak) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Kumar Aatish (https://github.com/kaatish) URL: https://github.com/rapidsai/cugraph/pull/2290 --- cpp/include/cugraph/detail/graph_utils.cuh | 19 - ...m_reduce_dst_key_aggregated_outgoing_e.cuh | 22 +- cpp/include/cugraph/prims/reduce_op.cuh | 14 +- ..._nbr_intersection_of_e_endpoints_by_v.cuh} | 9 +- ... => transform_reduce_e_by_src_dst_key.cuh} | 82 ++- ...m_reduce_v_frontier_outgoing_e_by_dst.cuh} | 450 +++++-------- .../cugraph/prims/update_v_frontier.cuh | 382 +++++++++++ .../cugraph/utilities/device_functors.cuh | 11 + cpp/src/community/louvain.cuh | 4 +- cpp/src/community/triangle_count_impl.cuh | 4 +- .../weakly_connected_components_impl.cuh | 59 +- cpp/src/cores/core_number_impl.cuh | 31 +- .../sampling/detail/sampling_utils_impl.cuh | 12 +- cpp/src/structure/coarsen_graph_impl.cuh | 11 +- cpp/src/structure/renumber_edgelist_impl.cuh | 19 +- cpp/src/traversal/bfs_impl.cuh | 76 ++- cpp/src/traversal/sssp_impl.cuh | 73 +- cpp/tests/CMakeLists.txt | 6 +- ...orm_reduce_v_frontier_outgoing_e_by_dst.cu | 621 ++++++++++++++++++ .../mg_update_v_frontier_from_outgoing_e.cu | 399 ----------- 20 files changed, 1405 insertions(+), 899 deletions(-) rename cpp/include/cugraph/prims/{per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints.cuh => transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh} (98%) rename cpp/include/cugraph/prims/{per_src_dst_key_transform_reduce_e.cuh => transform_reduce_e_by_src_dst_key.cuh} (93%) rename cpp/include/cugraph/prims/{update_v_frontier_from_outgoing_e.cuh => transform_reduce_v_frontier_outgoing_e_by_dst.cuh} (77%) create mode 100644 cpp/include/cugraph/prims/update_v_frontier.cuh create mode 100644 cpp/tests/prims/mg_transform_reduce_v_frontier_outgoing_e_by_dst.cu delete mode 100644 cpp/tests/prims/mg_update_v_frontier_from_outgoing_e.cu diff --git a/cpp/include/cugraph/detail/graph_utils.cuh b/cpp/include/cugraph/detail/graph_utils.cuh index 8fa3f0e59e4..f315c509fa7 100644 --- a/cpp/include/cugraph/detail/graph_utils.cuh +++ b/cpp/include/cugraph/detail/graph_utils.cuh @@ -95,24 +95,5 @@ struct compute_partition_id_from_edge_t { } }; -template -struct is_first_in_run_t { - vertex_t const* vertices{nullptr}; - __device__ bool operator()(size_t i) const - { - return (i == 0) || (vertices[i - 1] != vertices[i]); - } -}; - -template -struct is_first_in_run_pair_t { - vertex_t const* vertices0{nullptr}; - vertex_t const* vertices1{nullptr}; - __device__ bool operator()(size_t i) const - { - return (i == 0) || ((vertices0[i - 1] != vertices0[i]) || (vertices1[i - 1] != vertices1[i])); - } -}; - } // namespace detail } // namespace cugraph diff --git a/cpp/include/cugraph/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh b/cpp/include/cugraph/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh index a52ff22aa79..ddf579f93ed 100644 --- a/cpp/include/cugraph/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh +++ b/cpp/include/cugraph/prims/per_v_transform_reduce_dst_key_aggregated_outgoing_e.cuh @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -549,11 +550,11 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e( key_pair_first + rx_majors.size(), rx_key_aggregated_edge_weights.begin()); } - auto num_uniques = thrust::count_if( - handle.get_thrust_policy(), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(rx_majors.size()), - detail::is_first_in_run_pair_t{rx_majors.data(), rx_minor_keys.data()}); + auto num_uniques = + thrust::count_if(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(rx_majors.size()), + detail::is_first_in_run_t{key_pair_first}); tmp_majors.resize(num_uniques, handle.get_stream()); tmp_minor_keys.resize(tmp_majors.size(), handle.get_stream()); tmp_key_aggregated_edge_weights.resize(tmp_majors.size(), handle.get_stream()); @@ -652,10 +653,11 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e( tmp_key_aggregated_edge_weights.shrink_to_fit(handle.get_stream()); { - auto num_uniques = thrust::count_if(handle.get_thrust_policy(), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(tmp_majors.size()), - detail::is_first_in_run_t{tmp_majors.data()}); + auto num_uniques = + thrust::count_if(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(tmp_majors.size()), + detail::is_first_in_run_t{tmp_majors.data()}); rmm::device_uvector unique_majors(num_uniques, handle.get_stream()); auto reduced_e_op_result_buffer = allocate_dataframe_buffer(unique_majors.size(), handle.get_stream()); @@ -725,7 +727,7 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e( auto num_uniques = thrust::count_if(handle.get_thrust_policy(), thrust::make_counting_iterator(size_t{0}), thrust::make_counting_iterator(majors.size()), - detail::is_first_in_run_t{majors.data()}); + detail::is_first_in_run_t{majors.data()}); rmm::device_uvector unique_majors(num_uniques, handle.get_stream()); auto reduced_e_op_result_buffer = allocate_dataframe_buffer(unique_majors.size(), handle.get_stream()); diff --git a/cpp/include/cugraph/prims/reduce_op.cuh b/cpp/include/cugraph/prims/reduce_op.cuh index 83e3b325ff1..0ba8b42cc48 100644 --- a/cpp/include/cugraph/prims/reduce_op.cuh +++ b/cpp/include/cugraph/prims/reduce_op.cuh @@ -24,7 +24,7 @@ namespace cugraph { namespace reduce_op { // Guidance on writing a custom reduction operator. -// 1. It is advised to add a "using type = type_of_the_reduced_values" statement. +// 1. It is required to add an "using value_type = type_of_the_reduced_values" statement. // 2. A custom reduction operator MUST be side-effect free. We use thrust::reduce internally to // implement reductions in multiple primitives. The current (version 1.16) implementation of thrust // reduce rounds up the number of invocations based on the CUDA block size and discards the values @@ -47,14 +47,14 @@ namespace reduce_op { // in case there is no payload to reduce struct null { - using type = void; + using value_type = void; }; // Binary reduction operator selecting any of the two input arguments, T should be arithmetic types // or thrust tuple of arithmetic types. template struct any { - using type = T; + using value_type = T; static constexpr bool pure_function = true; // this can be called in any process __host__ __device__ T operator()(T const& lhs, T const& rhs) const { return lhs; } @@ -64,7 +64,7 @@ struct any { // operator <), T should be arithmetic types or thrust tuple of arithmetic types. template struct minimum { - using type = T; + using value_type = T; static constexpr bool pure_function = true; // this can be called in any process static constexpr raft::comms::op_t compatible_raft_comms_op = raft::comms::op_t::MIN; inline static T const identity_element = max_identity_element(); @@ -79,7 +79,7 @@ struct minimum { // operator <), T should be arithmetic types or thrust tuple of arithmetic types. template struct maximum { - using type = T; + using value_type = T; static constexpr bool pure_function = true; // this can be called in any process static constexpr raft::comms::op_t compatible_raft_comms_op = raft::comms::op_t::MAX; inline static T const identity_element = min_identity_element(); @@ -94,7 +94,7 @@ struct maximum { // tuple of arithmetic types. template struct plus { - using type = T; + using value_type = T; static constexpr bool pure_function = true; // this can be called in any process static constexpr raft::comms::op_t compatible_raft_comms_op = raft::comms::op_t::SUM; inline static T const identity_element = T{}; @@ -117,7 +117,7 @@ template inline constexpr bool has_compatible_raft_comms_op_v = has_compatible_raft_comms_op::value; -template +template struct has_identity_element : std::false_type { }; diff --git a/cpp/include/cugraph/prims/per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints.cuh b/cpp/include/cugraph/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh similarity index 98% rename from cpp/include/cugraph/prims/per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints.cuh rename to cpp/include/cugraph/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh index b8c5911c3f9..37f1fb9092a 100644 --- a/cpp/include/cugraph/prims/per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -105,7 +106,7 @@ std::tuple, ValueBuffer> sort_and_reduce_by_vertic auto num_uniques = thrust::count_if(handle.get_thrust_policy(), thrust::make_counting_iterator(size_t{0}), thrust::make_counting_iterator(vertices.size()), - detail::is_first_in_run_t{vertices.data()}); + detail::is_first_in_run_t{vertices.data()}); rmm::device_uvector reduced_vertices(num_uniques, handle.get_stream()); auto reduced_value_buffer = allocate_dataframe_buffer(num_uniques, handle.get_stream()); thrust::reduce_by_key(handle.get_thrust_policy(), @@ -169,7 +170,7 @@ struct accumulate_vertex_property_t { * vertex; invoke a user-provided functor per intersection, and reduce the functor output * values (thrust::tuple of three values having the same type: one for source, one for destination, * and one for every vertex in the intersection) per-vertex. We may add - * per_v_transform_reduce_triplet_of_dst_nbr_intersection_of_e_endpoints in the future to allow + * transform_reduce_triplet_of_dst_nbr_intersection_of_e_endpoints_by_v in the future to allow * emitting different values for different vertices in the intersection of edge endpoints. This * function is inspired by thrust::transfrom_reduce(). * @@ -213,7 +214,7 @@ template -void per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints( +void transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v( raft::handle_t const& handle, GraphViewType const& graph_view, EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, @@ -424,7 +425,7 @@ void per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints( thrust::count_if(handle.get_thrust_policy(), thrust::make_counting_iterator(size_t{0}), thrust::make_counting_iterator(merged_vertices.size()), - detail::is_first_in_run_t{merged_vertices.data()}); + detail::is_first_in_run_t{merged_vertices.data()}); rmm::device_uvector reduced_vertices(num_uniques, handle.get_stream()); auto reduced_value_buffer = allocate_dataframe_buffer(num_uniques, handle.get_stream()); thrust::reduce_by_key(handle.get_thrust_policy(), diff --git a/cpp/include/cugraph/prims/per_src_dst_key_transform_reduce_e.cuh b/cpp/include/cugraph/prims/transform_reduce_e_by_src_dst_key.cuh similarity index 93% rename from cpp/include/cugraph/prims/per_src_dst_key_transform_reduce_e.cuh rename to cpp/include/cugraph/prims/transform_reduce_e_by_src_dst_key.cuh index 63bea458963..f784d73c61e 100644 --- a/cpp/include/cugraph/prims/per_src_dst_key_transform_reduce_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_e_by_src_dst_key.cuh @@ -32,7 +32,7 @@ namespace cugraph { namespace detail { // FIXME: block size requires tuning -int32_t constexpr per_src_dst_key_transform_reduce_e_kernel_block_size = 128; +int32_t constexpr transform_reduce_e_by_src_dst_key_kernel_block_size = 128; template (major_range_first - edge_partition.major_range_first()); @@ -352,7 +352,7 @@ template std::tuple, decltype(allocate_dataframe_buffer(0, cudaStream_t{nullptr}))> -per_src_dst_key_transform_reduce_e( +transform_reduce_e_by_src_dst_key( raft::handle_t const& handle, GraphViewType const& graph_view, EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, @@ -414,7 +414,7 @@ per_src_dst_key_transform_reduce_e( if ((*segment_offsets)[1] > 0) { raft::grid_1d_block_t update_grid( (*segment_offsets)[1], - detail::per_src_dst_key_transform_reduce_e_kernel_block_size, + detail::transform_reduce_e_by_src_dst_key_kernel_block_size, handle.get_device_properties().maxGridSize[0]); detail::transform_reduce_by_src_dst_key_high_degree <<>>( @@ -431,7 +431,7 @@ per_src_dst_key_transform_reduce_e( if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) { raft::grid_1d_warp_t update_grid( (*segment_offsets)[2] - (*segment_offsets)[1], - detail::per_src_dst_key_transform_reduce_e_kernel_block_size, + detail::transform_reduce_e_by_src_dst_key_kernel_block_size, handle.get_device_properties().maxGridSize[0]); detail::transform_reduce_by_src_dst_key_mid_degree <<>>( @@ -448,7 +448,7 @@ per_src_dst_key_transform_reduce_e( if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) { raft::grid_1d_thread_t update_grid( (*segment_offsets)[3] - (*segment_offsets)[2], - detail::per_src_dst_key_transform_reduce_e_kernel_block_size, + detail::transform_reduce_e_by_src_dst_key_kernel_block_size, handle.get_device_properties().maxGridSize[0]); detail::transform_reduce_by_src_dst_key_low_degree <<>>( @@ -466,7 +466,7 @@ per_src_dst_key_transform_reduce_e( (*(edge_partition.dcs_nzd_vertex_count()) > 0)) { raft::grid_1d_thread_t update_grid( *(edge_partition.dcs_nzd_vertex_count()), - detail::per_src_dst_key_transform_reduce_e_kernel_block_size, + detail::transform_reduce_e_by_src_dst_key_kernel_block_size, handle.get_device_properties().maxGridSize[0]); detail::transform_reduce_by_src_dst_key_hypersparse <<>>( @@ -482,7 +482,7 @@ per_src_dst_key_transform_reduce_e( } else { raft::grid_1d_thread_t update_grid( edge_partition.major_range_size(), - detail::per_src_dst_key_transform_reduce_e_kernel_block_size, + detail::transform_reduce_e_by_src_dst_key_kernel_block_size, handle.get_device_properties().maxGridSize[0]); detail::transform_reduce_by_src_dst_key_low_degree @@ -553,8 +553,6 @@ per_src_dst_key_transform_reduce_e( } // namespace detail -// FIXME: EdgeOp & VertexOp in update_frontier_v_push_if_out_nbr concatenates push inidicator or -// bucket idx with the value while EdgeOp here does not. This is inconsistent. Better be fixed. /** * @brief Iterate over the entire set of edges and reduce @p edge_op outputs to (key, value) pairs. * @@ -603,15 +601,14 @@ template -auto per_src_key_transform_reduce_e( - raft::handle_t const& handle, - GraphViewType const& graph_view, - EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, - EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, - EdgePartitionSrcKeyInputWrapper edge_partition_src_key_input, - EdgeOp e_op, - T init, - bool do_expensive_check = false) +auto transform_reduce_e_by_src_key(raft::handle_t const& handle, + GraphViewType const& graph_view, + EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, + EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, + EdgePartitionSrcKeyInputWrapper edge_partition_src_key_input, + EdgeOp e_op, + T init, + bool do_expensive_check = false) { static_assert(is_arithmetic_or_thrust_tuple_of_arithmetic::value); static_assert(std::is_same(handle, - graph_view, - edge_partition_src_value_input, - edge_partition_dst_value_input, - edge_partition_src_key_input, - e_op, - init); + return detail::transform_reduce_e_by_src_dst_key(handle, + graph_view, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_src_key_input, + e_op, + init); } -// FIXME: EdgeOp & VertexOp in update_frontier_v_push_if_out_nbr concatenates push inidicator or -// bucket idx with the value while EdgeOp here does not. This is inconsistent. Better be fixed. /** * @brief Iterate over the entire set of edges and reduce @p edge_op outputs to (key, value) pairs. * @@ -681,15 +676,14 @@ template -auto per_dst_key_transform_reduce_e( - raft::handle_t const& handle, - GraphViewType const& graph_view, - EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, - EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, - EdgePartitionDstKeyInputWrapper edge_partition_dst_key_input, - EdgeOp e_op, - T init, - bool do_expensive_check = false) +auto transform_reduce_e_by_dst_key(raft::handle_t const& handle, + GraphViewType const& graph_view, + EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, + EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, + EdgePartitionDstKeyInputWrapper edge_partition_dst_key_input, + EdgeOp e_op, + T init, + bool do_expensive_check = false) { static_assert(is_arithmetic_or_thrust_tuple_of_arithmetic::value); static_assert(std::is_same(handle, - graph_view, - edge_partition_src_value_input, - edge_partition_dst_value_input, - edge_partition_dst_key_input, - e_op, - init); + return detail::transform_reduce_e_by_src_dst_key(handle, + graph_view, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_dst_key_input, + e_op, + init); } } // namespace cugraph diff --git a/cpp/include/cugraph/prims/update_v_frontier_from_outgoing_e.cuh b/cpp/include/cugraph/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh similarity index 77% rename from cpp/include/cugraph/prims/update_v_frontier_from_outgoing_e.cuh rename to cpp/include/cugraph/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh index bad102d1203..7328a59e10e 100644 --- a/cpp/include/cugraph/prims/update_v_frontier_from_outgoing_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -57,8 +58,8 @@ namespace detail { int32_t constexpr update_v_frontier_from_outgoing_e_kernel_block_size = 512; -// we cannot use std::iterator_traits::value_type if Iterator is void* (reference to void -// is not allowed) +// we cannot use thrust::iterator_traits::value_type if Iterator is void* (reference to +// void is not allowed) template struct optional_payload_buffer_value_type_t; @@ -66,7 +67,7 @@ template struct optional_payload_buffer_value_type_t< PayloadIterator, std::enable_if_t>> { - using value = typename std::iterator_traits::value_type; + using value = typename thrust::iterator_traits::value_type; }; template @@ -76,18 +77,14 @@ struct optional_payload_buffer_value_type_t< using value = void; }; -// FIXME: to silence the spurious warning (missing return statement ...) due to the nvcc bug -// (https://stackoverflow.com/questions/64523302/cuda-missing-return-statement-at-end-of-non-void- -// function-in-constexpr-if-fun) -#if 1 template >* = nullptr> -std::byte allocate_optional_payload_buffer(size_t size, cudaStream_t stream) +std::byte allocate_optional_payload_buffer(size_t size, rmm::cuda_stream_view stream) { return std::byte{0}; // dummy } template >* = nullptr> -auto allocate_optional_payload_buffer(size_t size, cudaStream_t stream) +auto allocate_optional_payload_buffer(size_t size, rmm::cuda_stream_view stream) { return allocate_dataframe_buffer(size, stream); } @@ -101,84 +98,44 @@ void* get_optional_payload_buffer_begin(std::byte& optional_payload_buffer) template >* = nullptr> auto get_optional_payload_buffer_begin( std::add_lvalue_reference_t( - size_t{0}, cudaStream_t{nullptr}))> optional_payload_buffer) + size_t{0}, rmm::cuda_stream_view{}))> optional_payload_buffer) { return get_dataframe_buffer_begin(optional_payload_buffer); } -#else -auto allocate_optional_payload_buffer = [](size_t size, cudaStream_t stream) { - if constexpr (std::is_same_v) { - return std::byte{0}; // dummy - } else { - return allocate_dataframe_buffer(size, stream); - } -}; -auto get_optional_payload_buffer_begin = [](auto& optional_payload_buffer) { - if constexpr (std::is_same_v) { - return static_cast(nullptr); - } else { - return get_dataframe_buffer_begin(optional_payload_buffer); - } -}; -#endif +template >* = nullptr> +void resize_optional_payload_buffer(std::byte& optional_payload_buffer, + size_t new_buffer_size, + rmm::cuda_stream_view stream_view) +{ + return; +} -// FIXME: a temporary workaround for cudaErrorInvalidDeviceFunction error when device lambda is used -// in the else part in if constexpr else statement that involves device lambda -template -struct update_v_frontier_call_v_op_t { - VertexValueInputIterator vertex_value_input_first{}; - VertexValueOutputIterator vertex_value_output_first{}; - VertexOp v_op{}; - vertex_partition_device_view_t vertex_partition{}; - size_t invalid_bucket_idx; - - template - __device__ std::enable_if_t, uint8_t> operator()( - key_t key) const - { - auto v_offset = vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(key); - auto v_val = *(vertex_value_input_first + v_offset); - auto v_op_result = v_op(key, v_val); - if (v_op_result) { - *(vertex_value_output_first + v_offset) = thrust::get<1>(*v_op_result); - return static_cast(thrust::get<0>(*v_op_result)); - } else { - return std::numeric_limits::max(); - } - } +template >* = nullptr> +void resize_optional_payload_buffer( + std::add_lvalue_reference_t( + size_t{0}, rmm::cuda_stream_view{}))> optional_payload_buffer, + size_t new_buffer_size, + rmm::cuda_stream_view stream_view) +{ + return resize_dataframe_buffer(optional_payload_buffer, new_buffer_size, stream_view); +} - template - __device__ std::enable_if_t, uint8_t> operator()( - key_t key) const - { - auto v_offset = - vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(thrust::get<0>(key)); - auto v_val = *(vertex_value_input_first + v_offset); - auto v_op_result = v_op(key, v_val); - if (v_op_result) { - *(vertex_value_output_first + v_offset) = thrust::get<1>(*v_op_result); - return static_cast(thrust::get<0>(*v_op_result)); - } else { - return std::numeric_limits::max(); - } - } -}; +template >* = nullptr> +void shrink_to_fit_optional_payload_buffer(std::byte& optional_payload_buffer, + rmm::cuda_stream_view stream_view) +{ + return; +} -// FIXME: a temporary workaround for cudaErrorInvalidDeviceFunction error when device lambda is used -// after if constexpr else statement that involves device lambda (bug report submitted) -template -struct check_invalid_bucket_idx_t { - __device__ bool operator()(thrust::tuple pair) - { - return thrust::get<0>(pair) == std::numeric_limits::max(); - } -}; +template >* = nullptr> +void shrink_to_fit_optional_payload_buffer( + std::add_lvalue_reference_t( + size_t{0}, rmm::cuda_stream_view{}))> optional_payload_buffer, + rmm::cuda_stream_view stream_view) +{ + return shrink_to_fit_dataframe_buffer(optional_payload_buffer, stream_view); +} template ::value_type; + using key_t = typename thrust::iterator_traits::value_type; using payload_t = typename optional_payload_buffer_value_type_t::value; @@ -234,9 +191,9 @@ __global__ void update_v_frontier_from_outgoing_e_hypersparse( using vertex_t = typename GraphViewType::vertex_type; using edge_t = typename GraphViewType::edge_type; using weight_t = typename GraphViewType::weight_type; - using key_t = typename std::iterator_traits::value_type; + using key_t = typename thrust::iterator_traits::value_type; static_assert( - std::is_same_v::value_type>); + std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; using e_op_result_t = typename evaluate_edge_op::value_type; + using key_t = typename thrust::iterator_traits::value_type; static_assert( - std::is_same_v::value_type>); + std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; using e_op_result_t = typename evaluate_edge_op::value_type; + using key_t = typename thrust::iterator_traits::value_type; static_assert( - std::is_same_v::value_type>); + std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; using e_op_result_t = typename evaluate_edge_op::value_type; + using key_t = typename thrust::iterator_traits::value_type; static_assert( - std::is_same_v::value_type>); + std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; using e_op_result_t = typename evaluate_edge_op -size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, - BufferKeyOutputIterator buffer_key_output_first, - BufferPayloadOutputIterator buffer_payload_output_first, - size_t num_buffer_elements, - ReduceOp reduce_op) +template +auto sort_and_reduce_buffer_elements( + raft::handle_t const& handle, + decltype(allocate_dataframe_buffer(0, rmm::cuda_stream_view{}))&& key_buffer, + decltype(allocate_optional_payload_buffer(0, + rmm::cuda_stream_view{}))&& payload_buffer, + ReduceOp reduce_op) { - using key_t = typename std::iterator_traits::value_type; - using payload_t = - typename optional_payload_buffer_value_type_t::value; - - auto execution_policy = handle.get_thrust_policy(); if constexpr (std::is_same_v) { - thrust::sort( - execution_policy, buffer_key_output_first, buffer_key_output_first + num_buffer_elements); + thrust::sort(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_end(key_buffer)); } else { - thrust::sort_by_key(execution_policy, - buffer_key_output_first, - buffer_key_output_first + num_buffer_elements, - buffer_payload_output_first); + thrust::sort_by_key(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_end(key_buffer), + get_optional_payload_buffer_begin(payload_buffer)); } - size_t num_reduced_buffer_elements{}; if constexpr (std::is_same_v) { - auto it = thrust::unique( - execution_policy, buffer_key_output_first, buffer_key_output_first + num_buffer_elements); - num_reduced_buffer_elements = - static_cast(thrust::distance(buffer_key_output_first, it)); - } else if constexpr (std::is_same_v>) { - auto it = thrust::unique_by_key(execution_policy, - buffer_key_output_first, - buffer_key_output_first + num_buffer_elements, - buffer_payload_output_first); - num_reduced_buffer_elements = - static_cast(thrust::distance(buffer_key_output_first, thrust::get<0>(it))); + auto it = thrust::unique(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_end(key_buffer)); + resize_dataframe_buffer( + key_buffer, + static_cast(thrust::distance(get_dataframe_buffer_begin(key_buffer), it)), + handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + } else if constexpr (std::is_same_v>) { + auto it = thrust::unique_by_key(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_end(key_buffer), + get_optional_payload_buffer_begin(payload_buffer)); + resize_dataframe_buffer(key_buffer, + static_cast(thrust::distance( + get_dataframe_buffer_begin(key_buffer), thrust::get<0>(it))), + handle.get_stream()); + resize_dataframe_buffer(payload_buffer, size_dataframe_buffer(key_buffer), handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); } else { - rmm::device_uvector keys(num_buffer_elements, handle.get_stream()); - auto value_buffer = - allocate_dataframe_buffer(num_buffer_elements, handle.get_stream()); - auto it = thrust::reduce_by_key(execution_policy, - buffer_key_output_first, - buffer_key_output_first + num_buffer_elements, - buffer_payload_output_first, - keys.begin(), - get_dataframe_buffer_begin(value_buffer), - thrust::equal_to(), - reduce_op); - num_reduced_buffer_elements = - static_cast(thrust::distance(keys.begin(), thrust::get<0>(it))); - // FIXME: this copy can be replaced by move - thrust::copy(execution_policy, - keys.begin(), - keys.begin() + num_reduced_buffer_elements, - buffer_key_output_first); - thrust::copy(execution_policy, - get_dataframe_buffer_begin(value_buffer), - get_dataframe_buffer_begin(value_buffer) + num_reduced_buffer_elements, - buffer_payload_output_first); + auto num_uniques = + thrust::count_if(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(size_dataframe_buffer(key_buffer)), + is_first_in_run_t{ + get_dataframe_buffer_begin(key_buffer)}); + + auto new_key_buffer = allocate_dataframe_buffer(num_uniques, handle.get_stream()); + auto new_payload_buffer = + allocate_dataframe_buffer(num_uniques, handle.get_stream()); + + thrust::reduce_by_key(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_end(key_buffer), + get_optional_payload_buffer_begin(payload_buffer), + get_dataframe_buffer_begin(new_key_buffer), + get_dataframe_buffer_begin(new_payload_buffer), + thrust::equal_to(), + reduce_op); + + key_buffer = std::move(new_key_buffer); + payload_buffer = std::move(new_payload_buffer); } - return num_reduced_buffer_elements; + return std::make_tuple(std::move(key_buffer), std::move(payload_buffer)); } } // namespace detail @@ -837,17 +799,14 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( auto const& cur_frontier_bucket = frontier.bucket(cur_frontier_bucket_idx); vertex_t const* local_frontier_vertex_first{nullptr}; - vertex_t const* local_frontier_vertex_last{nullptr}; if constexpr (std::is_same_v) { local_frontier_vertex_first = cur_frontier_bucket.begin(); - local_frontier_vertex_last = cur_frontier_bucket.end(); } else { local_frontier_vertex_first = thrust::get<0>(cur_frontier_bucket.begin().get_iterator_tuple()); - local_frontier_vertex_last = thrust::get<0>(cur_frontier_bucket.end().get_iterator_tuple()); } std::vector local_frontier_sizes{}; - if (GraphViewType::is_multi_gpu) { + if constexpr (GraphViewType::is_multi_gpu) { auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); local_frontier_sizes = host_scalar_allgather(col_comm, cur_frontier_bucket.size(), handle.get_stream()); @@ -860,7 +819,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( graph_view.local_edge_partition_view(i)); auto execution_policy = handle.get_thrust_policy(); - if (GraphViewType::is_multi_gpu) { + if constexpr (GraphViewType::is_multi_gpu) { auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); @@ -919,7 +878,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( ret += thrust::transform_reduce( execution_policy, local_frontier_vertex_first, - local_frontier_vertex_last, + local_frontier_vertex_first + cur_frontier_bucket.size(), [edge_partition] __device__(auto major) { auto major_offset = edge_partition.major_offset_from_major_nocheck(major); return edge_partition.local_degree(major_offset); @@ -932,10 +891,13 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( return ret; } -// FIXME: this documentation needs to be updated due to (tagged-)vertex support /** - * @brief Update (tagged-)vertex frontier and (tagged-)vertex property values iterating over the - * outgoing edges from the vertices in the current frontier. + * @brief Iterate over outgoing edges from the current vertex frontier and reduce valid edge functor + * outputs by (tagged-)destination ID. + * + * Edge functor outputs are thrust::optional objects and invalid if thrust::nullopt. Vertices are + * assumed to be tagged if VertexFrontierType::key_type is a tuple of a vertex type and a tag type + * (VertexFrontierType::key_type is identical to a vertex type otherwise). * * @tparam GraphViewType Type of the passed non-owning graph object. * @tparam VertexFrontierType Type of the vertex frontier class which abstracts vertex frontier @@ -946,9 +908,6 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( * property values. * @tparam EdgeOp Type of the quaternary (or quinary) edge operator. * @tparam ReduceOp Type of the binary reduction operator. - * @tparam VertexValueInputIterator Type of the iterator for vertex property values. - * @tparam VertexValueOutputIterator Type of the iterator for vertex property variables. - * @tparam VertexOp Type of the binary vertex operator. * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and * handles to various CUDA libraries) to run graph algorithms. * @param graph_view Non-owning graph object. @@ -956,8 +915,6 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( * includes multiple bucket objects. * @param cur_frontier_bucket_idx Index of the vertex frontier bucket holding vertices for the * current iteration. - * @param next_frontier_bucket_indices Indices of the vertex frontier buckets to store new frontier - * vertices for the next iteration. * @param edge_partition_src_value_input Device-copyable wrapper used to access source input * property values (for the edge sources assigned to this process in multi-GPU). Use either * cugraph::edge_partition_src_property_t::device_view() (if @p e_op needs to access source property @@ -969,63 +926,46 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( * property values) or cugraph::dummy_property_t::device_view() (if @p e_op does not access * destination property values). Use update_edge_partition_dst_property to fill the wrapper. * @param e_op Quaternary (or quinary) operator takes edge source, edge destination, (optional edge - * weight), property values for the source, and property values for the destination and returns a - * value to be reduced using the @p reduce_op. + * weight), property values for the source, and property values for the destination and returns + * 1) thrust::nullopt (if invalid and to be discarded); 2) dummy (but valid) thrust::optional object + * (e.g. thrust::optional{std::byte{0}}, if vertices are not tagged and + * ReduceOp::value_type is void); 3) a tag (if vertices are tagged and ReduceOp::value_type is + * void); 4) a value to be reduced using the @p reduce_op (if vertices are not tagged and + * ReduceOp::value_type is not void); or 5) a tuple of a tag and a value to be reduced (if vertices + * are tagged and ReduceOp::value_type is not void). * @param reduce_op Binary operator that takes two input arguments and reduce the two values to one. * There are pre-defined reduction operators in include/cugraph/prims/reduce_op.cuh. It is * recommended to use the pre-defined reduction operators whenever possible as the current (and * future) implementations of graph primitives may check whether @p ReduceOp is a known type (or has * known member variables) to take a more optimized code path. See the documentation in the * reduce_op.cuh file for instructions on writing custom reduction operators. - * @param vertex_value_input_first Iterator pointing to the vertex property values for the first - * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) - * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). - * @param vertex_value_output_first Iterator pointing to the vertex property variables for the first - * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_output_last` - * (exclusive) is deduced as @p vertex_value_output_first + @p - * graph_view.local_vertex_partition_range_size(). - * @param v_op Ternary operator takes (tagged-)vertex ID, *(@p vertex_value_input_first + i) (where - * i is [0, @p graph_view.local_vertex_partition_range_size())) and reduced value of the @p e_op - * outputs for this vertex and returns the target bucket index (for frontier update) and new vertex - * property values (to update *(@p vertex_value_output_first + i)). The target bucket index should - * either be VertexFrontierType::kInvalidBucketIdx or an index in @p next_frontier_bucket_indices. + * @return Tuple of key values and payload values (if ReduceOp::value_type is not void) or just key + * values (if ReduceOp::value_type is void). */ template -void update_v_frontier_from_outgoing_e( + typename ReduceOp> +std::conditional_t< + !std::is_same_v, + std::tuple( + 0, rmm::cuda_stream_view{})), + decltype(detail::allocate_optional_payload_buffer( + 0, rmm::cuda_stream_view{}))>, + decltype( + allocate_dataframe_buffer(0, rmm::cuda_stream_view{}))> +transform_reduce_v_frontier_outgoing_e_by_dst( raft::handle_t const& handle, GraphViewType const& graph_view, - VertexFrontierType& frontier, + VertexFrontierType const& frontier, size_t cur_frontier_bucket_idx, - std::vector const& next_frontier_bucket_indices, - // FIXME: if vertices in the frontier are tagged, we should have an option to access with (vertex, - // tag) pair (currently we can access only with vertex, we may use cuco::static_map for this - // purpose) EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgeOp e_op, ReduceOp reduce_op, - // FIXME: if vertices in the frontier are tagged, we should have an option to access with (vertex, - // tag) pair (currently we can access only with vertex, we may use cuco::static_map for this - // purpose) - VertexValueInputIterator vertex_value_input_first, - // FIXME: if vertices in the frontier are tagged, we should have an option to access with (vertex, - // tag) pair (currently we can access only with vertex, we may use cuco::static_map for this - // purpose) - // FIXME: currently, it is undefined behavior if vertices in the frontier are tagged and the same - // vertex property is updated by multiple v_op invocations with the same vertex but with different - // tags. - VertexValueOutputIterator vertex_value_output_first, - // FIXME: this takes (tagged-)vertex ID in addition, think about consistency with the other - // primitives. - VertexOp v_op) + bool do_expensive_check = false) { static_assert(!GraphViewType::is_storage_transposed, "GraphViewType should support the push model."); @@ -1034,7 +974,14 @@ void update_v_frontier_from_outgoing_e( using edge_t = typename GraphViewType::edge_type; using weight_t = typename GraphViewType::weight_type; using key_t = typename VertexFrontierType::key_type; - using payload_t = typename ReduceOp::type; + using payload_t = typename ReduceOp::value_type; + + CUGRAPH_EXPECTS(cur_frontier_bucket_idx < frontier.num_buckets(), + "Invalid input argument: invalid current bucket index."); + + if (do_expensive_check) { + // currently, nothing to do + } auto frontier_key_first = frontier.bucket(cur_frontier_bucket_idx).begin(); auto frontier_key_last = frontier.bucket(cur_frontier_bucket_idx).end(); @@ -1046,7 +993,7 @@ void update_v_frontier_from_outgoing_e( detail::allocate_optional_payload_buffer(size_t{0}, handle.get_stream()); rmm::device_scalar buffer_idx(size_t{0}, handle.get_stream()); std::vector local_frontier_sizes{}; - if (GraphViewType::is_multi_gpu) { + if constexpr (GraphViewType::is_multi_gpu) { auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); local_frontier_sizes = host_scalar_allgather( col_comm, @@ -1064,7 +1011,7 @@ void update_v_frontier_from_outgoing_e( auto edge_partition_frontier_key_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); vertex_t edge_partition_frontier_size = static_cast(local_frontier_sizes[i]); - if (GraphViewType::is_multi_gpu) { + if constexpr (GraphViewType::is_multi_gpu) { auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); @@ -1270,13 +1217,16 @@ void update_v_frontier_from_outgoing_e( // 2. reduce the buffer - auto num_buffer_elements = detail::sort_and_reduce_buffer_elements( - handle, - get_dataframe_buffer_begin(key_buffer), - detail::get_optional_payload_buffer_begin(payload_buffer), - buffer_idx.value(handle.get_stream()), - reduce_op); - if (GraphViewType::is_multi_gpu) { + resize_dataframe_buffer(key_buffer, buffer_idx.value(handle.get_stream()), handle.get_stream()); + detail::resize_optional_payload_buffer( + payload_buffer, size_dataframe_buffer(key_buffer), handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + detail::shrink_to_fit_optional_payload_buffer(payload_buffer, handle.get_stream()); + + std::tie(key_buffer, payload_buffer) = + detail::sort_and_reduce_buffer_elements( + handle, std::move(key_buffer), std::move(payload_buffer), reduce_op); + if constexpr (GraphViewType::is_multi_gpu) { // FIXME: this step is unnecessary if row_comm_size== 1 auto& comm = handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); @@ -1302,7 +1252,7 @@ void update_v_frontier_from_outgoing_e( } thrust::lower_bound(handle.get_thrust_policy(), src_first, - src_first + num_buffer_elements, + src_first + size_dataframe_buffer(key_buffer), d_vertex_lasts.begin(), d_vertex_lasts.end(), d_tx_buffer_last_boundaries.begin()); @@ -1328,93 +1278,15 @@ void update_v_frontier_from_outgoing_e( payload_buffer = std::move(rx_payload_buffer); } - num_buffer_elements = detail::sort_and_reduce_buffer_elements( - handle, - get_dataframe_buffer_begin(key_buffer), - detail::get_optional_payload_buffer_begin(payload_buffer), - size_dataframe_buffer(key_buffer), - reduce_op); + std::tie(key_buffer, payload_buffer) = + detail::sort_and_reduce_buffer_elements( + handle, std::move(key_buffer), std::move(payload_buffer), reduce_op); } - // 3. update vertex property values and frontier - - if (num_buffer_elements > 0) { - assert(frontier.num_buckets() <= std::numeric_limits::max()); - rmm::device_uvector bucket_indices(num_buffer_elements, handle.get_stream()); - - auto vertex_partition = vertex_partition_device_view_t( - graph_view.local_vertex_partition_view()); - - if constexpr (!std::is_same_v) { - auto key_payload_pair_first = thrust::make_zip_iterator( - thrust::make_tuple(get_dataframe_buffer_begin(key_buffer), - detail::get_optional_payload_buffer_begin(payload_buffer))); - thrust::transform( - handle.get_thrust_policy(), - key_payload_pair_first, - key_payload_pair_first + num_buffer_elements, - bucket_indices.begin(), - [vertex_value_input_first, - vertex_value_output_first, - v_op, - vertex_partition, - invalid_bucket_idx = VertexFrontierType::kInvalidBucketIdx] __device__(auto pair) { - auto key = thrust::get<0>(pair); - auto payload = thrust::get<1>(pair); - vertex_t v_offset{}; - if constexpr (std::is_same_v) { - v_offset = vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(key); - } else { - v_offset = vertex_partition.local_vertex_partition_offset_from_vertex_nocheck( - thrust::get<0>(key)); - } - auto v_val = *(vertex_value_input_first + v_offset); - auto v_op_result = v_op(key, v_val, payload); - if (v_op_result) { - *(vertex_value_output_first + v_offset) = thrust::get<1>(*v_op_result); - return static_cast(thrust::get<0>(*v_op_result)); - } else { - return std::numeric_limits::max(); - } - }); - - resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); - shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); - } else { - thrust::transform(handle.get_thrust_policy(), - get_dataframe_buffer_begin(key_buffer), - get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, - bucket_indices.begin(), - detail::update_v_frontier_call_v_op_t{ - vertex_value_input_first, - vertex_value_output_first, - v_op, - vertex_partition, - VertexFrontierType::kInvalidBucketIdx}); - } - - auto bucket_key_pair_first = thrust::make_zip_iterator( - thrust::make_tuple(bucket_indices.begin(), get_dataframe_buffer_begin(key_buffer))); - bucket_indices.resize( - thrust::distance(bucket_key_pair_first, - thrust::remove_if(handle.get_thrust_policy(), - bucket_key_pair_first, - bucket_key_pair_first + num_buffer_elements, - detail::check_invalid_bucket_idx_t())), - handle.get_stream()); - resize_dataframe_buffer(key_buffer, bucket_indices.size(), handle.get_stream()); - bucket_indices.shrink_to_fit(handle.get_stream()); - shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); - - frontier.insert_to_buckets(bucket_indices.begin(), - bucket_indices.end(), - get_dataframe_buffer_begin(key_buffer), - next_frontier_bucket_indices); + if constexpr (!std::is_same_v) { + return std::make_tuple(std::move(key_buffer), std::move(payload_buffer)); + } else { + return key_buffer; } } diff --git a/cpp/include/cugraph/prims/update_v_frontier.cuh b/cpp/include/cugraph/prims/update_v_frontier.cuh new file mode 100644 index 00000000000..f7d17570a69 --- /dev/null +++ b/cpp/include/cugraph/prims/update_v_frontier.cuh @@ -0,0 +1,382 @@ +/* + * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace cugraph { + +namespace detail { + +template +struct update_v_frontier_call_v_op_t { + VertexValueInputIterator vertex_value_input_first{}; + VertexValueOutputIterator vertex_value_output_first{}; + VertexOp v_op{}; + vertex_t local_vertex_partition_range_first{}; + + __device__ uint8_t operator()(thrust::tuple pair) const + { + auto key = thrust::get<0>(pair); + auto payload = thrust::get<1>(pair); + vertex_t v_offset{}; + if constexpr (std::is_same_v) { + v_offset = key - local_vertex_partition_range_first; + } else { + v_offset = thrust::get<0>(key) - local_vertex_partition_range_first; + } + auto v_val = *(vertex_value_input_first + v_offset); + auto v_op_result = v_op(key, v_val, payload); + if (thrust::get<1>(v_op_result)) { + *(vertex_value_output_first + v_offset) = *(thrust::get<1>(v_op_result)); + } + if (thrust::get<0>(v_op_result)) { + assert(*(thrust::get<0>(v_op_result)) < std::numeric_limits::max()); + return static_cast(*(thrust::get<0>(v_op_result))); + } else { + return std::numeric_limits::max(); + } + } +}; + +template +struct update_v_frontier_call_v_op_t { + VertexValueInputIterator vertex_value_input_first{}; + VertexValueOutputIterator vertex_value_output_first{}; + VertexOp v_op{}; + vertex_t local_vertex_partition_range_first{}; + + __device__ uint8_t operator()(key_t key) const + { + vertex_t v_offset{}; + if constexpr (std::is_same_v) { + v_offset = key - local_vertex_partition_range_first; + } else { + v_offset = thrust::get<0>(key) - local_vertex_partition_range_first; + } + auto v_val = *(vertex_value_input_first + v_offset); + auto v_op_result = v_op(key, v_val); + if (thrust::get<1>(v_op_result)) { + *(vertex_value_output_first + v_offset) = *(thrust::get<1>(v_op_result)); + } + if (thrust::get<0>(v_op_result)) { + assert(*(thrust::get<0>(v_op_result)) < std::numeric_limits::max()); + return static_cast(*(thrust::get<0>(v_op_result))); + } else { + return std::numeric_limits::max(); + } + } +}; + +// FIXME: a temporary workaround for cudaErrorInvalidDeviceFunction error when device lambda is used +// after if constexpr else statement that involves device lambda (bug report submitted) +template +struct check_invalid_bucket_idx_t { + __device__ bool operator()(thrust::tuple pair) + { + return thrust::get<0>(pair) == std::numeric_limits::max(); + } +}; + +} // namespace detail + +/** + * @brief Insert (tagged-)vertices to the vertex frontier and update vertex property values of the + * newly inserted vertices . + * + * This primitive often works in pair with transform_reduce_v_frontier_outgoing_e_by_dst. This + * version of update_v_frontier takes @p payload_buffer and @v_op takes a payload value in addition + * to a (tagged-)vertex and a vertex property value as input arguments. + * + * @tparam GraphViewType Type of the passed non-owning graph object. + * @tparam KeyBuffer Type of the buffer storing (tagged-)vertices. + * @tparam PayloadBuffer Type of the buffer storing payload values. + * @tparam VertexFrontierType Type of the vertex frontier class which abstracts vertex frontier + * managements. + * @tparam VertexValueInputIterator Type of the iterator for input vertex property values. + * @tparam VertexValueOutputIterator Type of the iterator for output vertex property variables. + * @tparam VertexOp Type of the ternary vertex operator. + * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and + * handles to various CUDA libraries) to run graph algorithms. + * @param graph_view Non-owning graph object. + * @param key_buffer buffer object storing (tagged-)vertices to insert. + * @param payload_buffer buffer object storing payload values for each (tagged-)vertices in the @p + * key_buffer. + * @param frontier VertexFrontierType class object for vertex frontier managements. This object + * includes multiple bucket objects. + * @param next_frontier_bucket_indices Indices of the vertex frontier buckets to store new frontier + * (tagged-)vertices. + * @param vertex_value_input_first Iterator pointing to the vertex property values for the first + * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) + * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). + * @param vertex_value_output_first Iterator pointing to the vertex property variables for the first + * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_output_last` + * (exclusive) is deduced as @p vertex_value_output_first + @p + * graph_view.local_vertex_partition_range_size(). + * @param v_op Ternary operator that takes (tagged-)vertex ID, *(@p vertex_value_input_first + i) + * (where i is [0, @p graph_view.local_vertex_partition_range_size())) and the payload value for the + * (tagged-)vertex ID and returns a tuple of 1) a thrust::optional object optionally storing a + * bucket index and 2) a thrust::optional object optionally storing a new vertex property value. If + * the first element of the returned tuple is thrust::nullopt, this (tagged-)vertex won't be + * inserted to the vertex frontier. If the second element is thrust::nullopt, the vertex property + * value for this vertex won't be updated. Note that it is currently undefined behavior if there are + * multiple tagged-vertices with the same vertex ID (but with different tags) AND @p v_op results on + * the tagged-vertices with the same vertex ID have more than one valid new vertex property values. + */ +template +void update_v_frontier(raft::handle_t const& handle, + GraphViewType const& graph_view, + KeyBuffer&& key_buffer, + PayloadBuffer&& payload_buffer, + VertexFrontierType& frontier, + std::vector const& next_frontier_bucket_indices, + VertexValueInputIterator vertex_value_input_first, + // FIXME: currently, it is undefined behavior if vertices in the frontier are + // tagged and the same vertex property is updated by multiple v_op + // invocations with the same vertex but with different tags. + VertexValueOutputIterator vertex_value_output_first, + VertexOp v_op, + bool do_expensive_check = false) +{ + using vertex_t = typename GraphViewType::vertex_type; + using key_t = + typename thrust::iterator_traits::value_type; + using payload_t = typename thrust::iterator_traits::value_type; + + static_assert(std::is_rvalue_reference_v); + static_assert(std::is_rvalue_reference_v); + + std::for_each(next_frontier_bucket_indices.begin(), + next_frontier_bucket_indices.end(), + [&frontier](auto idx) { + CUGRAPH_EXPECTS(idx < frontier.num_buckets(), + "Invalid input argument: invalid next bucket indices."); + }); + + if (do_expensive_check) { + // currently, nothing to do + } + + if (size_dataframe_buffer(key_buffer) > 0) { + assert(frontier.num_buckets() <= std::numeric_limits::max()); + rmm::device_uvector bucket_indices(size_dataframe_buffer(key_buffer), + handle.get_stream()); + + auto key_payload_pair_first = thrust::make_zip_iterator(thrust::make_tuple( + get_dataframe_buffer_begin(key_buffer), get_dataframe_buffer_begin(payload_buffer))); + thrust::transform(handle.get_thrust_policy(), + key_payload_pair_first, + key_payload_pair_first + size_dataframe_buffer(key_buffer), + bucket_indices.begin(), + detail::update_v_frontier_call_v_op_t{ + vertex_value_input_first, + vertex_value_output_first, + v_op, + graph_view.local_vertex_partition_range_first()}); + + resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); + shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); + + auto bucket_key_pair_first = thrust::make_zip_iterator( + thrust::make_tuple(bucket_indices.begin(), get_dataframe_buffer_begin(key_buffer))); + bucket_indices.resize( + thrust::distance(bucket_key_pair_first, + thrust::remove_if(handle.get_thrust_policy(), + bucket_key_pair_first, + bucket_key_pair_first + bucket_indices.size(), + detail::check_invalid_bucket_idx_t())), + handle.get_stream()); + resize_dataframe_buffer(key_buffer, bucket_indices.size(), handle.get_stream()); + bucket_indices.shrink_to_fit(handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + + frontier.insert_to_buckets(bucket_indices.begin(), + bucket_indices.end(), + get_dataframe_buffer_begin(key_buffer), + next_frontier_bucket_indices); + + resize_dataframe_buffer(key_buffer, 0, handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + } +} + +/** + * @brief Insert (tagged-)vertices to the vertex frontier and update vertex property values of the + * newly inserted vertices . + * + * This primitive often works in pair with transform_reduce_v_frontier_outgoing_e_by_dst. This + * version of update_v_frontier does not take @p payload_buffer and @v_op takes a (tagged-)vertex + * and a vertex property value as input arguments (no payload value in the input parameter list). + * + * @tparam GraphViewType Type of the passed non-owning graph object. + * @tparam KeyBuffer Type of the buffer storing (tagged-)vertices. + * @tparam VertexFrontierType Type of the vertex frontier class which abstracts vertex frontier + * managements. + * @tparam VertexValueInputIterator Type of the iterator for input vertex property values. + * @tparam VertexValueOutputIterator Type of the iterator for output vertex property variables. + * @tparam VertexOp Type of the binary vertex operator. + * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and + * handles to various CUDA libraries) to run graph algorithms. + * @param graph_view Non-owning graph object. + * @param key_buffer buffer object storing (tagged-)vertices to insert. + * @param frontier VertexFrontierType class object for vertex frontier managements. This object + * includes multiple bucket objects. + * @param next_frontier_bucket_indices Indices of the vertex frontier buckets to store new frontier + * (tagged-)vertices. + * @param vertex_value_input_first Iterator pointing to the vertex property values for the first + * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) + * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). + * @param vertex_value_output_first Iterator pointing to the vertex property variables for the first + * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_output_last` + * (exclusive) is deduced as @p vertex_value_output_first + @p + * graph_view.local_vertex_partition_range_size(). + * @param v_op Binary operator that takes (tagged-)vertex ID, and *(@p vertex_value_input_first + i) + * (where i is [0, @p graph_view.local_vertex_partition_range_size())) and returns a tuple of 1) a + * thrust::optional object optionally storing a bucket index and 2) a thrust::optional object + * optionally storing a new vertex property value. If the first element of the returned tuple is + * thrust::nullopt, this (tagged-)vertex won't be inserted to the vertex frontier. If the second + * element is thrust::nullopt, the vertex property value for this vertex won't be updated. Note that + * it is currently undefined behavior if there are multiple tagged-vertices with the same vertex ID + * (but with different tags) AND @p v_op results on the tagged-vertices with the same vertex ID have + * more than one valid new vertex property values. + */ +template +void update_v_frontier(raft::handle_t const& handle, + GraphViewType const& graph_view, + KeyBuffer&& key_buffer, + VertexFrontierType& frontier, + std::vector const& next_frontier_bucket_indices, + VertexValueInputIterator vertex_value_input_first, + // FIXME: currently, it is undefined behavior if vertices in the frontier are + // tagged and the same vertex property is updated by multiple v_op + // invocations with the same vertex but with different tags. + VertexValueOutputIterator vertex_value_output_first, + VertexOp v_op, + bool do_expensive_check = false) +{ + using vertex_t = typename GraphViewType::vertex_type; + using key_t = + typename thrust::iterator_traits::value_type; + + static_assert(std::is_rvalue_reference_v); + + std::for_each(next_frontier_bucket_indices.begin(), + next_frontier_bucket_indices.end(), + [&frontier](auto idx) { + CUGRAPH_EXPECTS(idx < frontier.num_buckets(), + "Invalid input argument: invalid next bucket indices."); + }); + + if (do_expensive_check) { + // currently, nothing to do + } + + if (size_dataframe_buffer(key_buffer) > 0) { + assert(frontier.num_buckets() <= std::numeric_limits::max()); + rmm::device_uvector bucket_indices(size_dataframe_buffer(key_buffer), + handle.get_stream()); + + thrust::transform( + handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer) + size_dataframe_buffer(key_buffer), + bucket_indices.begin(), + detail::update_v_frontier_call_v_op_t{vertex_value_input_first, + vertex_value_output_first, + v_op, + graph_view.local_vertex_partition_range_first()}); + + auto bucket_key_pair_first = thrust::make_zip_iterator( + thrust::make_tuple(bucket_indices.begin(), get_dataframe_buffer_begin(key_buffer))); + bucket_indices.resize( + thrust::distance(bucket_key_pair_first, + thrust::remove_if(handle.get_thrust_policy(), + bucket_key_pair_first, + bucket_key_pair_first + bucket_indices.size(), + detail::check_invalid_bucket_idx_t())), + handle.get_stream()); + resize_dataframe_buffer(key_buffer, bucket_indices.size(), handle.get_stream()); + bucket_indices.shrink_to_fit(handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + + frontier.insert_to_buckets(bucket_indices.begin(), + bucket_indices.end(), + get_dataframe_buffer_begin(key_buffer), + next_frontier_bucket_indices); + + resize_dataframe_buffer(key_buffer, 0, handle.get_stream()); + shrink_to_fit_dataframe_buffer(key_buffer, handle.get_stream()); + } +} + +} // namespace cugraph diff --git a/cpp/include/cugraph/utilities/device_functors.cuh b/cpp/include/cugraph/utilities/device_functors.cuh index d0a189b38c3..8d9b6f325c1 100644 --- a/cpp/include/cugraph/utilities/device_functors.cuh +++ b/cpp/include/cugraph/utilities/device_functors.cuh @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -50,6 +51,16 @@ struct multiplier_t { __device__ T operator()(T input) const { return input * multiplier; } }; +template +struct is_first_in_run_t { + Iterator iter{}; + + __device__ bool operator()(size_t i) const + { + return (i == 0) || (*(iter + (i - 1)) != *(iter + i)); + } +}; + } // namespace detail } // namespace cugraph diff --git a/cpp/src/community/louvain.cuh b/cpp/src/community/louvain.cuh index 1274acdefaf..149c474694c 100644 --- a/cpp/src/community/louvain.cuh +++ b/cpp/src/community/louvain.cuh @@ -21,10 +21,10 @@ #include #include -#include #include #include #include +#include #include #include @@ -544,7 +544,7 @@ class Louvain { handle_, current_graph_view_, next_clusters_v_.begin(), dst_clusters_cache_); } - std::tie(cluster_keys_v_, cluster_weights_v_) = cugraph::per_src_key_transform_reduce_e( + std::tie(cluster_keys_v_, cluster_weights_v_) = cugraph::transform_reduce_e_by_src_key( handle_, current_graph_view_, dummy_property_t{}.device_view(), diff --git a/cpp/src/community/triangle_count_impl.cuh b/cpp/src/community/triangle_count_impl.cuh index a4bee0535c8..ea7a5f8af7d 100644 --- a/cpp/src/community/triangle_count_impl.cuh +++ b/cpp/src/community/triangle_count_impl.cuh @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include @@ -341,7 +341,7 @@ void triangle_count(raft::handle_t const& handle, cur_graph_counts.resize(cur_graph_view.local_vertex_partition_range_size(), handle.get_stream()); - per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints( + transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v( handle, cur_graph_view, dummy_property_t{}.device_view(), diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh index 861dc3652d9..9cd35774e0b 100644 --- a/cpp/src/components/weakly_connected_components_impl.cuh +++ b/cpp/src/components/weakly_connected_components_impl.cuh @@ -20,8 +20,9 @@ #include #include #include +#include #include -#include +#include #include #include #include @@ -183,7 +184,8 @@ struct v_op_t { size_t bucket_idx_conflict{}; // relevant only if GraphViewType::is_multi_gpu is true template - __device__ std::enable_if_t>> + __device__ std::enable_if_t, thrust::optional>> operator()(thrust::tuple tagged_v, int /* v_val */) const { auto tag = thrust::get<1>(tagged_v); @@ -194,22 +196,23 @@ struct v_op_t { auto old = atomicCAS(level_components + v_offset, invalid_component_id::value, tag); if (old != invalid_component_id::value && old != tag) { // conflict - return thrust::optional>{ - thrust::make_tuple(bucket_idx_conflict, std::byte{0} /* dummy */)}; + return thrust::make_tuple(thrust::optional{bucket_idx_conflict}, + thrust::optional{std::byte{0}} /* dummy */); } else { - return (old == invalid_component_id::value) - ? thrust::optional>{thrust::make_tuple( - bucket_idx_next, std::byte{0} /* dummy */)} - : thrust::nullopt; + auto update = (old == invalid_component_id::value); + return thrust::make_tuple( + update ? thrust::optional{bucket_idx_next} : thrust::nullopt, + update ? thrust::optional{std::byte{0}} /* dummy */ : thrust::nullopt); } } template - __device__ std::enable_if_t>> + __device__ std::enable_if_t, thrust::optional>> operator()(thrust::tuple /* tagged_v */, int /* v_val */) const { - return thrust::optional>{ - thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; + return thrust::make_tuple(thrust::optional{bucket_idx_next}, + thrust::optional{std::byte{0}} /* dummy */); } }; @@ -523,13 +526,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle, auto old_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); resize_dataframe_buffer(edge_buffer, old_num_edge_inserts + max_pushes, handle.get_stream()); - update_v_frontier_from_outgoing_e( + auto new_frontier_tagged_vertex_buffer = transform_reduce_v_frontier_outgoing_e_by_dst( handle, level_graph_view, vertex_frontier, bucket_idx_cur, - GraphViewType::is_multi_gpu ? std::vector{bucket_idx_next, bucket_idx_conflict} - : std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dummy_property_t{}.device_view(), [col_components = @@ -555,18 +556,26 @@ void weakly_connected_components_impl(raft::handle_t const& handle, *(edge_buffer_first + edge_idx) = tag >= old ? thrust::make_tuple(tag, old) : thrust::make_tuple(old, tag); } - return (old == invalid_component_id::value) ? thrust::optional{tag} - : thrust::nullopt; + return old == invalid_component_id::value ? thrust::optional{tag} + : thrust::nullopt; }, - reduce_op::null(), - thrust::make_constant_iterator(0) /* dummy */, - thrust::make_discard_iterator() /* dummy */, - v_op_t{vertex_partition, - level_components, - get_dataframe_buffer_begin(edge_buffer), - num_edge_inserts.data(), - bucket_idx_next, - bucket_idx_conflict}); + reduce_op::null()); + + update_v_frontier(handle, + level_graph_view, + std::move(new_frontier_tagged_vertex_buffer), + vertex_frontier, + GraphViewType::is_multi_gpu + ? std::vector{bucket_idx_next, bucket_idx_conflict} + : std::vector{bucket_idx_next}, + thrust::make_constant_iterator(0) /* dummy */, + thrust::make_discard_iterator() /* dummy */, + v_op_t{vertex_partition, + level_components, + get_dataframe_buffer_begin(edge_buffer), + num_edge_inserts.data(), + bucket_idx_next, + bucket_idx_conflict}); if (GraphViewType::is_multi_gpu) { auto cur_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); diff --git a/cpp/src/cores/core_number_impl.cuh b/cpp/src/cores/core_number_impl.cuh index 3ae55c7f841..d5b470f086e 100644 --- a/cpp/src/cores/core_number_impl.cuh +++ b/cpp/src/cores/core_number_impl.cuh @@ -19,8 +19,9 @@ #include #include #include +#include #include -#include +#include #include #include @@ -193,18 +194,26 @@ void core_number(raft::handle_t const& handle, // mask-out/delete edges. if (graph_view.is_symmetric() || ((degree_type == k_core_degree_type_t::IN) || (degree_type == k_core_degree_type_t::INOUT))) { - update_v_frontier_from_outgoing_e( + auto [new_frontier_vertex_buffer, delta_buffer] = + transform_reduce_v_frontier_outgoing_e_by_dst( + handle, + graph_view, + vertex_frontier, + bucket_idx_cur, + dummy_property_t{}.device_view(), + dst_core_numbers.device_view(), + [k, delta] __device__(vertex_t src, vertex_t dst, auto, auto dst_val) { + return dst_val >= k ? thrust::optional{delta} : thrust::nullopt; + }, + reduce_op::plus()); + + update_v_frontier( handle, graph_view, + std::move(new_frontier_vertex_buffer), + std::move(delta_buffer), vertex_frontier, - bucket_idx_cur, std::vector{bucket_idx_next}, - dummy_property_t{}.device_view(), - dst_core_numbers.device_view(), - [k, delta] __device__(vertex_t src, vertex_t dst, auto, auto dst_val) { - return dst_val >= k ? thrust::optional{delta} : thrust::nullopt; - }, - reduce_op::plus(), core_numbers, core_numbers, [k_first, @@ -217,8 +226,8 @@ void core_number(raft::handle_t const& handle, auto new_core_number = v_val >= pushed_val ? v_val - pushed_val : edge_t{0}; new_core_number = new_core_number < (k - delta) ? (k - delta) : new_core_number; new_core_number = new_core_number < k_first ? edge_t{0} : new_core_number; - return thrust::optional>{ - thrust::make_tuple(bucket_idx_next, new_core_number)}; + return thrust::make_tuple(thrust::optional{bucket_idx_next}, + thrust::optional{new_core_number}); }); } diff --git a/cpp/src/sampling/detail/sampling_utils_impl.cuh b/cpp/src/sampling/detail/sampling_utils_impl.cuh index 2e4ced78897..ce39fa3bcea 100644 --- a/cpp/src/sampling/detail/sampling_utils_impl.cuh +++ b/cpp/src/sampling/detail/sampling_utils_impl.cuh @@ -17,10 +17,10 @@ #pragma once #include -#include #include #include #include +#include #include #include @@ -923,11 +923,11 @@ count_and_remove_duplicates(raft::handle_t const& handle, thrust::sort(handle.get_thrust_policy(), tuple_iter_begin, tuple_iter_begin + src.size()); - auto num_uniques = - thrust::count_if(handle.get_thrust_policy(), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(src.size()), - detail::is_first_in_run_pair_t{src.data(), dst.data()}); + auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(src.begin(), dst.begin())); + auto num_uniques = thrust::count_if(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(src.size()), + detail::is_first_in_run_t{pair_first}); rmm::device_uvector result_src(num_uniques, handle.get_stream()); rmm::device_uvector result_dst(num_uniques, handle.get_stream()); diff --git a/cpp/src/structure/coarsen_graph_impl.cuh b/cpp/src/structure/coarsen_graph_impl.cuh index 4ec252043d8..68c295b76c7 100644 --- a/cpp/src/structure/coarsen_graph_impl.cuh +++ b/cpp/src/structure/coarsen_graph_impl.cuh @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -79,11 +80,11 @@ groupby_e_and_coarsen_edgelist(rmm::device_uvector&& edgelist_majors, pair_first + edgelist_majors.size(), (*edgelist_weights).begin()); - auto num_uniques = thrust::count_if( - rmm::exec_policy(stream_view), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(edgelist_majors.size()), - detail::is_first_in_run_pair_t{edgelist_majors.data(), edgelist_minors.data()}); + auto num_uniques = + thrust::count_if(rmm::exec_policy(stream_view), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(edgelist_majors.size()), + detail::is_first_in_run_t{pair_first}); rmm::device_uvector tmp_edgelist_majors(num_uniques, stream_view); rmm::device_uvector tmp_edgelist_minors(tmp_edgelist_majors.size(), stream_view); diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 7ceafd784c1..92b2db13b7e 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -287,10 +288,11 @@ std::tuple, std::vector> compute_renumbe tmp_majors.begin()); thrust::sort( rmm::exec_policy(loop_stream), tmp_majors.begin(), tmp_majors.begin() + this_chunk_size); - auto num_unique_majors = thrust::count_if(rmm::exec_policy(loop_stream), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(this_chunk_size), - is_first_in_run_t{tmp_majors.data()}); + auto num_unique_majors = + thrust::count_if(rmm::exec_policy(loop_stream), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(this_chunk_size), + is_first_in_run_t{tmp_majors.data()}); rmm::device_uvector tmp_keys(num_unique_majors, loop_stream); rmm::device_uvector tmp_values(num_unique_majors, loop_stream); thrust::reduce_by_key(rmm::exec_policy(loop_stream), @@ -332,10 +334,11 @@ std::tuple, std::vector> compute_renumbe edgelist_majors[0] + edgelist_edge_counts[0], tmp_majors.begin()); thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); - auto num_unique_majors = thrust::count_if(handle.get_thrust_policy(), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(tmp_majors.size()), - is_first_in_run_t{tmp_majors.data()}); + auto num_unique_majors = + thrust::count_if(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(tmp_majors.size()), + is_first_in_run_t{tmp_majors.data()}); rmm::device_uvector tmp_keys(num_unique_majors, handle.get_stream()); rmm::device_uvector tmp_values(num_unique_majors, handle.get_stream()); thrust::reduce_by_key(handle.get_thrust_policy(), diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index a16bb3bd49d..573c6cc10e0 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -19,8 +19,9 @@ #include #include #include +#include #include -#include +#include #include #include #include @@ -61,22 +62,22 @@ struct e_op_t { thrust::nullopt_t, thrust::nullopt_t) const { - thrust::optional ret{}; + bool push{}; if constexpr (multi_gpu) { auto dst_offset = dst - dst_first; auto old = atomicOr(visited_flags.get_iter(dst_offset), uint8_t{1}); - ret = old == uint8_t{0} ? thrust::optional{src} : thrust::nullopt; + push = (old == uint8_t{0}); } else { auto mask = uint32_t{1} << (dst % (sizeof(uint32_t) * 8)); if (*(prev_visited_flags + (dst / (sizeof(uint32_t) * 8))) & mask) { // check if unvisited in previous iterations - ret = thrust::nullopt; + push = false; } else { // check if unvisited in this iteration as well auto old = atomicOr(visited_flags + (dst / (sizeof(uint32_t) * 8)), mask); - ret = (old & mask) == 0 ? thrust::optional{src} : thrust::nullopt; + push = ((old & mask) == 0); } } - return ret; + return push ? thrust::optional{src} : thrust::nullopt; } }; @@ -222,40 +223,49 @@ void bfs(raft::handle_t const& handle, e_op.prev_visited_flags = prev_visited_flags.data(); } - update_v_frontier_from_outgoing_e( + auto [new_frontier_vertex_buffer, predecessor_buffer] = + transform_reduce_v_frontier_outgoing_e_by_dst(handle, + push_graph_view, + vertex_frontier, + bucket_idx_cur, + dummy_property_t{}.device_view(), + dummy_property_t{}.device_view(), +#if 1 + e_op, +#else + // FIXME: need to test more about the performance trade-offs between additional + // communication in updating dst_visited_flags (+ using atomics) vs reduced number of + // pushes (leading to both less computation & communication in reduction) + [vertex_partition, distances] __device__( + vertex_t src, vertex_t dst, auto src_val, auto dst_val) { + auto push = true; + if (vertex_partition.in_local_vertex_partition_range_nocheck(dst)) { + auto distance = + *(distances + + vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(dst)); + if (distance != invalid_distance) { push = false; } + } + return thrust::make_tuple(push, src); + }, +#endif + reduce_op::any()); + + update_v_frontier( handle, push_graph_view, + std::move(new_frontier_vertex_buffer), + std::move(predecessor_buffer), vertex_frontier, - bucket_idx_cur, std::vector{bucket_idx_next}, - dummy_property_t{}.device_view(), - dummy_property_t{}.device_view(), -#if 1 - e_op, -#else - // FIXME: need to test more about the performance trade-offs between additional - // communication in updating dst_visited_flags (+ using atomics) vs reduced number of pushes - // (leading to both less computation & communication in reduction) - [vertex_partition, distances] __device__( - vertex_t src, vertex_t dst, auto src_val, auto dst_val) { - auto push = true; - if (vertex_partition.in_local_vertex_partition_range_nocheck(dst)) { - auto distance = *( - distances + vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(dst)); - if (distance != invalid_distance) { push = false; } - } - return push ? thrust::optional{src} : thrust::nullopt; - }, -#endif - reduce_op::any(), distances, thrust::make_zip_iterator(thrust::make_tuple(distances, predecessor_first)), [depth] __device__(auto v, auto v_val, auto pushed_val) { - return (v_val == invalid_distance) - ? thrust::optional< - thrust::tuple>>{thrust::make_tuple( - bucket_idx_next, thrust::make_tuple(depth + 1, pushed_val))} - : thrust::nullopt; + auto update = (v_val == invalid_distance); + return thrust::make_tuple( + update ? thrust::optional{bucket_idx_next} : thrust::nullopt, + update ? thrust::optional>{thrust::make_tuple( + depth + 1, pushed_val)} + : thrust::nullopt); }); vertex_frontier.bucket(bucket_idx_cur).clear(); diff --git a/cpp/src/traversal/sssp_impl.cuh b/cpp/src/traversal/sssp_impl.cuh index 4f8e7791aa2..5fb6ef37a6b 100644 --- a/cpp/src/traversal/sssp_impl.cuh +++ b/cpp/src/traversal/sssp_impl.cuh @@ -21,8 +21,9 @@ #include #include #include +#include #include -#include +#include #include #include #include @@ -161,45 +162,53 @@ void sssp(raft::handle_t const& handle, auto vertex_partition = vertex_partition_device_view_t( push_graph_view.local_vertex_partition_view()); - update_v_frontier_from_outgoing_e( + auto [new_frontier_vertex_buffer, distance_predecessor_buffer] = + transform_reduce_v_frontier_outgoing_e_by_dst( + handle, + push_graph_view, + vertex_frontier, + bucket_idx_cur_near, + GraphViewType::is_multi_gpu + ? edge_partition_src_distances.device_view() + : detail::edge_partition_major_property_device_view_t( + distances), + dummy_property_t{}.device_view(), + [vertex_partition, distances, cutoff] __device__( + vertex_t src, vertex_t dst, weight_t w, auto src_val, auto) { + auto push = true; + auto new_distance = src_val + w; + auto threshold = cutoff; + if (vertex_partition.in_local_vertex_partition_range_nocheck(dst)) { + auto local_vertex_offset = + vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(dst); + auto old_distance = *(distances + local_vertex_offset); + threshold = old_distance < threshold ? old_distance : threshold; + } + if (new_distance >= threshold) { push = false; } + return push ? thrust::optional>{thrust::make_tuple( + new_distance, src)} + : thrust::nullopt; + }, + reduce_op::minimum>()); + + update_v_frontier( handle, push_graph_view, + std::move(new_frontier_vertex_buffer), + std::move(distance_predecessor_buffer), vertex_frontier, - bucket_idx_cur_near, std::vector{bucket_idx_next_near, bucket_idx_far}, - GraphViewType::is_multi_gpu - ? edge_partition_src_distances.device_view() - : detail::edge_partition_major_property_device_view_t(distances), - dummy_property_t{}.device_view(), - [vertex_partition, distances, cutoff] __device__( - vertex_t src, vertex_t dst, weight_t w, auto src_val, auto) { - auto push = true; - auto new_distance = src_val + w; - auto threshold = cutoff; - if (vertex_partition.in_local_vertex_partition_range_nocheck(dst)) { - auto local_vertex_offset = - vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(dst); - auto old_distance = *(distances + local_vertex_offset); - threshold = old_distance < threshold ? old_distance : threshold; - } - if (new_distance >= threshold) { push = false; } - return push ? thrust::optional>{thrust::make_tuple( - new_distance, src)} - : thrust::nullopt; - }, - reduce_op::minimum>(), distances, thrust::make_zip_iterator(thrust::make_tuple(distances, predecessor_first)), [near_far_threshold] __device__(auto v, auto v_val, auto pushed_val) { auto new_dist = thrust::get<0>(pushed_val); - auto idx = new_dist < v_val - ? (new_dist < near_far_threshold ? bucket_idx_next_near : bucket_idx_far) - : vertex_frontier_t::kInvalidBucketIdx; - return new_dist < v_val - ? thrust::optional>{thrust::make_tuple( - new_dist < near_far_threshold ? bucket_idx_next_near : bucket_idx_far, - pushed_val)} - : thrust::nullopt; + auto update = (new_dist < v_val); + return thrust::make_tuple( + update ? thrust::optional{new_dist < near_far_threshold ? bucket_idx_next_near + : bucket_idx_far} + : thrust::nullopt, + update ? thrust::optional>{pushed_val} + : thrust::nullopt); }); vertex_frontier.bucket(bucket_idx_cur_near).clear(); diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 79fde6b3af7..f37463ad818 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -595,9 +595,9 @@ if(BUILD_CUGRAPH_MG_TESTS) ConfigureTestMG(MG_COUNT_IF_V_TEST prims/mg_count_if_v.cu) ########################################################################################### - # - MG PRIMS UPDATE_V_FRONTIER_FROM_OUTGOING_E tests -------------------------------------- - ConfigureTestMG(MG_UPDATE_V_FRONTIER_FROM_OUTGOING_E_TEST - prims/mg_update_v_frontier_from_outgoing_e.cu) + # - MG PRIMS TRANSFORM_REDUCE_V_FRONTIER_OUTGOING_E_BY_DST tests -------------------------- + ConfigureTestMG(MG_TRANSFORM_REDUCE_V_FRONTIER_OUTGOING_E_BY_DST_TEST + prims/mg_transform_reduce_v_frontier_outgoing_e_by_dst.cu) ########################################################################################### # - MG PRIMS REDUCE_V tests --------------------------------------------------------------- diff --git a/cpp/tests/prims/mg_transform_reduce_v_frontier_outgoing_e_by_dst.cu b/cpp/tests/prims/mg_transform_reduce_v_frontier_outgoing_e_by_dst.cu new file mode 100644 index 00000000000..48e832c9a91 --- /dev/null +++ b/cpp/tests/prims/mg_transform_reduce_v_frontier_outgoing_e_by_dst.cu @@ -0,0 +1,621 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +template +__device__ __host__ auto make_type_casted_tuple_from_scalar(T val, std::index_sequence) +{ + return thrust::make_tuple( + static_cast::type>(val)...); +} + +template +__device__ __host__ auto make_property_value(T val) +{ + property_t ret{}; + if constexpr (cugraph::is_thrust_tuple_of_arithmetic::value) { + ret = make_type_casted_tuple_from_scalar( + val, std::make_index_sequence::value>{}); + } else { + ret = static_cast(val); + } + return ret; +} + +template +struct property_transform_t { + int mod{}; + + constexpr __device__ property_t operator()(vertex_t const v) const + { + static_assert(cugraph::is_thrust_tuple_of_arithmetic::value || + std::is_arithmetic_v); + cuco::detail::MurmurHash3_32 hash_func{}; + return make_property_value(hash_func(v) % mod); + } +}; + +template +struct e_op_t { + __device__ auto operator()(key_t optionally_tagged_src, + vertex_t dst, + property_t src_val, + property_t dst_val) const + { + if constexpr (std::is_same_v) { + if constexpr (std::is_same_v) { + return src_val < dst_val ? thrust::optional{std::byte{0}} /* dummy */ + : thrust::nullopt; + } else { + return src_val < dst_val ? thrust::optional{static_cast(1)} + : thrust::nullopt; + } + } else { + auto tag = thrust::get<1>(optionally_tagged_src); + if constexpr (std::is_same_v) { + return src_val < dst_val ? thrust::optional{tag} : thrust::nullopt; + } else { + return src_val < dst_val + ? thrust::optional>{thrust::make_tuple( + tag, static_cast(1))} + : thrust::nullopt; + } + } + } +}; + +struct Prims_Usecase { + bool check_correctness{true}; +}; + +template +class Tests_MG_TransformReduceVFrontierOutgoingEByDst + : public ::testing::TestWithParam> { + public: + Tests_MG_TransformReduceVFrontierOutgoingEByDst() {} + static void SetupTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + + // Compare the results of transform_reduce_v_frontier_outgoing_e_by_dst primitive + template + void run_current_test(Prims_Usecase const& prims_usecase, input_usecase_t const& input_usecase) + { + using property_t = int32_t; + + using key_t = + std::conditional_t, vertex_t, thrust::tuple>; + + static_assert(std::is_same_v || std::is_arithmetic_v); + static_assert(std::is_same_v || + cugraph::is_arithmetic_or_thrust_tuple_of_arithmetic::value); + if constexpr (cugraph::is_thrust_tuple::value) { + static_assert(thrust::tuple_size::value == size_t{2}); + } + + // 1. initialize handle + + raft::handle_t handle{}; + HighResClock hr_clock{}; + + raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD); + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto const comm_rank = comm.get_rank(); + + auto row_comm_size = static_cast(sqrt(static_cast(comm_size))); + while (comm_size % row_comm_size != 0) { + --row_comm_size; + } + cugraph::partition_2d::subcomm_factory_t + subcomm_factory(handle, row_comm_size); + + // 2. create MG graph + + constexpr bool is_multi_gpu = true; + constexpr bool renumber = true; // needs to be true for multi gpu case + constexpr bool store_transposed = + false; // needs to be false for using transform_reduce_v_frontier_outgoing_e_by_dst + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + hr_clock.start(); + } + + auto [mg_graph, mg_renumber_map_labels] = + cugraph::test::construct_graph( + handle, input_usecase, false, renumber); + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG construct_graph took " << elapsed_time * 1e-6 << " s.\n"; + } + + auto mg_graph_view = mg_graph.view(); + + // 3. run MG transform reduce + + const int hash_bin_count = 5; + + auto mg_property_buffer = cugraph::allocate_dataframe_buffer( + mg_graph_view.local_vertex_partition_range_size(), handle.get_stream()); + + thrust::transform(handle.get_thrust_policy(), + (*mg_renumber_map_labels).begin(), + (*mg_renumber_map_labels).end(), + cugraph::get_dataframe_buffer_begin(mg_property_buffer), + property_transform_t{hash_bin_count}); + + cugraph::edge_partition_src_property_t mg_src_properties( + handle, mg_graph_view); + cugraph::edge_partition_dst_property_t mg_dst_properties( + handle, mg_graph_view); + + update_edge_partition_src_property(handle, + mg_graph_view, + cugraph::get_dataframe_buffer_cbegin(mg_property_buffer), + mg_src_properties); + update_edge_partition_dst_property(handle, + mg_graph_view, + cugraph::get_dataframe_buffer_cbegin(mg_property_buffer), + mg_dst_properties); + + auto mg_key_buffer = cugraph::allocate_dataframe_buffer( + mg_graph_view.local_vertex_partition_range_size(), handle.get_stream()); + if constexpr (std::is_same_v) { + thrust::sequence(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(mg_key_buffer), + cugraph::get_dataframe_buffer_end(mg_key_buffer), + mg_graph_view.local_vertex_partition_range_first()); + } else { + thrust::tabulate(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(mg_key_buffer), + cugraph::get_dataframe_buffer_end(mg_key_buffer), + [mg_renumber_map_labels = (*mg_renumber_map_labels).data(), + local_vertex_partition_range_first = + mg_graph_view.local_vertex_partition_range_first()] __device__(size_t i) { + return thrust::make_tuple( + static_cast(local_vertex_partition_range_first + i), + static_cast(*(mg_renumber_map_labels + i) % size_t{10})); + }); + } + + constexpr size_t bucket_idx_cur = 0; + constexpr size_t num_buckets = 1; + + cugraph::vertex_frontier_t mg_vertex_frontier(handle, + num_buckets); + mg_vertex_frontier.bucket(bucket_idx_cur) + .insert(cugraph::get_dataframe_buffer_begin(mg_key_buffer), + cugraph::get_dataframe_buffer_end(mg_key_buffer)); + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + hr_clock.start(); + } + + auto mg_new_frontier_key_buffer = + cugraph::allocate_dataframe_buffer(0, handle.get_stream()); + [[maybe_unused]] auto mg_payload_buffer = + cugraph::detail::allocate_optional_payload_buffer(0, handle.get_stream()); + + if constexpr (std::is_same_v) { + mg_new_frontier_key_buffer = cugraph::transform_reduce_v_frontier_outgoing_e_by_dst( + handle, + mg_graph_view, + mg_vertex_frontier, + bucket_idx_cur, + mg_src_properties.device_view(), + mg_dst_properties.device_view(), + e_op_t{}, + cugraph::reduce_op::null{}); + } else { + std::tie(mg_new_frontier_key_buffer, mg_payload_buffer) = + cugraph::transform_reduce_v_frontier_outgoing_e_by_dst( + handle, + mg_graph_view, + mg_vertex_frontier, + bucket_idx_cur, + mg_src_properties.device_view(), + mg_dst_properties.device_view(), + e_op_t{}, + cugraph::reduce_op::plus{}); + } + + if (cugraph::test::g_perf) { + RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG transform_reduce_v_frontier_outgoing_e_by_dst took " << elapsed_time * 1e-6 + << " s.\n"; + } + + // 4. compare SG & MG results + + if (prims_usecase.check_correctness) { + auto mg_aggregate_renumber_map_labels = cugraph::test::device_gatherv( + handle, (*mg_renumber_map_labels).data(), (*mg_renumber_map_labels).size()); + + auto mg_aggregate_new_frontier_key_buffer = + cugraph::allocate_dataframe_buffer(0, handle.get_stream()); + if constexpr (std::is_same_v) { + mg_aggregate_new_frontier_key_buffer = cugraph::test::device_gatherv( + handle, mg_new_frontier_key_buffer.data(), mg_new_frontier_key_buffer.size()); + } else { + std::get<0>(mg_aggregate_new_frontier_key_buffer) = + cugraph::test::device_gatherv(handle, + std::get<0>(mg_new_frontier_key_buffer).data(), + std::get<0>(mg_new_frontier_key_buffer).size()); + std::get<1>(mg_aggregate_new_frontier_key_buffer) = + cugraph::test::device_gatherv(handle, + std::get<1>(mg_new_frontier_key_buffer).data(), + std::get<1>(mg_new_frontier_key_buffer).size()); + } + + [[maybe_unused]] auto mg_aggregate_payload_buffer = + cugraph::detail::allocate_optional_payload_buffer(0, handle.get_stream()); + if constexpr (!std::is_same_v) { + if constexpr (std::is_arithmetic_v) { + mg_aggregate_payload_buffer = cugraph::test::device_gatherv( + handle, mg_payload_buffer.data(), mg_payload_buffer.size()); + } else { + std::get<0>(mg_aggregate_payload_buffer) = cugraph::test::device_gatherv( + handle, std::get<0>(mg_payload_buffer).data(), std::get<0>(mg_payload_buffer).size()); + std::get<1>(mg_aggregate_payload_buffer) = cugraph::test::device_gatherv( + handle, std::get<1>(mg_payload_buffer).data(), std::get<1>(mg_payload_buffer).size()); + } + } + + if (handle.get_comms().get_rank() == int{0}) { + if constexpr (std::is_same_v) { + cugraph::unrenumber_int_vertices( + handle, + mg_aggregate_new_frontier_key_buffer.begin(), + mg_aggregate_new_frontier_key_buffer.size(), + mg_aggregate_renumber_map_labels.data(), + std::vector{mg_graph_view.number_of_vertices()}); + } else { + cugraph::unrenumber_int_vertices( + handle, + std::get<0>(mg_aggregate_new_frontier_key_buffer).begin(), + std::get<0>(mg_aggregate_new_frontier_key_buffer).size(), + mg_aggregate_renumber_map_labels.data(), + std::vector{mg_graph_view.number_of_vertices()}); + } + + if constexpr (std::is_same_v) { + thrust::sort(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(mg_aggregate_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_end(mg_aggregate_new_frontier_key_buffer)); + } else { + thrust::sort_by_key( + handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(mg_aggregate_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_end(mg_aggregate_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_begin(mg_aggregate_payload_buffer)); + } + + cugraph::graph_t sg_graph( + handle); + std::tie(sg_graph, std::ignore) = cugraph::test:: + construct_graph( + handle, input_usecase, false, false); + auto sg_graph_view = sg_graph.view(); + + auto sg_property_buffer = cugraph::allocate_dataframe_buffer( + sg_graph_view.local_vertex_partition_range_size(), handle.get_stream()); + + thrust::transform( + handle.get_thrust_policy(), + thrust::make_counting_iterator(vertex_t{0}), + thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_size()), + cugraph::get_dataframe_buffer_begin(sg_property_buffer), + property_transform_t{hash_bin_count}); + + cugraph::edge_partition_src_property_t + sg_src_properties(handle, sg_graph_view); + cugraph::edge_partition_dst_property_t + sg_dst_properties(handle, sg_graph_view); + update_edge_partition_src_property(handle, + sg_graph_view, + cugraph::get_dataframe_buffer_cbegin(sg_property_buffer), + sg_src_properties); + update_edge_partition_dst_property(handle, + sg_graph_view, + cugraph::get_dataframe_buffer_cbegin(sg_property_buffer), + sg_dst_properties); + + auto sg_key_buffer = cugraph::allocate_dataframe_buffer( + sg_graph_view.local_vertex_partition_range_size(), handle.get_stream()); + if constexpr (std::is_same_v) { + thrust::sequence(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(sg_key_buffer), + cugraph::get_dataframe_buffer_end(sg_key_buffer), + sg_graph_view.local_vertex_partition_range_first()); + } else { + thrust::tabulate(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(sg_key_buffer), + cugraph::get_dataframe_buffer_end(sg_key_buffer), + [] __device__(size_t i) { + return thrust::make_tuple( + static_cast(i), + static_cast(static_cast(i) % size_t{10})); + }); + } + + cugraph::vertex_frontier_t sg_vertex_frontier(handle, + num_buckets); + sg_vertex_frontier.bucket(bucket_idx_cur) + .insert(cugraph::get_dataframe_buffer_begin(sg_key_buffer), + cugraph::get_dataframe_buffer_end(sg_key_buffer)); + + auto sg_new_frontier_key_buffer = + cugraph::allocate_dataframe_buffer(0, handle.get_stream()); + [[maybe_unused]] auto sg_payload_buffer = + cugraph::detail::allocate_optional_payload_buffer(0, handle.get_stream()); + if constexpr (std::is_same_v) { + sg_new_frontier_key_buffer = cugraph::transform_reduce_v_frontier_outgoing_e_by_dst( + handle, + sg_graph_view, + sg_vertex_frontier, + bucket_idx_cur, + sg_src_properties.device_view(), + sg_dst_properties.device_view(), + e_op_t{}, + cugraph::reduce_op::null{}); + } else { + std::tie(sg_new_frontier_key_buffer, sg_payload_buffer) = + cugraph::transform_reduce_v_frontier_outgoing_e_by_dst( + handle, + sg_graph_view, + sg_vertex_frontier, + bucket_idx_cur, + sg_src_properties.device_view(), + sg_dst_properties.device_view(), + e_op_t{}, + cugraph::reduce_op::plus{}); + } + + if constexpr (std::is_same_v) { + thrust::sort(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(sg_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_end(sg_new_frontier_key_buffer)); + } else { + thrust::sort_by_key(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(sg_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_end(sg_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_begin(sg_payload_buffer)); + } + + bool key_passed = + thrust::equal(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(sg_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_end(sg_new_frontier_key_buffer), + cugraph::get_dataframe_buffer_begin(mg_aggregate_new_frontier_key_buffer)); + ASSERT_TRUE(key_passed); + + if constexpr (!std::is_same_v) { + bool payload_passed = + thrust::equal(handle.get_thrust_policy(), + cugraph::get_dataframe_buffer_begin(sg_payload_buffer), + cugraph::get_dataframe_buffer_begin(sg_payload_buffer), + cugraph::get_dataframe_buffer_end(mg_aggregate_payload_buffer)); + ASSERT_TRUE(payload_passed); + } + } + } + } +}; + +using Tests_MG_TransformReduceVFrontierOutgoingEByDst_File = + Tests_MG_TransformReduceVFrontierOutgoingEByDst; +using Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat = + Tests_MG_TransformReduceVFrontierOutgoingEByDst; + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, CheckInt32Int32FloatVoidVoid) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, CheckInt32Int32FloatVoidVoid) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, CheckInt32Int32FloatVoidInt32) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, CheckInt32Int32FloatVoidInt32) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, + CheckInt32Int32FloatVoidTupleFloatInt32) +{ + auto param = GetParam(); + run_current_test>( + std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, + CheckInt32Int32FloatVoidTupleFloatInt32) +{ + auto param = GetParam(); + run_current_test>( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, CheckInt32Int32FloatInt32Void) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, CheckInt32Int32FloatInt32Void) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, CheckInt32Int32FloatInt32Int32) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), + std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, CheckInt32Int32FloatInt32Int32) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, + CheckInt32Int32FloatInt32TupleFloatInt32) +{ + auto param = GetParam(); + run_current_test>( + std::get<0>(param), std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, + CheckInt32Int32FloatInt32TupleFloatInt32) +{ + auto param = GetParam(); + run_current_test>( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, CheckInt32Int64FloatInt32Int32) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), + std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, CheckInt32Int64FloatInt32Int32) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, CheckInt64Int64FloatInt32Int32) +{ + auto param = GetParam(); + run_current_test(std::get<0>(param), + std::get<1>(param)); +} + +TEST_P(Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, CheckInt64Int64FloatInt32Int32) +{ + auto param = GetParam(); + run_current_test( + std::get<0>(param), + cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); +} + +INSTANTIATE_TEST_SUITE_P( + file_test, + Tests_MG_TransformReduceVFrontierOutgoingEByDst_File, + ::testing::Combine( + ::testing::Values(Prims_Usecase{true}), + ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), + cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), + cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), + cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_small_test, + Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, + ::testing::Combine(::testing::Values(Prims_Usecase{true}), + ::testing::Values(cugraph::test::Rmat_Usecase( + 10, 16, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); + +INSTANTIATE_TEST_SUITE_P( + rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with + --gtest_filter to select only the rmat_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one Rmat_Usecase that differ only in scale or edge + factor (to avoid running same benchmarks more than once) */ + Tests_MG_TransformReduceVFrontierOutgoingEByDst_Rmat, + ::testing::Combine(::testing::Values(Prims_Usecase{false}), + ::testing::Values(cugraph::test::Rmat_Usecase( + 20, 32, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); + +CUGRAPH_MG_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/prims/mg_update_v_frontier_from_outgoing_e.cu b/cpp/tests/prims/mg_update_v_frontier_from_outgoing_e.cu deleted file mode 100644 index 60680bba016..00000000000 --- a/cpp/tests/prims/mg_update_v_frontier_from_outgoing_e.cu +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include - -template -__device__ __host__ auto make_type_casted_tuple_from_scalar(T val, std::index_sequence) -{ - return thrust::make_tuple( - static_cast::type>(val)...); -} - -template -__device__ __host__ auto make_property_value(T val) -{ - property_t ret{}; - if constexpr (cugraph::is_thrust_tuple_of_arithmetic::value) { - ret = make_type_casted_tuple_from_scalar( - val, std::make_index_sequence::value>{}); - } else { - ret = static_cast(val); - } - return ret; -} - -template -struct property_transform_t { - int mod{}; - - constexpr __device__ property_t operator()(vertex_t const v) const - { - static_assert(cugraph::is_thrust_tuple_of_arithmetic::value || - std::is_arithmetic_v); - cuco::detail::MurmurHash3_32 hash_func{}; - return make_property_value(hash_func(v) % mod); - } -}; - -struct Prims_Usecase { - bool check_correctness{true}; -}; - -template -class Tests_MG_UpdateVFrontierFromOutgoingE - : public ::testing::TestWithParam> { - public: - Tests_MG_UpdateVFrontierFromOutgoingE() {} - static void SetupTestCase() {} - static void TearDownTestCase() {} - - virtual void SetUp() {} - virtual void TearDown() {} - - // Compare the results of update_v_frontier_from_outgoing_e primitive - template - void run_current_test(Prims_Usecase const& prims_usecase, input_usecase_t const& input_usecase) - { - // 1. initialize handle - - raft::handle_t handle{}; - HighResClock hr_clock{}; - - raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD); - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto const comm_rank = comm.get_rank(); - - auto row_comm_size = static_cast(sqrt(static_cast(comm_size))); - while (comm_size % row_comm_size != 0) { - --row_comm_size; - } - cugraph::partition_2d::subcomm_factory_t - subcomm_factory(handle, row_comm_size); - - // 2. create MG graph - - constexpr bool is_multi_gpu = true; - constexpr bool renumber = true; // needs to be true for multi gpu case - constexpr bool store_transposed = - false; // needs to be false for using update_v_frontier_from_outgoing_e - if (cugraph::test::g_perf) { - RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - hr_clock.start(); - } - auto [mg_graph, mg_renumber_map_labels] = - cugraph::test::construct_graph( - handle, input_usecase, false, renumber); - - if (cugraph::test::g_perf) { - RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - double elapsed_time{0.0}; - hr_clock.stop(&elapsed_time); - std::cout << "MG construct_graph took " << elapsed_time * 1e-6 << " s.\n"; - } - - auto mg_graph_view = mg_graph.view(); - - // 3. run MG transform reduce - - const int hash_bin_count = 5; - // const int initial_value = 4; - - auto mg_property_buffer = cugraph::allocate_dataframe_buffer( - mg_graph_view.local_vertex_partition_range_size(), handle.get_stream()); - - thrust::transform(handle.get_thrust_policy(), - (*mg_renumber_map_labels).begin(), - (*mg_renumber_map_labels).end(), - cugraph::get_dataframe_buffer_begin(mg_property_buffer), - property_transform_t{hash_bin_count}); - rmm::device_uvector sources(mg_graph_view.number_of_vertices(), handle.get_stream()); - thrust::sequence(handle.get_thrust_policy(), - sources.begin(), - sources.end(), - mg_graph_view.local_vertex_partition_range_first()); - - cugraph::edge_partition_src_property_t mg_src_properties( - handle, mg_graph_view); - cugraph::edge_partition_dst_property_t mg_dst_properties( - handle, mg_graph_view); - - update_edge_partition_src_property(handle, - mg_graph_view, - cugraph::get_dataframe_buffer_cbegin(mg_property_buffer), - mg_src_properties); - update_edge_partition_dst_property(handle, - mg_graph_view, - cugraph::get_dataframe_buffer_cbegin(mg_property_buffer), - mg_dst_properties); - - constexpr size_t bucket_idx_cur = 0; - constexpr size_t bucket_idx_next = 1; - constexpr size_t num_buckets = 2; - - cugraph::vertex_frontier_t mg_vertex_frontier(handle, - num_buckets); - mg_vertex_frontier.bucket(bucket_idx_cur).insert(sources.begin(), sources.end()); - - if (cugraph::test::g_perf) { - RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - hr_clock.start(); - } - - // prims call - update_v_frontier_from_outgoing_e( - handle, - mg_graph_view, - mg_vertex_frontier, - bucket_idx_cur, - std::vector{bucket_idx_next}, - mg_src_properties.device_view(), - mg_dst_properties.device_view(), - [] __device__(vertex_t src, vertex_t dst, auto src_val, auto dst_val) { - thrust::optional result; - if (src_val < dst_val) { result.emplace(src); } - return result; - }, - cugraph::reduce_op::any(), - cugraph::get_dataframe_buffer_cbegin(mg_property_buffer), - thrust::make_discard_iterator() /* dummy */, - [] __device__(auto v, auto v_val, auto pushed_val) { - return thrust::optional>{ - thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; - }); - - if (cugraph::test::g_perf) { - RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - double elapsed_time{0.0}; - hr_clock.stop(&elapsed_time); - std::cout << "MG update_v_frontier_from_outgoing_e took " << elapsed_time * 1e-6 << " s.\n"; - } - - //// 4. compare SG & MG results - - if (prims_usecase.check_correctness) { - auto mg_aggregate_renumber_map_labels = cugraph::test::device_gatherv( - handle, (*mg_renumber_map_labels).data(), (*mg_renumber_map_labels).size()); - - auto& next_bucket = mg_vertex_frontier.bucket(bucket_idx_next); - auto mg_aggregate_frontier_dsts = - cugraph::test::device_gatherv(handle, next_bucket.begin(), next_bucket.size()); - - if (handle.get_comms().get_rank() == int{0}) { - cugraph::unrenumber_int_vertices( - handle, - mg_aggregate_frontier_dsts.begin(), - mg_aggregate_frontier_dsts.size(), - mg_aggregate_renumber_map_labels.data(), - std::vector{mg_graph_view.number_of_vertices()}); - thrust::sort(handle.get_thrust_policy(), - mg_aggregate_frontier_dsts.begin(), - mg_aggregate_frontier_dsts.end()); - - cugraph::graph_t sg_graph( - handle); - std::tie(sg_graph, std::ignore) = cugraph::test:: - construct_graph( - handle, input_usecase, false, false); - auto sg_graph_view = sg_graph.view(); - - auto sg_property_buffer = cugraph::allocate_dataframe_buffer( - sg_graph_view.local_vertex_partition_range_size(), handle.get_stream()); - - thrust::transform(handle.get_thrust_policy(), - sources.begin(), - sources.end(), - cugraph::get_dataframe_buffer_begin(sg_property_buffer), - property_transform_t{hash_bin_count}); - - cugraph::edge_partition_src_property_t - sg_src_properties(handle, sg_graph_view); - cugraph::edge_partition_dst_property_t - sg_dst_properties(handle, sg_graph_view); - update_edge_partition_src_property(handle, - sg_graph_view, - cugraph::get_dataframe_buffer_cbegin(sg_property_buffer), - sg_src_properties); - update_edge_partition_dst_property(handle, - sg_graph_view, - cugraph::get_dataframe_buffer_cbegin(sg_property_buffer), - sg_dst_properties); - cugraph::vertex_frontier_t sg_vertex_frontier(handle, - num_buckets); - sg_vertex_frontier.bucket(bucket_idx_cur).insert(sources.begin(), sources.end()); - - update_v_frontier_from_outgoing_e( - handle, - sg_graph_view, - sg_vertex_frontier, - bucket_idx_cur, - std::vector{bucket_idx_next}, - sg_src_properties.device_view(), - sg_dst_properties.device_view(), - [] __device__(vertex_t src, vertex_t dst, auto src_val, auto dst_val) { - thrust::optional result; - if (src_val < dst_val) { result.emplace(src); } - return result; - }, - cugraph::reduce_op::any(), - cugraph::get_dataframe_buffer_cbegin(sg_property_buffer), - thrust::make_discard_iterator() /* dummy */, - [] __device__(auto v, auto v_val, auto pushed_val) { - return thrust::optional>{ - thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; - }); - - thrust::sort(handle.get_thrust_policy(), - sg_vertex_frontier.bucket(bucket_idx_next).begin(), - sg_vertex_frontier.bucket(bucket_idx_next).end()); - bool passed = thrust::equal(handle.get_thrust_policy(), - sg_vertex_frontier.bucket(bucket_idx_next).begin(), - sg_vertex_frontier.bucket(bucket_idx_next).end(), - mg_aggregate_frontier_dsts.begin()); - ASSERT_TRUE(passed); - } - } - } -}; - -using Tests_MG_UpdateVFrontierFromOutgoingE_File = - Tests_MG_UpdateVFrontierFromOutgoingE; -using Tests_MG_UpdateVFrontierFromOutgoingE_Rmat = - Tests_MG_UpdateVFrontierFromOutgoingE; - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_File, CheckInt32Int32FloatTupleIntFloat) -{ - auto param = GetParam(); - run_current_test>(std::get<0>(param), - std::get<1>(param)); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_Rmat, CheckInt32Int32FloatTupleIntFloat) -{ - auto param = GetParam(); - run_current_test>( - std::get<0>(param), - cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_File, CheckInt32Int32Float) -{ - auto param = GetParam(); - run_current_test(std::get<0>(param), std::get<1>(param)); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_Rmat, CheckInt32Int32Float) -{ - auto param = GetParam(); - run_current_test( - std::get<0>(param), - cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_File, CheckInt32Int64Float) -{ - auto param = GetParam(); - run_current_test(std::get<0>(param), std::get<1>(param)); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_Rmat, CheckInt32Int64Float) -{ - auto param = GetParam(); - run_current_test( - std::get<0>(param), - cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_File, CheckInt64Int64Float) -{ - auto param = GetParam(); - run_current_test(std::get<0>(param), std::get<1>(param)); -} - -TEST_P(Tests_MG_UpdateVFrontierFromOutgoingE_Rmat, CheckInt64Int64Float) -{ - auto param = GetParam(); - run_current_test( - std::get<0>(param), - cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); -} - -INSTANTIATE_TEST_SUITE_P( - file_test, - Tests_MG_UpdateVFrontierFromOutgoingE_File, - ::testing::Combine( - ::testing::Values(Prims_Usecase{true}), - ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), - cugraph::test::File_Usecase("test/datasets/web-Google.mtx"), - cugraph::test::File_Usecase("test/datasets/ljournal-2008.mtx"), - cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx")))); - -INSTANTIATE_TEST_SUITE_P( - rmat_small_test, - Tests_MG_UpdateVFrontierFromOutgoingE_Rmat, - ::testing::Combine(::testing::Values(Prims_Usecase{true}), - ::testing::Values(cugraph::test::Rmat_Usecase( - 10, 16, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); - -INSTANTIATE_TEST_SUITE_P( - rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with - --gtest_filter to select only the rmat_benchmark_test with a specific - vertex & edge type combination) by command line arguments and do not - include more than one Rmat_Usecase that differ only in scale or edge - factor (to avoid running same benchmarks more than once) */ - Tests_MG_UpdateVFrontierFromOutgoingE_Rmat, - ::testing::Combine(::testing::Values(Prims_Usecase{false}), - ::testing::Values(cugraph::test::Rmat_Usecase( - 20, 32, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); - -CUGRAPH_MG_TEST_PROGRAM_MAIN()