Split update_v_frontier_from_outgoing_e to two simpler primitives (#2290)

Close #2003

- Split `update_v_frontier_from_outgoing_e` into `transform_reduce_v_frontier_outgoing_e_by_dst` and `update_v_frontier` (see the sketch after this list).
- Previously, the `per_v` prefix was used both for vertex-centric primitives (e.g. for each vertex, iterate over all incoming or outgoing edges) and for the reduction granularity of edge operation outputs (e.g. reduce edge operation outputs by keys associated with destinations). Primitive names now use `by_...` to specify reduction granularity.
- Bug fix in `update_v_frontier_from_outgoing_e` when `key_t == vertex_t && payload_t == void`.
- Other code-style updates.
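
For orientation, the split roughly corresponds to the two-phase pattern below. This is a minimal, single-threaded C++ sketch on a toy CSR graph, not the cuGraph API: the real primitives are multi-GPU, take edge/vertex property wrappers, and their exact signatures live under `cpp/include/cugraph/prims/`.

```cpp
// Toy, single-threaded illustration of the two new phases (NOT the cuGraph API).
// Phase 1: for every edge leaving the current frontier, emit (dst, payload) and
//          reduce the payloads by destination (reduce_op::any-like behavior here).
// Phase 2: use the reduced (dst, payload) pairs to update per-vertex state and
//          build the next frontier.
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

int main()
{
  // toy CSR graph: 5 vertices, offsets/indices
  std::vector<int32_t> offsets{0, 2, 4, 5, 6, 6};
  std::vector<int32_t> indices{1, 2, 3, 4, 4, 2};
  std::vector<int32_t> distances(5, -1);
  std::vector<int32_t> frontier{0};
  distances[0] = 0;

  while (!frontier.empty()) {
    // phase 1: "transform_reduce frontier outgoing edges by dst"
    std::map<int32_t, int32_t> reduced;  // dst -> reduced payload (a predecessor here)
    for (auto src : frontier) {
      for (auto e = offsets[src]; e < offsets[src + 1]; ++e) {
        auto dst = indices[e];
        // e_op returns a payload only for unvisited destinations
        if (distances[dst] < 0) { reduced.emplace(dst, src); /* keep any one */ }
      }
    }
    // phase 2: "update_v_frontier" (v_op decides the new vertex value and bucket)
    std::vector<int32_t> next_frontier;
    for (auto [dst, pred] : reduced) {
      distances[dst] = distances[pred] + 1;
      next_frontier.push_back(dst);
    }
    frontier = std::move(next_frontier);
  }
  return 0;
}
```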

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Kumar Aatish (https://github.com/kaatish)

URL: #2290
seunghwak authored May 25, 2022
1 parent 4c0531d commit f55d2e5
Showing 20 changed files with 1,405 additions and 899 deletions.
19 changes: 0 additions & 19 deletions cpp/include/cugraph/detail/graph_utils.cuh
@@ -95,24 +95,5 @@ struct compute_partition_id_from_edge_t {
}
};

template <typename vertex_t>
struct is_first_in_run_t {
vertex_t const* vertices{nullptr};
__device__ bool operator()(size_t i) const
{
return (i == 0) || (vertices[i - 1] != vertices[i]);
}
};

template <typename vertex_t>
struct is_first_in_run_pair_t {
vertex_t const* vertices0{nullptr};
vertex_t const* vertices1{nullptr};
__device__ bool operator()(size_t i) const
{
return (i == 0) || ((vertices0[i - 1] != vertices0[i]) || (vertices1[i - 1] != vertices1[i]));
}
};

} // namespace detail
} // namespace cugraph
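
The two functors deleted above appear to be superseded by a single, iterator-templated `is_first_in_run_t` in `cugraph/utilities/device_functors.cuh` (note the new include and the updated call sites in the next file, which instantiate it with both a raw pointer and a zip iterator). The sketch below is an assumption inferred from those call sites, not the verbatim cuGraph source; the usage example uses made-up toy data.

```cpp
// Assumed shape of the consolidated functor plus a toy usage example.
// Templating on the iterator type lets one functor cover both the raw-pointer
// case and the zip-iterator (pair-of-columns) case that is_first_in_run_pair_t
// used to handle.
#include <thrust/count.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/tuple.h>

#include <cstdio>
#include <vector>

template <typename Iterator>
struct is_first_in_run_t {
  Iterator iter{};
  __device__ bool operator()(size_t i) const
  {
    // element i starts a new run if it differs from its predecessor
    return (i == 0) || (*(iter + (i - 1)) != *(iter + i));
  }
};

int main()
{
  std::vector<int> h_majors{0, 0, 0, 1, 1, 2};
  std::vector<int> h_minor_keys{5, 5, 6, 6, 6, 7};
  thrust::device_vector<int> majors(h_majors.begin(), h_majors.end());
  thrust::device_vector<int> minor_keys(h_minor_keys.begin(), h_minor_keys.end());

  // single column through a raw pointer: 3 runs -> {0, 1, 2}
  auto num_unique_majors =
    thrust::count_if(thrust::device,
                     thrust::make_counting_iterator(size_t{0}),
                     thrust::make_counting_iterator(majors.size()),
                     is_first_in_run_t<int const*>{majors.data().get()});

  // (major, minor_key) pairs through a zip iterator: 4 runs
  auto pair_first = thrust::make_zip_iterator(
    thrust::make_tuple(majors.data().get(), minor_keys.data().get()));
  auto num_unique_pairs =
    thrust::count_if(thrust::device,
                     thrust::make_counting_iterator(size_t{0}),
                     thrust::make_counting_iterator(majors.size()),
                     is_first_in_run_t<decltype(pair_first)>{pair_first});

  std::printf("%d %d\n", static_cast<int>(num_unique_majors), static_cast<int>(num_unique_pairs));
  return 0;
}
```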
@@ -21,6 +21,7 @@
#include <cugraph/graph_view.hpp>
#include <cugraph/utilities/collect_comm.cuh>
#include <cugraph/utilities/dataframe_buffer.cuh>
#include <cugraph/utilities/device_functors.cuh>
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/host_scalar_comm.cuh>
#include <cugraph/utilities/misc_utils.cuh>
@@ -549,11 +550,11 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(
key_pair_first + rx_majors.size(),
rx_key_aggregated_edge_weights.begin());
}
auto num_uniques = thrust::count_if(
handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(rx_majors.size()),
detail::is_first_in_run_pair_t<vertex_t>{rx_majors.data(), rx_minor_keys.data()});
auto num_uniques =
thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(rx_majors.size()),
detail::is_first_in_run_t<decltype(key_pair_first)>{key_pair_first});
tmp_majors.resize(num_uniques, handle.get_stream());
tmp_minor_keys.resize(tmp_majors.size(), handle.get_stream());
tmp_key_aggregated_edge_weights.resize(tmp_majors.size(), handle.get_stream());
@@ -652,10 +653,11 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(
tmp_key_aggregated_edge_weights.shrink_to_fit(handle.get_stream());

{
auto num_uniques = thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(tmp_majors.size()),
detail::is_first_in_run_t<vertex_t>{tmp_majors.data()});
auto num_uniques =
thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(tmp_majors.size()),
detail::is_first_in_run_t<vertex_t const*>{tmp_majors.data()});
rmm::device_uvector<vertex_t> unique_majors(num_uniques, handle.get_stream());
auto reduced_e_op_result_buffer =
allocate_dataframe_buffer<T>(unique_majors.size(), handle.get_stream());
@@ -725,7 +727,7 @@ void per_v_transform_reduce_dst_key_aggregated_outgoing_e(
auto num_uniques = thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(majors.size()),
detail::is_first_in_run_t<vertex_t>{majors.data()});
detail::is_first_in_run_t<vertex_t const*>{majors.data()});
rmm::device_uvector<vertex_t> unique_majors(num_uniques, handle.get_stream());
auto reduced_e_op_result_buffer =
allocate_dataframe_buffer<T>(unique_majors.size(), handle.get_stream());
14 changes: 7 additions & 7 deletions cpp/include/cugraph/prims/reduce_op.cuh
@@ -24,7 +24,7 @@ namespace cugraph {
namespace reduce_op {

// Guidance on writing a custom reduction operator.
// 1. It is advised to add a "using type = type_of_the_reduced_values" statement.
// 1. It is required to add a "using value_type = type_of_the_reduced_values" statement.
// 2. A custom reduction operator MUST be side-effect free. We use thrust::reduce internally to
// implement reductions in multiple primitives. The current (version 1.16) implementation of thrust
// reduce rounds up the number of invocations based on the CUDA block size and discards the values
@@ -47,14 +47,14 @@ namespace reduce_op

// in case there is no payload to reduce
struct null {
using type = void;
using value_type = void;
};

// Binary reduction operator selecting any of the two input arguments, T should be arithmetic types
// or thrust tuple of arithmetic types.
template <typename T>
struct any {
using type = T;
using value_type = T;
static constexpr bool pure_function = true; // this can be called in any process

__host__ __device__ T operator()(T const& lhs, T const& rhs) const { return lhs; }
@@ -64,7 +64,7 @@ struct any {
// operator <), T should be arithmetic types or thrust tuple of arithmetic types.
template <typename T>
struct minimum {
using type = T;
using value_type = T;
static constexpr bool pure_function = true; // this can be called in any process
static constexpr raft::comms::op_t compatible_raft_comms_op = raft::comms::op_t::MIN;
inline static T const identity_element = max_identity_element<T>();
@@ -79,7 +79,7 @@ struct minimum {
// operator <), T should be arithmetic types or thrust tuple of arithmetic types.
template <typename T>
struct maximum {
using type = T;
using value_type = T;
static constexpr bool pure_function = true; // this can be called in any process
static constexpr raft::comms::op_t compatible_raft_comms_op = raft::comms::op_t::MAX;
inline static T const identity_element = min_identity_element<T>();
@@ -94,7 +94,7 @@ struct maximum {
// tuple of arithmetic types.
template <typename T>
struct plus {
using type = T;
using value_type = T;
static constexpr bool pure_function = true; // this can be called in any process
static constexpr raft::comms::op_t compatible_raft_comms_op = raft::comms::op_t::SUM;
inline static T const identity_element = T{};
@@ -117,7 +117,7 @@ template <typename ReduceOp>
inline constexpr bool has_compatible_raft_comms_op_v =
has_compatible_raft_comms_op<ReduceOp>::value;

template <typename ReduceOp, typename = typename ReduceOp::type>
template <typename ReduceOp, typename = typename ReduceOp::value_type>
struct has_identity_element : std::false_type {
};

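
The operators above illustrate the convention the renamed alias enforces: a custom reduction operator only needs the `value_type` alias and a side-effect-free `operator()`; `compatible_raft_comms_op` and `identity_element` are optional and are detected through the `has_compatible_raft_comms_op` / `has_identity_element` traits shown above. Below is a hypothetical product reduction as an additional example; it is not part of cuGraph.

```cpp
// Hypothetical custom reduction operator (illustration only; not in cuGraph).
// It follows the convention above: a `value_type` alias plus a side-effect-free
// operator(). It intentionally omits compatible_raft_comms_op, so
// has_compatible_raft_comms_op_v<multiply<T>> evaluates to false for it.
// T is assumed to be an arithmetic type here.
template <typename T>
struct multiply {
  using value_type = T;
  static constexpr bool pure_function = true;      // this can be called in any process
  inline static T const identity_element = T{1};   // x * 1 == x

  __host__ __device__ T operator()(T const& lhs, T const& rhs) const { return lhs * rhs; }
};
```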
@@ -21,6 +21,7 @@
#include <cugraph/prims/detail/nbr_intersection.cuh>
#include <cugraph/prims/edge_partition_src_dst_property.cuh>
#include <cugraph/prims/property_op_utils.cuh>
#include <cugraph/utilities/device_functors.cuh>
#include <cugraph/utilities/error.hpp>

#include <raft/handle.hpp>
@@ -105,7 +106,7 @@ std::tuple<rmm::device_uvector<vertex_t>, ValueBuffer> sort_and_reduce_by_vertic
auto num_uniques = thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(vertices.size()),
detail::is_first_in_run_t<vertex_t>{vertices.data()});
detail::is_first_in_run_t<vertex_t const*>{vertices.data()});
rmm::device_uvector<vertex_t> reduced_vertices(num_uniques, handle.get_stream());
auto reduced_value_buffer = allocate_dataframe_buffer<value_t>(num_uniques, handle.get_stream());
thrust::reduce_by_key(handle.get_thrust_policy(),
@@ -169,7 +170,7 @@ struct accumulate_vertex_property_t {
* vertex; invoke a user-provided functor per intersection, and reduce the functor output
* values (thrust::tuple of three values having the same type: one for source, one for destination,
* and one for every vertex in the intersection) per-vertex. We may add
* per_v_transform_reduce_triplet_of_dst_nbr_intersection_of_e_endpoints in the future to allow
* transform_reduce_triplet_of_dst_nbr_intersection_of_e_endpoints_by_v in the future to allow
* emitting different values for different vertices in the intersection of edge endpoints. This
 * function is inspired by thrust::transform_reduce().
*
@@ -213,7 +214,7 @@ template <typename GraphViewType,
typename IntersectionOp,
typename T,
typename VertexValueOutputIterator>
void per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints(
void transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v(
raft::handle_t const& handle,
GraphViewType const& graph_view,
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
@@ -424,7 +425,7 @@ void per_v_transform_reduce_dst_nbr_intersection_of_e_endpoints(
thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(merged_vertices.size()),
detail::is_first_in_run_t<vertex_t>{merged_vertices.data()});
detail::is_first_in_run_t<vertex_t const*>{merged_vertices.data()});
rmm::device_uvector<vertex_t> reduced_vertices(num_uniques, handle.get_stream());
auto reduced_value_buffer = allocate_dataframe_buffer<T>(num_uniques, handle.get_stream());
thrust::reduce_by_key(handle.get_thrust_policy(),
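
The renamed `transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v` follows the same `by_...` convention: work is generated per edge, outputs are reduced per vertex. The following is a toy, single-threaded sketch of the access pattern described in the doc comment above, on made-up data; it is an illustration only, not the primitive itself (the real primitive is multi-GPU and takes src/dst property wrappers and an IntersectionOp).

```cpp
// Toy illustration: for every edge (src, dst), intersect the two sorted
// destination-neighbor lists and accumulate a contribution for src, for dst, and
// for every vertex in the intersection -- i.e. a per-vertex reduction.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
  // toy CSR graph (sorted adjacency lists): triangle 0-1-2 plus edge 2-3
  std::vector<int32_t> offsets{0, 2, 4, 7, 8};
  std::vector<int32_t> indices{1, 2, 0, 2, 0, 1, 3, 2};

  std::vector<int64_t> per_v(4, 0);  // reduced per-vertex values (init = 0)

  for (int32_t src = 0; src < 4; ++src) {
    for (auto e = offsets[src]; e < offsets[src + 1]; ++e) {
      auto dst = indices[e];
      std::vector<int32_t> intersection;
      std::set_intersection(indices.begin() + offsets[src], indices.begin() + offsets[src + 1],
                            indices.begin() + offsets[dst], indices.begin() + offsets[dst + 1],
                            std::back_inserter(intersection));
      // the intersection op returns (value for src, value for dst, value per
      // intersection vertex); every contribution is simply 1 in this toy example
      per_v[src] += static_cast<int64_t>(intersection.size());
      per_v[dst] += static_cast<int64_t>(intersection.size());
      for (auto w : intersection) { per_v[w] += 1; }
    }
  }

  for (int32_t v = 0; v < 4; ++v) { std::cout << v << ": " << per_v[v] << "\n"; }
  return 0;
}
```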
@@ -32,7 +32,7 @@ namespace cugraph {
namespace detail {

// FIXME: block size requires tuning
int32_t constexpr per_src_dst_key_transform_reduce_e_kernel_block_size = 128;
int32_t constexpr transform_reduce_e_by_src_dst_key_kernel_block_size = 128;

template <bool edge_partition_src_key,
typename GraphViewType,
@@ -225,7 +225,7 @@ __global__ void transform_reduce_by_src_dst_key_mid_degree(
using weight_t = typename GraphViewType::weight_type;

auto const tid = threadIdx.x + blockIdx.x * blockDim.x;
static_assert(per_src_dst_key_transform_reduce_e_kernel_block_size % raft::warp_size() == 0);
static_assert(transform_reduce_e_by_src_dst_key_kernel_block_size % raft::warp_size() == 0);
auto const lane_id = tid % raft::warp_size();
auto major_start_offset =
static_cast<size_t>(major_range_first - edge_partition.major_range_first());
@@ -352,7 +352,7 @@ template <bool edge_partition_src_key,
typename T>
std::tuple<rmm::device_uvector<typename GraphViewType::vertex_type>,
decltype(allocate_dataframe_buffer<T>(0, cudaStream_t{nullptr}))>
per_src_dst_key_transform_reduce_e(
transform_reduce_e_by_src_dst_key(
raft::handle_t const& handle,
GraphViewType const& graph_view,
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
@@ -414,7 +414,7 @@ per_src_dst_key_transform_reduce_e(
if ((*segment_offsets)[1] > 0) {
raft::grid_1d_block_t update_grid(
(*segment_offsets)[1],
detail::per_src_dst_key_transform_reduce_e_kernel_block_size,
detail::transform_reduce_e_by_src_dst_key_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
detail::transform_reduce_by_src_dst_key_high_degree<edge_partition_src_key, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
@@ -431,7 +431,7 @@ per_src_dst_key_transform_reduce_e(
if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) {
raft::grid_1d_warp_t update_grid(
(*segment_offsets)[2] - (*segment_offsets)[1],
detail::per_src_dst_key_transform_reduce_e_kernel_block_size,
detail::transform_reduce_e_by_src_dst_key_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
detail::transform_reduce_by_src_dst_key_mid_degree<edge_partition_src_key, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
@@ -448,7 +448,7 @@ per_src_dst_key_transform_reduce_e(
if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) {
raft::grid_1d_thread_t update_grid(
(*segment_offsets)[3] - (*segment_offsets)[2],
detail::per_src_dst_key_transform_reduce_e_kernel_block_size,
detail::transform_reduce_e_by_src_dst_key_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
detail::transform_reduce_by_src_dst_key_low_degree<edge_partition_src_key, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
@@ -466,7 +466,7 @@ per_src_dst_key_transform_reduce_e(
(*(edge_partition.dcs_nzd_vertex_count()) > 0)) {
raft::grid_1d_thread_t update_grid(
*(edge_partition.dcs_nzd_vertex_count()),
detail::per_src_dst_key_transform_reduce_e_kernel_block_size,
detail::transform_reduce_e_by_src_dst_key_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);
detail::transform_reduce_by_src_dst_key_hypersparse<edge_partition_src_key, GraphViewType>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
@@ -482,7 +482,7 @@ per_src_dst_key_transform_reduce_e(
} else {
raft::grid_1d_thread_t update_grid(
edge_partition.major_range_size(),
detail::per_src_dst_key_transform_reduce_e_kernel_block_size,
detail::transform_reduce_e_by_src_dst_key_kernel_block_size,
handle.get_device_properties().maxGridSize[0]);

detail::transform_reduce_by_src_dst_key_low_degree<edge_partition_src_key, GraphViewType>
@@ -553,8 +553,6 @@ per_src_dst_key_transform_reduce_e(

} // namespace detail

// FIXME: EdgeOp & VertexOp in update_frontier_v_push_if_out_nbr concatenates push inidicator or
// bucket idx with the value while EdgeOp here does not. This is inconsistent. Better be fixed.
/**
* @brief Iterate over the entire set of edges and reduce @p edge_op outputs to (key, value) pairs.
*
@@ -603,15 +601,14 @@ template <typename GraphViewType,
typename EdgePartitionSrcKeyInputWrapper,
typename EdgeOp,
typename T>
auto per_src_key_transform_reduce_e(
raft::handle_t const& handle,
GraphViewType const& graph_view,
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
EdgePartitionDstValueInputWrapper edge_partition_dst_value_input,
EdgePartitionSrcKeyInputWrapper edge_partition_src_key_input,
EdgeOp e_op,
T init,
bool do_expensive_check = false)
auto transform_reduce_e_by_src_key(raft::handle_t const& handle,
GraphViewType const& graph_view,
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
EdgePartitionDstValueInputWrapper edge_partition_dst_value_input,
EdgePartitionSrcKeyInputWrapper edge_partition_src_key_input,
EdgeOp e_op,
T init,
bool do_expensive_check = false)
{
static_assert(is_arithmetic_or_thrust_tuple_of_arithmetic<T>::value);
static_assert(std::is_same<typename EdgePartitionSrcKeyInputWrapper::value_type,
@@ -621,17 +618,15 @@ auto per_src_key_transform_reduce_e(
// currently, nothing to do
}

return detail::per_src_dst_key_transform_reduce_e<true>(handle,
graph_view,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_src_key_input,
e_op,
init);
return detail::transform_reduce_e_by_src_dst_key<true>(handle,
graph_view,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_src_key_input,
e_op,
init);
}

// FIXME: EdgeOp & VertexOp in update_frontier_v_push_if_out_nbr concatenates push inidicator or
// bucket idx with the value while EdgeOp here does not. This is inconsistent. Better be fixed.
/**
* @brief Iterate over the entire set of edges and reduce @p edge_op outputs to (key, value) pairs.
*
@@ -681,15 +676,14 @@ template <typename GraphViewType,
typename EdgePartitionDstKeyInputWrapper,
typename EdgeOp,
typename T>
auto per_dst_key_transform_reduce_e(
raft::handle_t const& handle,
GraphViewType const& graph_view,
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
EdgePartitionDstValueInputWrapper edge_partition_dst_value_input,
EdgePartitionDstKeyInputWrapper edge_partition_dst_key_input,
EdgeOp e_op,
T init,
bool do_expensive_check = false)
auto transform_reduce_e_by_dst_key(raft::handle_t const& handle,
GraphViewType const& graph_view,
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
EdgePartitionDstValueInputWrapper edge_partition_dst_value_input,
EdgePartitionDstKeyInputWrapper edge_partition_dst_key_input,
EdgeOp e_op,
T init,
bool do_expensive_check = false)
{
static_assert(is_arithmetic_or_thrust_tuple_of_arithmetic<T>::value);
static_assert(std::is_same<typename EdgePartitionDstKeyInputWrapper::value_type,
@@ -699,13 +693,13 @@ auto per_dst_key_transform_reduce_e(
// currently, nothing to do
}

return detail::per_src_dst_key_transform_reduce_e<false>(handle,
graph_view,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_dst_key_input,
e_op,
init);
return detail::transform_reduce_e_by_src_dst_key<false>(handle,
graph_view,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_dst_key_input,
e_op,
init);
}

} // namespace cugraph
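
As with the other renamed primitives, `transform_reduce_e_by_src_key` / `transform_reduce_e_by_dst_key` now spell out the reduction granularity in the name: the edge operator is evaluated for every edge, each output is tagged with the source's or destination's key, and the outputs are reduced into (key, value) pairs. The Thrust sketch below shows that by-key reduction on made-up per-edge outputs; it is a conceptual illustration, not the primitive's implementation (which, as the diff above shows, works per edge partition and per degree segment).

```cpp
// Conceptual sketch of what the by-key reduction produces (toy data).
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/reduce.h>
#include <thrust/sort.h>

#include <cstdio>
#include <vector>

int main()
{
  // per-edge dst keys and per-edge e_op outputs (made-up values)
  std::vector<int> h_keys{2, 0, 2, 1, 0, 2};
  std::vector<float> h_vals{1.f, 2.f, 3.f, 4.f, 5.f, 6.f};
  thrust::device_vector<int> keys(h_keys.begin(), h_keys.end());
  thrust::device_vector<float> vals(h_vals.begin(), h_vals.end());

  // group edge outputs by key, then reduce each group (plus here)
  thrust::sort_by_key(thrust::device, keys.begin(), keys.end(), vals.begin());
  thrust::device_vector<int> out_keys(keys.size());
  thrust::device_vector<float> out_vals(vals.size());
  auto ends = thrust::reduce_by_key(thrust::device,
                                    keys.begin(), keys.end(), vals.begin(),
                                    out_keys.begin(), out_vals.begin());
  auto num_keys = static_cast<int>(ends.first - out_keys.begin());

  // prints: key 0 -> 7, key 1 -> 4, key 2 -> 10
  for (int i = 0; i < num_keys; ++i) {
    std::printf("key %d -> %f\n", static_cast<int>(out_keys[i]), static_cast<float>(out_vals[i]));
  }
  return 0;
}
```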