From 26b8b64e5adb4fefe6e81c38d4d726b4b1054ea1 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Thu, 9 Dec 2021 23:51:20 -0800 Subject: [PATCH 01/11] update update_frontier_v_push_if_out_nbr to use fewer atomic instructions --- .../update_frontier_v_push_if_out_nbr.cuh | 253 +++++++++++------- 1 file changed, 157 insertions(+), 96 deletions(-) diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index fe9d884499b..f9df295c81d 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -180,61 +180,32 @@ struct check_invalid_bucket_idx_t { } }; -template -__device__ void push_if_buffer_element( - matrix_partition_device_view_t& matrix_partition, - typename std::iterator_traits::value_type key, - typename GraphViewType::vertex_type row_offset, - typename GraphViewType::vertex_type col, - typename GraphViewType::weight_type weight, - AdjMatrixRowValueInputWrapper adj_matrix_row_value_input, - AdjMatrixColValueInputWrapper adj_matrix_col_value_input, - BufferKeyOutputIterator buffer_key_output_first, - BufferPayloadOutputIterator buffer_payload_output_first, - size_t* buffer_idx_ptr, - EdgeOp e_op) + typename BufferPayloadOutputIterator> +__device__ void push_buffer_element(vertex_t col, + e_op_result_t e_op_result, + BufferKeyOutputIterator buffer_key_output_first, + BufferPayloadOutputIterator buffer_payload_output_first, + size_t buffer_idx) { - using vertex_t = typename GraphViewType::vertex_type; - using key_t = typename std::iterator_traits::value_type; + using key_t = typename std::iterator_traits::value_type; using payload_t = typename optional_payload_buffer_value_type_t::value; - auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); - auto e_op_result = evaluate_edge_op() - .compute(key, - col, - weight, - adj_matrix_row_value_input.get(row_offset), - adj_matrix_col_value_input.get(col_offset), - e_op); - if (e_op_result) { - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(1)); - if constexpr (std::is_same_v && std::is_same_v) { - *(buffer_key_output_first + buffer_idx) = col; - } else if constexpr (std::is_same_v && !std::is_same_v) { - *(buffer_key_output_first + buffer_idx) = col; - *(buffer_payload_output_first + buffer_idx) = *e_op_result; - } else if constexpr (!std::is_same_v && std::is_same_v) { - *(buffer_key_output_first + buffer_idx) = thrust::make_tuple(col, *e_op_result); - } else { - *(buffer_key_output_first + buffer_idx) = - thrust::make_tuple(col, thrust::get<0>(*e_op_result)); - *(buffer_payload_output_first + buffer_idx) = thrust::get<1>(*e_op_result); - } + assert(e_op_result.has_value()); + + if constexpr (std::is_same_v && std::is_same_v) { + *(buffer_key_output_first + buffer_idx) = col; + } else if constexpr (std::is_same_v && !std::is_same_v) { + *(buffer_key_output_first + buffer_idx) = col; + *(buffer_payload_output_first + buffer_idx) = *e_op_result; + } else if constexpr (!std::is_same_v && std::is_same_v) { + *(buffer_key_output_first + buffer_idx) = thrust::make_tuple(col, *e_op_result); + } else { + *(buffer_key_output_first + buffer_idx) = thrust::make_tuple(col, thrust::get<0>(*e_op_result)); + *(buffer_payload_output_first + buffer_idx) = thrust::get<1>(*e_op_result); } } @@ -297,17 +268,26 @@ __global__ void for_all_frontier_row_for_all_nbr_hypersparse( 
edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_idx); for (edge_t i = 0; i < local_out_degree; ++i) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + auto col = indices[i]; + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + auto e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[i] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + if (e_op_result) { + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(1)); + push_buffer_element( + col, e_op_result, buffer_key_output_first, buffer_payload_output_first, buffer_idx); + } } } idx += gridDim.x * blockDim.x; @@ -364,17 +344,26 @@ __global__ void for_all_frontier_row_for_all_nbr_low_degree( edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); for (edge_t i = 0; i < local_out_degree; ++i) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + auto col = indices[i]; + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + auto e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[i] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + if (e_op_result) { + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(1)); + push_buffer_element( + col, e_op_result, buffer_key_output_first, buffer_payload_output_first, buffer_idx); + } } idx += gridDim.x * blockDim.x; } @@ -409,6 +398,11 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); @@ -418,6 +412,9 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( auto const lane_id = tid % raft::warp_size(); auto idx = static_cast(tid / raft::warp_size()); + __shared__ size_t buffer_warp_start_indices[update_frontier_v_push_if_out_nbr_for_all_block_size / + raft::warp_size()]; + while (idx < static_cast(thrust::distance(key_first, key_last))) { auto key = *(key_first + idx); vertex_t row{}; @@ -431,18 +428,45 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( thrust::optional weights{thrust::nullopt}; edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (edge_t i = lane_id; i < local_out_degree; i += raft::warp_size()) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? 
(*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + auto rounded_up_local_out_degree = + ((local_out_degree + (raft::warp_size() - 1)) / raft::warp_size()) * raft::warp_size(); + for (edge_t i = lane_id; i < rounded_up_local_out_degree; i += raft::warp_size()) { + e_op_result_t e_op_result{}; + + auto col = indices[i]; + if (i < local_out_degree) { + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[i] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + auto ballot = __ballot_sync(uint32_t{0xffffffff}, e_op_result ? uint32_t{1} : uint32_t{0}); + if (lane_id == 0) { + auto increment = __popc(ballot); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[threadIdx.x / raft::warp_size()] = + atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment)); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element( + col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[threadIdx.x / raft::warp_size()] + buffer_warp_offset); + } } idx += gridDim.x * (blockDim.x / raft::warp_size()); @@ -478,12 +502,21 @@ __global__ void for_all_frontier_row_for_all_nbr_high_degree( std::is_same_v::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); auto idx = static_cast(blockIdx.x); + using BlockScan = cub::BlockScan; + __shared__ typename BlockScan::TempStorage temp_storage; + __shared__ size_t buffer_block_start_idx; + while (idx < static_cast(thrust::distance(key_first, key_last))) { auto key = *(key_first + idx); vertex_t row{}; @@ -497,18 +530,46 @@ __global__ void for_all_frontier_row_for_all_nbr_high_degree( thrust::optional weights{thrust::nullopt}; edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (edge_t i = threadIdx.x; i < local_out_degree; i += blockDim.x) { - push_if_buffer_element(matrix_partition, - key, - row_offset, - indices[i], - weights ? (*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input, - adj_matrix_col_value_input, - buffer_key_output_first, - buffer_payload_output_first, - buffer_idx_ptr, - e_op); + auto rounded_up_local_out_degree = + ((local_out_degree + (update_frontier_v_push_if_out_nbr_for_all_block_size - 1)) / + update_frontier_v_push_if_out_nbr_for_all_block_size) * + update_frontier_v_push_if_out_nbr_for_all_block_size; + for (edge_t i = threadIdx.x; i < rounded_up_local_out_degree; i += blockDim.x) { + e_op_result_t e_op_result{}; + edge_t buffer_block_offset{0}; + + auto col = indices[i]; + if (i < local_out_degree) { + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[i] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + BlockScan(temp_storage) + .ExclusiveSum(e_op_result ? 
edge_t{1} : edge_t{0}, buffer_block_offset); + if (threadIdx.x == (blockDim.x - 1)) { + auto increment = buffer_block_offset + (e_op_result ? edge_t{1} : edge_t{0}); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_block_start_idx = + atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment)); + } + __syncthreads(); + if (e_op_result) { + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_block_start_idx + buffer_block_offset); + } } idx += gridDim.x; From f86b1700c88e3cf992756515abe532bc378f49a1 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Fri, 10 Dec 2021 15:15:53 -0800 Subject: [PATCH 02/11] add result_type to evaluate_edge_op --- .../cugraph/prims/property_op_utils.cuh | 68 +++++++++++++++++-- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index 32ece7c166a..2f755da9d29 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -30,6 +30,8 @@ namespace cugraph { +namespace detail { + template struct is_valid_edge_op { static constexpr bool value = false; @@ -42,6 +44,55 @@ struct is_valid_edge_op< static constexpr bool valid = true; }; +template +struct edge_op_result_type; + +template +struct edge_op_result_type< + key_t, + vertex_t, + weight_t, + row_value_t, + col_value_t, + EdgeOp, + std::enable_if_t>:: + valid>> { + using type = + typename std::invoke_result::type; +}; + +template +struct edge_op_result_type< + key_t, + vertex_t, + weight_t, + row_value_t, + col_value_t, + EdgeOp, + std::enable_if_t>::valid>> { + using type = typename std::invoke_result::type; +}; + +} // namespace detail + template :: + type; template __device__ - std::enable_if_t>::valid, + std::enable_if_t>::valid, typename std::invoke_result::type> compute(K r, V c, W w, R rv, C cv, E e) { @@ -73,9 +127,10 @@ struct evaluate_edge_op { typename R = row_value_type, typename C = col_value_type, typename E = EdgeOp> - __device__ std::enable_if_t>::valid, - typename std::invoke_result::type> - compute(K r, V c, W w, R rv, C cv, E e) + __device__ + std::enable_if_t>::valid, + typename std::invoke_result::type> + compute(K r, V c, W w, R rv, C cv, E e) { return e(r, c, rv, cv); } @@ -103,7 +158,8 @@ struct cast_edge_op_bool_to_integer { typename C = col_value_type, typename E = EdgeOp> __device__ - std::enable_if_t>::valid, T> + std::enable_if_t>::valid, + T> operator()(K r, V c, W w, R rv, C cv) { return e_op(r, c, w, rv, cv) ? T{1} : T{0}; @@ -115,7 +171,7 @@ struct cast_edge_op_bool_to_integer { typename C = col_value_type, typename E = EdgeOp> __device__ - std::enable_if_t>::valid, T> + std::enable_if_t>::valid, T> operator()(K r, V c, R rv, C cv) { return e_op(r, c, rv, cv) ? 
T{1} : T{0}; From 4ca39ec65be11f5b90b9327bc8204fb23d30a0de Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Tue, 14 Dec 2021 14:49:38 -0800 Subject: [PATCH 03/11] update FIXME comments --- .../update_frontier_v_push_if_out_nbr.cuh | 36 ++++++------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index f9df295c81d..0b3a9b6509f 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -209,6 +209,11 @@ __device__ void push_buffer_element(vertex_t col, } } +// FIXME: for for_all_frontier_row_for_all_nbr_(hypersparse & low_degree), it might be faster to +// process a fixed number of edges per thread (this will require binary_searches but this overhead +// may be lesser than thread divergence) as there is no need for per vertex reduction. Threads in a +// warp can collaboratively load potential offset values to the shared memory and run binary search +// over the values in shared memory. template (thrust::distance(key_first, key_last))) { auto key = *(key_first + idx); vertex_t row{}; @@ -605,8 +614,6 @@ size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, num_reduced_buffer_elements = static_cast(thrust::distance(buffer_key_output_first, it)); } else if constexpr (std::is_same>::value) { - // FIXME: if ReducOp is any, we may have a cheaper alternative than sort & uique (i.e. discard - // non-first elements) auto it = thrust::unique_by_key(execution_policy, buffer_key_output_first, buffer_key_output_first + num_buffer_elements, @@ -614,15 +621,6 @@ size_t sort_and_reduce_buffer_elements(raft::handle_t const& handle, num_reduced_buffer_elements = static_cast(thrust::distance(buffer_key_output_first, thrust::get<0>(it))); } else { - // FIXME: better avoid temporary buffer or at least limit the maximum buffer size (if we adopt - // CUDA cooperative group https://devblogs.nvidia.com/cooperative-groups and global sync(), we - // can use aggregate shared memory as a temporary buffer, or we can limit the buffer size, and - // split one thrust::reduce_by_key call to multiple thrust::reduce_by_key calls if the - // temporary buffer size exceeds the maximum buffer size (may be definied as percentage of the - // system HBM size or a function of the maximum number of threads in the system)) - // FIXME: actually, we can find how many unique keys are here by now. - // FIXME: if GraphViewType::is_multi_gpu is true, this should be executed on the GPU holding - // the vertex unless reduce_op is a pure function. rmm::device_uvector keys(num_buffer_elements, handle.get_stream()); auto value_buffer = allocate_dataframe_buffer(num_buffer_elements, handle.get_stream()); @@ -975,20 +973,6 @@ void update_frontier_v_push_if_out_nbr( edge_t{0}, thrust::plus()); - // FIXME: This is highly pessimistic for single GPU (and multi-GPU as well if we maintain - // additional per column data for filtering in e_op). If we can pause & resume execution if - // buffer needs to be increased (and if we reserve address space to avoid expensive - // reallocation; - // https://devblogs.nvidia.com/introducing-low-level-gpu-virtual-memory-management/), we can - // start with a smaller buffer size (especially when the frontier size is large). - // for special cases when we can assure that there is no more than one push per destination - // (e.g. 
if cugraph::reduce_op::any is used), we can limit the buffer size to - // std::min(max_pushes, matrix_partition.get_minor_size()). - // For Volta+, we can limit the buffer size to std::min(max_pushes, - // matrix_partition.get_minor_size()) if the reduction operation is a pure function if we use - // locking. - // FIXME: if i != 0, this will require costly reallocation if we don't use the new CUDA feature - // to reserve address space. auto new_buffer_size = buffer_idx.value(handle.get_stream()) + max_pushes; resize_dataframe_buffer(key_buffer, new_buffer_size, handle.get_stream()); if constexpr (!std::is_same_v) { From a0504ac15e6089bdde4c4d39909d62608a21c80c Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Wed, 15 Dec 2021 08:16:11 -0800 Subject: [PATCH 04/11] bfs performance tuning --- cpp/src/traversal/bfs_impl.cuh | 68 +++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index c1b8260b0a3..81a2ede23b1 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -41,6 +42,37 @@ #include namespace cugraph { + +namespace { + +template +struct e_op_t { + std:: + conditional_t, uint32_t*> + visited_flags{nullptr}; + vertex_t dst_first{}; // relevant only if multi_gpu is true + + __device__ thrust::optional operator()(vertex_t src, + vertex_t dst, + thrust::nullopt_t, + thrust::nullopt_t) const + { + thrust::optional ret{}; + if constexpr (multi_gpu) { + auto dst_offset = dst - dst_first; + auto old = atomicOr(visited_flags.get_iter(dst_offset), uint8_t{1}); + ret = old == uint8_t{0} ? thrust::optional{src} : thrust::nullopt; + } else { + auto mask = uint32_t{1} << (dst % (sizeof(uint32_t) * 8)); + auto old = atomicOr(visited_flags + (dst / (sizeof(uint32_t) * 8)), mask); + ret = (old & mask) == 0 ? thrust::optional{src} : thrust::nullopt; + } + return ret; + } +}; + +} // namespace + namespace detail { template @@ -132,6 +164,17 @@ void bfs(raft::handle_t const& handle, vertex_frontier(handle); vertex_frontier.get_bucket(static_cast(Bucket::cur)).insert(sources, sources + n_sources); + rmm::device_uvector visited_flags( + (push_graph_view.get_number_of_local_vertices() + (sizeof(uint32_t) * 8 - 1)) / + (sizeof(uint32_t) * 8), + handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), visited_flags.begin(), visited_flags.end(), uint32_t{0}); + auto dst_visited_flags = GraphViewType::is_multi_gpu + ? col_properties_t(handle, push_graph_view) + : col_properties_t(); + if constexpr (GraphViewType::is_multi_gpu) { + dst_visited_flags.fill(uint8_t{0}, handle.get_stream()); + } // 4. 
BFS iteration vertex_t depth{0}; @@ -139,8 +182,22 @@ void bfs(raft::handle_t const& handle, if (direction_optimizing) { CUGRAPH_FAIL("unimplemented."); } else { - auto vertex_partition = vertex_partition_device_view_t( - push_graph_view.get_vertex_partition_view()); + if (GraphViewType::is_multi_gpu) { + copy_to_adj_matrix_col(handle, + push_graph_view, + vertex_frontier.get_bucket(static_cast(Bucket::cur)).begin(), + vertex_frontier.get_bucket(static_cast(Bucket::cur)).end(), + thrust::make_constant_iterator(uint8_t{1}), + dst_visited_flags); + } + + e_op_t e_op{}; + if constexpr (GraphViewType::is_multi_gpu) { + e_op.visited_flags = dst_visited_flags.mutable_device_view(); + e_op.dst_first = push_graph_view.get_local_adj_matrix_partition_col_first(); + } else { + e_op.visited_flags = visited_flags.data(); + } update_frontier_v_push_if_out_nbr( handle, @@ -150,6 +207,12 @@ void bfs(raft::handle_t const& handle, std::vector{static_cast(Bucket::next)}, dummy_properties_t{}.device_view(), dummy_properties_t{}.device_view(), +#if 1 + e_op, +#else + // FIXME: need to test more about the performance trade-offs between additional + // communication in updating dst_visited_flags (+ using atomics) vs reduced number of pushes + // (leading to both less computation & communication in reduction) [vertex_partition, distances] __device__( vertex_t src, vertex_t dst, auto src_val, auto dst_val) { auto push = true; @@ -160,6 +223,7 @@ void bfs(raft::handle_t const& handle, } return push ? thrust::optional{src} : thrust::nullopt; }, +#endif reduce_op::any(), distances, thrust::make_zip_iterator(thrust::make_tuple(distances, predecessor_first)), From 1c03998c42c4e0167e2975627aeb3b60a28ce1a1 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Thu, 16 Dec 2021 15:11:32 -0800 Subject: [PATCH 05/11] pass rmm memory allocator to cuco --- cpp/src/structure/relabel_impl.cuh | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cpp/src/structure/relabel_impl.cuh b/cpp/src/structure/relabel_impl.cuh index e9d9220de81..a8da1798bb9 100644 --- a/cpp/src/structure/relabel_impl.cuh +++ b/cpp/src/structure/relabel_impl.cuh @@ -189,12 +189,17 @@ void relabel(raft::handle_t const& handle, relabel_map.find(labels, labels + num_labels, labels); } } else { - cuco::static_map relabel_map( - // cuco::static_map requires at least one empty slot - std::max(static_cast(static_cast(num_label_pairs) / load_factor), - static_cast(num_label_pairs) + 1), - invalid_vertex_id::value, - invalid_vertex_id::value); + handle.sync_stream(); // cuco::static_map currently does not take stream + + auto poly_alloc = rmm::mr::polymorphic_allocator(rmm::mr::get_current_device_resource()); + auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr}); + cuco::static_map + relabel_map{// cuco::static_map requires at least one empty slot + std::max(static_cast(static_cast(num_label_pairs) / load_factor), + static_cast(num_label_pairs) + 1), + invalid_vertex_id::value, + invalid_vertex_id::value, + stream_adapter}; auto pair_first = thrust::make_zip_iterator( thrust::make_tuple(std::get<0>(old_new_label_pairs), std::get<1>(old_new_label_pairs))); From aa012beebae3a58b5b050f2b4180f3300430f03b Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Thu, 16 Dec 2021 17:37:49 -0800 Subject: [PATCH 06/11] avoid unnecessary atomicAdd --- .../update_frontier_v_push_if_out_nbr.cuh | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git 
a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index 6b3d988e43d..339253d7c69 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -458,23 +458,25 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( e_op); } auto ballot = __ballot_sync(uint32_t{0xffffffff}, e_op_result ? uint32_t{1} : uint32_t{0}); - if (lane_id == 0) { - auto increment = __popc(ballot); - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - buffer_warp_start_indices[threadIdx.x / raft::warp_size()] = - atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(increment)); - } - __syncwarp(); - if (e_op_result) { - auto buffer_warp_offset = - static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); - push_buffer_element( - col, - e_op_result, - buffer_key_output_first, - buffer_payload_output_first, - buffer_warp_start_indices[threadIdx.x / raft::warp_size()] + buffer_warp_offset); + if (ballot > 0) { + if (lane_id == 0) { + auto increment = __popc(ballot); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[threadIdx.x / raft::warp_size()] = + atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment)); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element( + col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[threadIdx.x / raft::warp_size()] + buffer_warp_offset); + } } } @@ -567,9 +569,11 @@ __global__ void for_all_frontier_row_for_all_nbr_high_degree( if (threadIdx.x == (blockDim.x - 1)) { auto increment = buffer_block_offset + (e_op_result ? edge_t{1} : edge_t{0}); static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - buffer_block_start_idx = - atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(increment)); + buffer_block_start_idx = increment > 0 + ? static_cast(atomicAdd( + reinterpret_cast(buffer_idx_ptr), + static_cast(increment))) + : size_t{0} /* dummy */; } __syncthreads(); if (e_op_result) { From 90fd98095af996b4c62b1792412a2ad62585b86c Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Mon, 20 Dec 2021 10:53:21 -0800 Subject: [PATCH 07/11] add prev_visited_flags to reduce the number of atomic operations --- cpp/src/traversal/bfs_impl.cuh | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index 81a2ede23b1..a0396bdf3b3 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -50,6 +50,10 @@ struct e_op_t { std:: conditional_t, uint32_t*> visited_flags{nullptr}; + uint32_t const* prev_visited_flags{ + nullptr}; // relevant only if multi_gpu is false (this affects only local-computing with 0 + // impact in communication volume, so this may improve performance in small-scale but + // will eat-up more memory with no benefit in performance in large-scale). vertex_t dst_first{}; // relevant only if multi_gpu is true __device__ thrust::optional operator()(vertex_t src, @@ -64,8 +68,13 @@ struct e_op_t { ret = old == uint8_t{0} ? thrust::optional{src} : thrust::nullopt; } else { auto mask = uint32_t{1} << (dst % (sizeof(uint32_t) * 8)); - auto old = atomicOr(visited_flags + (dst / (sizeof(uint32_t) * 8)), mask); - ret = (old & mask) == 0 ? 
thrust::optional{src} : thrust::nullopt; + if (*(prev_visited_flags + (dst / (sizeof(uint32_t) * 8))) & + mask) { // check if unvisited in previous iterations + ret = thrust::nullopt; + } else { // check if unvisited in this iteration as well + auto old = atomicOr(visited_flags + (dst / (sizeof(uint32_t) * 8)), mask); + ret = (old & mask) == 0 ? thrust::optional{src} : thrust::nullopt; + } } return ret; } @@ -169,9 +178,14 @@ void bfs(raft::handle_t const& handle, (sizeof(uint32_t) * 8), handle.get_stream()); thrust::fill(handle.get_thrust_policy(), visited_flags.begin(), visited_flags.end(), uint32_t{0}); - auto dst_visited_flags = GraphViewType::is_multi_gpu - ? col_properties_t(handle, push_graph_view) - : col_properties_t(); + rmm::device_uvector prev_visited_flags( + GraphViewType::is_multi_gpu ? size_t{0} : visited_flags.size(), + handle.get_stream()); // relevant only if GraphViewType::is_multi_gpu is false + auto dst_visited_flags = + GraphViewType::is_multi_gpu + ? col_properties_t(handle, push_graph_view) + : col_properties_t(); // relevant only if GraphViewType::is_multi_gpu is true if constexpr (GraphViewType::is_multi_gpu) { dst_visited_flags.fill(uint8_t{0}, handle.get_stream()); } @@ -189,6 +203,11 @@ void bfs(raft::handle_t const& handle, vertex_frontier.get_bucket(static_cast(Bucket::cur)).end(), thrust::make_constant_iterator(uint8_t{1}), dst_visited_flags); + } else { + thrust::copy(handle.get_thrust_policy(), + visited_flags.begin(), + visited_flags.end(), + prev_visited_flags.begin()); } e_op_t e_op{}; @@ -196,7 +215,8 @@ void bfs(raft::handle_t const& handle, e_op.visited_flags = dst_visited_flags.mutable_device_view(); e_op.dst_first = push_graph_view.get_local_adj_matrix_partition_col_first(); } else { - e_op.visited_flags = visited_flags.data(); + e_op.visited_flags = visited_flags.data(); + e_op.prev_visited_flags = prev_visited_flags.data(); } update_frontier_v_push_if_out_nbr( From 45859bb2e42f3cd43f1348d6296a2e1ef1367ef6 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Tue, 21 Dec 2021 09:06:10 -0800 Subject: [PATCH 08/11] additional update_frontier_v_push_if_out_nbr performance improvements --- .../update_frontier_v_push_if_out_nbr.cuh | 382 +++++++++++++++--- 1 file changed, 316 insertions(+), 66 deletions(-) diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index 339253d7c69..d3ef125a6f3 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -209,11 +209,8 @@ __device__ void push_buffer_element(vertex_t col, } } -// FIXME: for for_all_frontier_row_for_all_nbr_(hypersparse & low_degree), it might be faster to -// process a fixed number of edges per thread (this will require binary_searches but this overhead -// may be lesser than thread divergence) as there is no need for per vertex reduction. Threads in a -// warp can collaboratively load potential offset values to the shared memory and run binary search -// over the values in shared memory. 
+// FIXME: test whether the first case is faster than the second case +#if 0 template ::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const warp_id = threadIdx.x / raft::warp_size(); + auto const lane_id = tid % raft::warp_size(); auto row_start_offset = static_cast(major_hypersparse_first - matrix_partition.get_major_first()); auto idx = static_cast(tid); - auto dcs_nzd_vertices = *(matrix_partition.get_dcs_nzd_vertices()); - auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count()); + __shared__ edge_t + warp_local_degree_inclusive_sums[update_frontier_v_push_if_out_nbr_for_all_block_size]; + __shared__ edge_t + warp_key_local_edge_offsets[update_frontier_v_push_if_out_nbr_for_all_block_size]; + + using WarpScan = cub::WarpScan; + __shared__ typename WarpScan::TempStorage temp_storage; + + __shared__ size_t buffer_warp_start_indices[update_frontier_v_push_if_out_nbr_for_all_block_size / + raft::warp_size()]; + + auto indices = matrix_partition.get_indices(); + auto weights = matrix_partition.get_weights(); + + vertex_t num_keys = static_cast(thrust::distance(key_first, key_last)); + auto rounded_up_num_keys = + ((static_cast(num_keys) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + while (idx < rounded_up_num_keys) { + auto min_key_idx = static_cast(idx - (idx % raft::warp_size())); // inclusive + auto max_key_idx = + static_cast(std::min(static_cast(min_key_idx) + raft::warp_size(), + static_cast(num_keys))); // exclusive + + // update warp_local_degree_inclusive_sums & warp_key_local_edge_offsets + + edge_t local_degree{0}; + if (lane_id < static_cast(max_key_idx - min_key_idx)) { + auto key = *(key_first + idx); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + auto row_hypersparse_idx = matrix_partition.get_major_hypersparse_idx_from_major_nocheck(row); + if (row_hypersparse_idx) { + auto row_idx = row_start_offset + *row_hypersparse_idx; + local_degree = matrix_partition.get_local_degree(row_idx); + warp_key_local_edge_offsets[threadIdx.x] = matrix_partition.get_local_offset(row_idx); + } else { + local_degree = edge_t{0}; + warp_key_local_edge_offsets[threadIdx.x] = edge_t{0}; // dummy + } + } + WarpScan(temp_storage) + .InclusiveSum(local_degree, warp_local_degree_inclusive_sums[threadIdx.x]); + __syncwarp(); + + // process local edges for the keys in [key_first + min_key_idx, key_first + max_key_idx) + + auto num_edges_this_warp = warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + (max_key_idx - min_key_idx) - 1]; + auto rounded_up_num_edges_this_warp = + ((static_cast(num_edges_this_warp) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + + for (size_t i = lane_id; i < rounded_up_num_edges_this_warp; i += raft::warp_size()) { + e_op_result_t e_op_result{}; + vertex_t col{}; + + if (i < static_cast(num_edges_this_warp)) { + auto key_idx_this_warp = static_cast(thrust::distance( + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + thrust::upper_bound(thrust::seq, + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + warp_local_degree_inclusive_sums + warp_id * raft::warp_size() + + 
(max_key_idx - min_key_idx), + i))); + auto local_edge_offset = + warp_key_local_edge_offsets[warp_id * raft::warp_size() + key_idx_this_warp] + + static_cast(i - + ((key_idx_this_warp == 0) + ? edge_t{0} + : warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + key_idx_this_warp - 1])); + auto key = *(key_first + (min_key_idx + key_idx_this_warp)); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + col = indices[local_edge_offset]; + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[local_edge_offset] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + auto ballot_e_op = + __ballot_sync(uint32_t{0xffffffff}, e_op_result ? uint32_t{1} : uint32_t{0}); + if (ballot_e_op) { + if (lane_id == 0) { + auto increment = __popc(ballot_e_op); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[warp_id] = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment))); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot_e_op & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[warp_id] + buffer_warp_offset); + } + } + } + idx += gridDim.x * blockDim.x; + } +} +#else +template +__global__ void for_all_frontier_row_for_all_nbr_hypersparse( + matrix_partition_device_view_t matrix_partition, + typename GraphViewType::vertex_type major_hypersparse_first, + KeyIterator key_first, + KeyIterator key_last, + AdjMatrixRowValueInputWrapper adj_matrix_row_value_input, + AdjMatrixColValueInputWrapper adj_matrix_col_value_input, + BufferKeyOutputIterator buffer_key_output_first, + BufferPayloadOutputIterator buffer_payload_output_first, + size_t* buffer_idx_ptr, + EdgeOp e_op) +{ + using vertex_t = typename GraphViewType::vertex_type; + using edge_t = typename GraphViewType::edge_type; + using weight_t = typename GraphViewType::weight_type; + using key_t = typename std::iterator_traits::value_type; + static_assert( + std::is_same_v::value_type>); + using payload_t = + typename optional_payload_buffer_value_type_t::value; + + static_assert(!GraphViewType::is_adj_matrix_transposed, + "GraphViewType should support the push model."); + + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto row_start_offset = + static_cast(major_hypersparse_first - matrix_partition.get_major_first()); + auto idx = static_cast(tid); while (idx < static_cast(thrust::distance(key_first, key_last))) { auto key = *(key_first + idx); @@ -288,8 +452,9 @@ __global__ void for_all_frontier_row_for_all_nbr_hypersparse( e_op); if (e_op_result) { static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(1)); + auto buffer_idx = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(1))); push_buffer_element( col, e_op_result, buffer_key_output_first, buffer_payload_output_first, buffer_idx); } @@ -298,12 +463,8 @@ __global__ void for_all_frontier_row_for_all_nbr_hypersparse( idx += gridDim.x * blockDim.x; } } +#endif -// FIXME: for for_all_frontier_row_for_all_nbr_(hypersparse & low_degree), it 
might be faster to -// process a fixed number of edges per thread (this will require binary_searches but this overhead -// may be lesser than thread divergence) as there is no need for per vertex reduction. Threads in a -// warp can collaboratively load potential offset values to the shared memory and run binary search -// over the values in shared memory. template ::value_type>); using payload_t = typename optional_payload_buffer_value_type_t::value; + using e_op_result_t = typename evaluate_edge_op::result_type; static_assert(!GraphViewType::is_adj_matrix_transposed, "GraphViewType should support the push model."); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto idx = static_cast(tid); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const warp_id = threadIdx.x / raft::warp_size(); + auto const lane_id = tid % raft::warp_size(); + auto idx = static_cast(tid); - while (idx < static_cast(thrust::distance(key_first, key_last))) { - auto key = *(key_first + idx); - vertex_t row{}; - if constexpr (std::is_same_v) { - row = key; - } else { - row = thrust::get<0>(key); + __shared__ edge_t + warp_local_degree_inclusive_sums[update_frontier_v_push_if_out_nbr_for_all_block_size]; + __shared__ edge_t + warp_key_local_edge_offsets[update_frontier_v_push_if_out_nbr_for_all_block_size]; + + using WarpScan = cub::WarpScan; + __shared__ typename WarpScan::TempStorage temp_storage; + + __shared__ size_t buffer_warp_start_indices[update_frontier_v_push_if_out_nbr_for_all_block_size / + raft::warp_size()]; + + auto indices = matrix_partition.get_indices(); + auto weights = matrix_partition.get_weights(); + + vertex_t num_keys = static_cast(thrust::distance(key_first, key_last)); + auto rounded_up_num_keys = + ((static_cast(num_keys) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + while (idx < rounded_up_num_keys) { + auto min_key_idx = static_cast(idx - (idx % raft::warp_size())); // inclusive + auto max_key_idx = + static_cast(std::min(static_cast(min_key_idx) + raft::warp_size(), + static_cast(num_keys))); // exclusive + + // update warp_local_degree_inclusive_sums & warp_key_local_edge_offsets + + edge_t local_degree{0}; + if (lane_id < static_cast(max_key_idx - min_key_idx)) { + auto key = *(key_first + idx); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + local_degree = matrix_partition.get_local_degree(row_offset); + warp_key_local_edge_offsets[threadIdx.x] = matrix_partition.get_local_offset(row_offset); } - auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); - vertex_t const* indices{nullptr}; - thrust::optional weights{thrust::nullopt}; - edge_t local_out_degree{}; - thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); - for (edge_t i = 0; i < local_out_degree; ++i) { - auto col = indices[i]; - auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); - auto e_op_result = evaluate_edge_op() - .compute(key, - col, - weights ? 
(*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input.get(row_offset), - adj_matrix_col_value_input.get(col_offset), - e_op); - if (e_op_result) { - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - auto buffer_idx = atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(1)); - push_buffer_element( - col, e_op_result, buffer_key_output_first, buffer_payload_output_first, buffer_idx); + WarpScan(temp_storage) + .InclusiveSum(local_degree, warp_local_degree_inclusive_sums[threadIdx.x]); + __syncwarp(); + + // processes local edges for the keys in [key_first + min_key_idx, key_first + max_key_idx) + + auto num_edges_this_warp = warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + (max_key_idx - min_key_idx) - 1]; + auto rounded_up_num_edges_this_warp = + ((static_cast(num_edges_this_warp) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + for (size_t i = lane_id; i < rounded_up_num_edges_this_warp; i += raft::warp_size()) { + e_op_result_t e_op_result{}; + vertex_t col{}; + + if (i < static_cast(num_edges_this_warp)) { + auto key_idx_this_warp = static_cast(thrust::distance( + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + thrust::upper_bound(thrust::seq, + warp_local_degree_inclusive_sums + warp_id * raft::warp_size(), + warp_local_degree_inclusive_sums + warp_id * raft::warp_size() + + (max_key_idx - min_key_idx), + i))); + auto local_edge_offset = + warp_key_local_edge_offsets[warp_id * raft::warp_size() + key_idx_this_warp] + + static_cast(i - + ((key_idx_this_warp == 0) + ? edge_t{0} + : warp_local_degree_inclusive_sums[warp_id * raft::warp_size() + + key_idx_this_warp - 1])); + auto key = *(key_first + (min_key_idx + key_idx_this_warp)); + vertex_t row{}; + if constexpr (std::is_same_v) { + row = key; + } else { + row = thrust::get<0>(key); + } + col = indices[local_edge_offset]; + auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); + auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); + e_op_result = evaluate_edge_op() + .compute(key, + col, + weights ? (*weights)[local_edge_offset] : weight_t{1.0}, + adj_matrix_row_value_input.get(row_offset), + adj_matrix_col_value_input.get(col_offset), + e_op); + } + auto ballot = __ballot_sync(uint32_t{0xffffffff}, e_op_result ? 
uint32_t{1} : uint32_t{0}); + if (ballot > 0) { + if (lane_id == 0) { + auto increment = __popc(ballot); + static_assert(sizeof(unsigned long long int) == sizeof(size_t)); + buffer_warp_start_indices[warp_id] = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment))); + } + __syncwarp(); + if (e_op_result) { + auto buffer_warp_offset = + static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[warp_id] + buffer_warp_offset); + } } } + idx += gridDim.x * blockDim.x; } } @@ -419,6 +665,7 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( auto const tid = threadIdx.x + blockIdx.x * blockDim.x; static_assert(update_frontier_v_push_if_out_nbr_for_all_block_size % raft::warp_size() == 0); + auto const warp_id = threadIdx.x / raft::warp_size(); auto const lane_id = tid % raft::warp_size(); auto idx = static_cast(tid / raft::warp_size()); @@ -438,12 +685,14 @@ __global__ void for_all_frontier_row_for_all_nbr_mid_degree( edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); auto rounded_up_local_out_degree = - ((local_out_degree + (raft::warp_size() - 1)) / raft::warp_size()) * raft::warp_size(); - for (edge_t i = lane_id; i < rounded_up_local_out_degree; i += raft::warp_size()) { + ((static_cast(local_out_degree) + (raft::warp_size() - 1)) / raft::warp_size()) * + raft::warp_size(); + for (size_t i = lane_id; i < rounded_up_local_out_degree; i += raft::warp_size()) { e_op_result_t e_op_result{}; + vertex_t col{}; - auto col = indices[i]; - if (i < local_out_degree) { + if (i < static_cast(local_out_degree)) { + col = indices[i]; auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); e_op_result = evaluate_edge_op(buffer_idx_ptr), - static_cast(increment)); + buffer_warp_start_indices[warp_id] = + static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), + static_cast(increment))); } __syncwarp(); if (e_op_result) { auto buffer_warp_offset = static_cast(__popc(ballot & ~(uint32_t{0xffffffff} << lane_id))); - push_buffer_element( - col, - e_op_result, - buffer_key_output_first, - buffer_payload_output_first, - buffer_warp_start_indices[threadIdx.x / raft::warp_size()] + buffer_warp_offset); + push_buffer_element(col, + e_op_result, + buffer_key_output_first, + buffer_payload_output_first, + buffer_warp_start_indices[warp_id] + buffer_warp_offset); } } } @@ -542,15 +790,17 @@ __global__ void for_all_frontier_row_for_all_nbr_high_degree( edge_t local_out_degree{}; thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_offset); auto rounded_up_local_out_degree = - ((local_out_degree + (update_frontier_v_push_if_out_nbr_for_all_block_size - 1)) / + ((static_cast(local_out_degree) + + (update_frontier_v_push_if_out_nbr_for_all_block_size - 1)) / update_frontier_v_push_if_out_nbr_for_all_block_size) * update_frontier_v_push_if_out_nbr_for_all_block_size; - for (edge_t i = threadIdx.x; i < rounded_up_local_out_degree; i += blockDim.x) { + for (size_t i = threadIdx.x; i < rounded_up_local_out_degree; i += blockDim.x) { e_op_result_t e_op_result{}; + vertex_t col{}; edge_t buffer_block_offset{0}; - auto col = indices[i]; - if (i < local_out_degree) { + if (i < static_cast(local_out_degree)) { + col = indices[i]; auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); e_op_result = 
evaluate_edge_op Date: Tue, 21 Dec 2021 12:13:21 -0800 Subject: [PATCH 09/11] update hypersparse case of update_frontier_v_push_if_out_nbr --- .../update_frontier_v_push_if_out_nbr.cuh | 86 ------------------- 1 file changed, 86 deletions(-) diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index d3ef125a6f3..42a3957a44a 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -209,8 +209,6 @@ __device__ void push_buffer_element(vertex_t col, } } -// FIXME: test whether the first case is faster than the second case -#if 0 template -__global__ void for_all_frontier_row_for_all_nbr_hypersparse( - matrix_partition_device_view_t matrix_partition, - typename GraphViewType::vertex_type major_hypersparse_first, - KeyIterator key_first, - KeyIterator key_last, - AdjMatrixRowValueInputWrapper adj_matrix_row_value_input, - AdjMatrixColValueInputWrapper adj_matrix_col_value_input, - BufferKeyOutputIterator buffer_key_output_first, - BufferPayloadOutputIterator buffer_payload_output_first, - size_t* buffer_idx_ptr, - EdgeOp e_op) -{ - using vertex_t = typename GraphViewType::vertex_type; - using edge_t = typename GraphViewType::edge_type; - using weight_t = typename GraphViewType::weight_type; - using key_t = typename std::iterator_traits::value_type; - static_assert( - std::is_same_v::value_type>); - using payload_t = - typename optional_payload_buffer_value_type_t::value; - - static_assert(!GraphViewType::is_adj_matrix_transposed, - "GraphViewType should support the push model."); - - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto row_start_offset = - static_cast(major_hypersparse_first - matrix_partition.get_major_first()); - auto idx = static_cast(tid); - - while (idx < static_cast(thrust::distance(key_first, key_last))) { - auto key = *(key_first + idx); - vertex_t row{}; - if constexpr (std::is_same_v) { - row = key; - } else { - row = thrust::get<0>(key); - } - auto row_hypersparse_idx = matrix_partition.get_major_hypersparse_idx_from_major_nocheck(row); - if (row_hypersparse_idx) { - auto row_offset = matrix_partition.get_major_offset_from_major_nocheck(row); - auto row_idx = row_start_offset + *row_hypersparse_idx; - vertex_t const* indices{nullptr}; - thrust::optional weights{thrust::nullopt}; - edge_t local_out_degree{}; - thrust::tie(indices, weights, local_out_degree) = matrix_partition.get_local_edges(row_idx); - for (edge_t i = 0; i < local_out_degree; ++i) { - auto col = indices[i]; - auto col_offset = matrix_partition.get_minor_offset_from_minor_nocheck(col); - auto e_op_result = evaluate_edge_op() - .compute(key, - col, - weights ? 
(*weights)[i] : weight_t{1.0}, - adj_matrix_row_value_input.get(row_offset), - adj_matrix_col_value_input.get(col_offset), - e_op); - if (e_op_result) { - static_assert(sizeof(unsigned long long int) == sizeof(size_t)); - auto buffer_idx = - static_cast(atomicAdd(reinterpret_cast(buffer_idx_ptr), - static_cast(1))); - push_buffer_element( - col, e_op_result, buffer_key_output_first, buffer_payload_output_first, buffer_idx); - } - } - } - idx += gridDim.x * blockDim.x; - } -} -#endif template Date: Mon, 3 Jan 2022 09:54:25 -0800 Subject: [PATCH 10/11] test with unweighted graphs for BFS --- cpp/tests/traversal/bfs_test.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/tests/traversal/bfs_test.cpp b/cpp/tests/traversal/bfs_test.cpp index e15fc0d735b..568b2f24e68 100644 --- a/cpp/tests/traversal/bfs_test.cpp +++ b/cpp/tests/traversal/bfs_test.cpp @@ -110,7 +110,7 @@ class Tests_BFS : public ::testing::TestWithParam( - handle, input_usecase, true, renumber); + handle, input_usecase, false, renumber); if (cugraph::test::g_perf) { CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement @@ -157,7 +157,7 @@ class Tests_BFS : public ::testing::TestWithParam( - handle, input_usecase, true, false); + handle, input_usecase, false, false); } auto unrenumbered_graph_view = renumber ? unrenumbered_graph.view() : graph_view; From cdf56d102a1596911b20356a8af9d75354520dbf Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Tue, 4 Jan 2022 10:39:30 -0800 Subject: [PATCH 11/11] update copyright year --- cpp/include/cugraph/prims/property_op_utils.cuh | 2 +- cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh | 2 +- cpp/src/traversal/bfs_impl.cuh | 2 +- cpp/tests/traversal/bfs_test.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index 44efffcc5e0..1168617ae63 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index cc63c52949f..a07d5e3e054 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index a0396bdf3b3..3f46bd7724c 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
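
A standalone sketch (not part of the patch series) of the single-GPU BFS idea from patches 04 and 07 above: each destination vertex owns one bit in a uint32_t bitmap, a read-only snapshot taken at the start of the iteration (prev_visited_flags) is consulted first, and only bits that were still clear there fall through to an atomicOr on the live bitmap, so vertices discovered in earlier iterations cost no atomic at all. The helper name try_visit and the plain int32_t vertex id below are assumptions made for the example, not names from the patch.

#include <cstdint>

// Returns true only for the first thread to claim vertex `dst` in this iteration.
__device__ bool try_visit(uint32_t* visited_flags,             // live bitmap, updated atomically
                          uint32_t const* prev_visited_flags,  // snapshot from the previous iteration
                          int32_t dst)
{
  uint32_t mask = uint32_t{1} << (dst % 32);
  int32_t word  = dst / 32;
  if (prev_visited_flags[word] & mask) { return false; }  // visited earlier: no atomic needed
  uint32_t old = atomicOr(visited_flags + word, mask);    // claim the bit in the current bitmap
  return (old & mask) == 0;
}
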
diff --git a/cpp/tests/traversal/bfs_test.cpp b/cpp/tests/traversal/bfs_test.cpp index 568b2f24e68..8edaefab8df 100644 --- a/cpp/tests/traversal/bfs_test.cpp +++ b/cpp/tests/traversal/bfs_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
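
The common thread of patches 01, 06, and 08 is warp-aggregated output allocation: instead of one atomicAdd on the global buffer counter per pushed element, every lane votes with __ballot_sync, lane 0 issues a single atomicAdd for the whole warp, and each producing lane derives its slot from the ballot with __popc (the high-degree kernel applies the same idea per block via cub::BlockScan). Below is a self-contained toy kernel showing the pattern; the filter_copy name, the even-number predicate standing in for e_op, the hard-coded warp size of 32, and the shuffle broadcast (the patch broadcasts the warp start index through shared memory instead) are all assumptions made for this sketch.

// filter_copy: compact the even inputs into `output`; `counter` is the global output
// size, incremented once per warp rather than once per kept element.
__global__ void filter_copy(int const* input, int n, int* output, unsigned long long* counter)
{
  int i       = blockIdx.x * blockDim.x + threadIdx.x;
  int lane_id = threadIdx.x % 32;

  bool keep = (i < n) && ((input[i] % 2) == 0);  // stand-in for the e_op predicate

  auto ballot = __ballot_sync(0xffffffffu, keep ? 1 : 0);  // one bit per producing lane
  if (ballot != 0u) {
    unsigned long long warp_start{};
    if (lane_id == 0) {  // one atomicAdd per warp
      warp_start = atomicAdd(counter, static_cast<unsigned long long>(__popc(ballot)));
    }
    warp_start = __shfl_sync(0xffffffffu, warp_start, 0);  // broadcast the warp's base index
    if (keep) {
      auto offset = __popc(ballot & ~(0xffffffffu << lane_id));  // my rank among producing lanes
      output[warp_start + offset] = input[i];
    }
  }
}

Compared with a per-element atomicAdd, this cuts the number of atomic operations on the counter by up to the warp size (32). The mid-degree kernel in the patch uses exactly this per-warp form, while the reworked hypersparse/low-degree path in patch 08 additionally builds a per-warp inclusive prefix sum of local degrees in shared memory and binary-searches it, so that lanes are assigned edges rather than frontier vertices.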