diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index ed7005cc769..1de17223699 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -230,7 +230,6 @@ add_library(cugraph SHARED
     src/structure/create_graph_from_edgelist_mg.cu
     src/structure/symmetrize_edgelist_sg.cu
     src/structure/symmetrize_edgelist_mg.cu
-    src/utilities/host_barrier.cpp
     src/visitors/graph_envelope.cpp
     src/visitors/visitors_factory.cpp
     src/visitors/bfs_visitor.cpp
diff --git a/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh b/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh
index 508294c9e89..1d1b3810a53 100644
--- a/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh
+++ b/cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -170,7 +170,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
         matrix_partition_device_view_t<vertex_t, edge_t, weight_t, GraphViewType::is_multi_gpu>(
           graph_view.get_matrix_partition_view(i));
 
-      if (col_comm_rank == i) {
+      if (i == col_comm_rank) {
         auto vertex_partition =
           vertex_partition_device_view_t<vertex_t, GraphViewType::is_multi_gpu>(
             graph_view.get_vertex_partition_view());
@@ -365,7 +365,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
       matrix_partition_device_view_t<vertex_t, edge_t, weight_t, GraphViewType::is_multi_gpu>(
         graph_view.get_matrix_partition_view(size_t{0}));
     for (int i = 0; i < row_comm_size; ++i) {
-      if (row_comm_rank == i) {
+      if (i == row_comm_rank) {
         auto vertex_partition =
           vertex_partition_device_view_t<vertex_t, GraphViewType::is_multi_gpu>(
             graph_view.get_vertex_partition_view());
diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh
index 5fb3b6544f7..c4b1b7fd8c2 100644
--- a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh
+++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh
@@ -35,6 +35,7 @@
 #include
 #include
+#include
 #include
 #include
 
@@ -476,9 +477,11 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
   VertexValueOutputIterator vertex_value_output_first)
 {
   constexpr auto update_major = (in == GraphViewType::is_adj_matrix_transposed);
-  using vertex_t = typename GraphViewType::vertex_type;
-  using edge_t   = typename GraphViewType::edge_type;
-  using weight_t = typename GraphViewType::weight_type;
+  [[maybe_unused]] constexpr auto max_segments =
+    detail::num_sparse_segments_per_vertex_partition + size_t{1};
+  using vertex_t = typename GraphViewType::vertex_type;
+  using edge_t   = typename GraphViewType::edge_type;
+  using weight_t = typename GraphViewType::weight_type;
 
   static_assert(is_arithmetic_or_thrust_tuple_of_arithmetic<T>::value);
@@ -513,23 +516,98 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
     }
   }
 
+  std::optional<std::vector<size_t>> stream_pool_indices{std::nullopt};
+  if constexpr (GraphViewType::is_multi_gpu) {
+    if ((graph_view.get_local_adj_matrix_partition_segment_offsets(0)) &&
+        (handle.get_stream_pool_size() >= max_segments)) {
+      for (size_t i = 1; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) {
+        assert(graph_view.get_local_adj_matrix_partition_segment_offsets(i));
+      }
+
+      auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
+      auto const col_comm_size = col_comm.get_size();
+
+      // memory footprint vs parallelism trade-off
+      // peak memory requirement per loop is
+      // update_major ? V / comm_size * sizeof(T) : 0
+      // and limit memory requirement to (E / comm_size) * sizeof(vertex_t)
+
+      size_t num_streams = std::min(static_cast<size_t>(col_comm_size) * max_segments,
+                                    (handle.get_stream_pool_size() / max_segments) * max_segments);
+      if constexpr (update_major) {
+        size_t value_size{0};
+        if constexpr (is_thrust_tuple_of_arithmetic<T>::value) {
+          auto elem_sizes = compute_thrust_tuple_element_sizes<T>{}();
+          value_size      = std::reduce(elem_sizes.begin(), elem_sizes.end());
+        } else {
+          value_size = sizeof(T);
+        }
+
+        auto avg_vertex_degree = graph_view.get_number_of_vertices() > 0
+                                   ? (static_cast<double>(graph_view.get_number_of_edges()) /
+                                      static_cast<double>(graph_view.get_number_of_vertices()))
+                                   : double{0.0};
+
+        num_streams =
+          std::min(static_cast<size_t>(avg_vertex_degree * (static_cast<double>(sizeof(vertex_t)) /
+                                                            static_cast<double>(value_size))) *
+                     max_segments,
+                   num_streams);
+      }
+
+      if (num_streams >= max_segments) {
+        stream_pool_indices = std::vector<size_t>(num_streams);
+        std::iota((*stream_pool_indices).begin(), (*stream_pool_indices).end(), size_t{0});
+        handle.sync_stream();
+      }
+    }
+  }
+
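Note on the sizing heuristic above: `num_streams` is first rounded down to a multiple of `max_segments` so that every concurrent loop iteration owns one stream per degree segment, then (for `update_major`) scaled by how much intermediate-value memory an average vertex costs relative to `sizeof(vertex_t)`. For example, with `col_comm_size = 4`, `max_segments = 4`, and a 64-stream pool, `num_streams = min(4 * 4, 64) = 16`, i.e. four matrix partitions can be in flight at once. A minimal sketch of the round-robin stream selection this enables (the helper name is hypothetical, not part of the patch):

```cpp
// Sketch: map (loop index i, segment s) to a pool stream index, assuming
// num_streams is a multiple of max_segments as guaranteed by the sizing logic.
#include <cstddef>

std::size_t select_stream_index(std::size_t i,             // matrix partition (loop) index
                                std::size_t s,             // segment index in [0, max_segments)
                                std::size_t max_segments,
                                std::size_t num_streams)
{
  return (i * max_segments + s) % num_streams;  // same formula the launches below use
}
```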
+  std::vector<decltype(allocate_dataframe_buffer<T>(0, rmm::cuda_stream_view{}))>
+    major_tmp_buffers{};
+  if constexpr (GraphViewType::is_multi_gpu && update_major) {
+    std::vector<size_t> major_tmp_buffer_sizes(
+      graph_view.get_number_of_local_adj_matrix_partitions(), size_t{0});
+    for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) {
+      major_tmp_buffer_sizes[i] = GraphViewType::is_adj_matrix_transposed
+                                    ? graph_view.get_number_of_local_adj_matrix_partition_cols(i)
+                                    : graph_view.get_number_of_local_adj_matrix_partition_rows(i);
+    }
+    if (stream_pool_indices) {
+      auto num_concurrent_loops = (*stream_pool_indices).size() / max_segments;
+      major_tmp_buffers.reserve(num_concurrent_loops);
+      for (size_t i = 0; i < num_concurrent_loops; ++i) {
+        size_t max_size{0};
+        for (size_t j = i; j < graph_view.get_number_of_local_adj_matrix_partitions();
+             j += num_concurrent_loops) {
+          max_size = std::max(major_tmp_buffer_sizes[j], max_size);
+        }
+        major_tmp_buffers.push_back(allocate_dataframe_buffer<T>(max_size, handle.get_stream()));
+      }
+    } else {
+      major_tmp_buffers.reserve(1);
+      major_tmp_buffers.push_back(allocate_dataframe_buffer<T>(
+        *std::max_element(major_tmp_buffer_sizes.begin(), major_tmp_buffer_sizes.end()),
+        handle.get_stream()));
+    }
+  } else {  // dummy
+    major_tmp_buffers.reserve(1);
+    major_tmp_buffers.push_back(allocate_dataframe_buffer<T>(size_t{0}, handle.get_stream()));
+  }
+
+  if (stream_pool_indices) { handle.sync_stream(); }
+
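The temporary majors buffers are deliberately shared across loop iterations rather than allocated per partition: iteration `i` grabs `major_tmp_buffers[i % major_tmp_buffers.size()]`, so each buffer must be sized for the largest partition it will ever serve. A sketch of that strided sizing rule (hypothetical helper, same logic as the nested loop above; with 8 partitions and `num_concurrent_loops = 4`, buffer 1 is reused by partitions 1 and 5):

```cpp
// Sketch: buffer j serves partitions j, j + num_concurrent_loops, j + 2 * ...,
// so it is sized to the maximum over that stride.
#include <algorithm>
#include <cstddef>
#include <vector>

std::size_t buffer_size_for(std::vector<std::size_t> const& partition_sizes,
                            std::size_t j,
                            std::size_t num_concurrent_loops)
{
  std::size_t max_size{0};
  for (std::size_t p = j; p < partition_sizes.size(); p += num_concurrent_loops) {
    max_size = std::max(partition_sizes[p], max_size);
  }
  return max_size;
}
```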
   for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) {
     auto matrix_partition =
       matrix_partition_device_view_t<vertex_t, edge_t, weight_t, GraphViewType::is_multi_gpu>(
         graph_view.get_matrix_partition_view(i));
 
-    auto major_tmp_buffer_size =
-      GraphViewType::is_multi_gpu && update_major ? matrix_partition.get_major_size() : vertex_t{0};
-    auto major_tmp_buffer =
-      allocate_dataframe_buffer<T>(major_tmp_buffer_size, handle.get_stream());
-    auto major_buffer_first = get_dataframe_buffer_begin(major_tmp_buffer);
-
     auto major_init = T{};
     if constexpr (update_major) {
       if constexpr (GraphViewType::is_multi_gpu) {
         auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
         auto const col_comm_rank = col_comm.get_rank();
-        major_init = (col_comm_rank == 0) ? init : T{};
+        major_init = (static_cast<int>(i) == col_comm_rank) ? init : T{};
       } else {
         major_init = init;
       }
@@ -543,6 +621,9 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
       matrix_partition_row_value_input.set_local_adj_matrix_partition_idx(i);
     }
 
+    auto major_buffer_first =
+      get_dataframe_buffer_begin(major_tmp_buffers[i % major_tmp_buffers.size()]);
+
     std::conditional_t<GraphViewType::is_multi_gpu && update_major,
                        decltype(major_buffer_first),
                        VertexValueOutputIterator>
@@ … @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
     if (segment_offsets) {
       static_assert(detail::num_sparse_segments_per_vertex_partition == 3);
-      if ((*segment_offsets)[1] > 0) {
-        raft::grid_1d_block_t update_grid((*segment_offsets)[1],
-                                          detail::copy_v_transform_reduce_nbr_for_all_block_size,
-                                          handle.get_device_properties().maxGridSize[0]);
-        detail::for_all_major_for_all_nbr_high_degree<update_major>
-          <<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
+
+      // FIXME: we may further improve performance by 1) individually tuning block sizes for
+      // different segments; and 2) adding one more segment for very high degree vertices and
+      // running segmented reduction
+      if (matrix_partition.get_dcs_nzd_vertex_count()) {
+        auto exec_stream =
+          stream_pool_indices
+            ? handle.get_stream_from_stream_pool((i * max_segments) % (*stream_pool_indices).size())
+            : handle.get_stream();
+        if constexpr (update_major) {  // this is necessary as we don't visit every vertex in the
+                                       // hypersparse segment in
+                                       // for_all_major_for_all_nbr_hypersparse
+          thrust::fill(rmm::exec_policy(exec_stream),
+                       output_buffer + (*segment_offsets)[3],
+                       output_buffer + (*segment_offsets)[4],
+                       major_init);
+        }
+        if (*(matrix_partition.get_dcs_nzd_vertex_count()) > 0) {
+          raft::grid_1d_thread_t update_grid(*(matrix_partition.get_dcs_nzd_vertex_count()),
+                                             detail::copy_v_transform_reduce_nbr_for_all_block_size,
+                                             handle.get_device_properties().maxGridSize[0]);
+          auto segment_output_buffer = output_buffer;
+          if constexpr (update_major) { segment_output_buffer += (*segment_offsets)[3]; }
+          detail::for_all_major_for_all_nbr_hypersparse<update_major>
+            <<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
+              matrix_partition,
+              matrix_partition.get_major_first() + (*segment_offsets)[3],
+              matrix_partition_row_value_input,
+              matrix_partition_col_value_input,
+              segment_output_buffer,
+              e_op,
+              major_init);
+        }
+      }
+      if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) {
+        auto exec_stream = stream_pool_indices
+                             ? handle.get_stream_from_stream_pool((i * max_segments + 1) %
+                                                                  (*stream_pool_indices).size())
+                             : handle.get_stream();
+        raft::grid_1d_thread_t update_grid((*segment_offsets)[3] - (*segment_offsets)[2],
+                                           detail::copy_v_transform_reduce_nbr_for_all_block_size,
+                                           handle.get_device_properties().maxGridSize[0]);
+        auto segment_output_buffer = output_buffer;
+        if constexpr (update_major) { segment_output_buffer += (*segment_offsets)[2]; }
+        detail::for_all_major_for_all_nbr_low_degree<update_major>
+          <<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
             matrix_partition,
-            matrix_partition.get_major_first(),
-            matrix_partition.get_major_first() + (*segment_offsets)[1],
+            matrix_partition.get_major_first() + (*segment_offsets)[2],
+            matrix_partition.get_major_first() + (*segment_offsets)[3],
             matrix_partition_row_value_input,
             matrix_partition_col_value_input,
-            output_buffer,
+            segment_output_buffer,
             e_op,
             major_init);
       }
       if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) {
+        auto exec_stream = stream_pool_indices
+                             ? handle.get_stream_from_stream_pool((i * max_segments + 2) %
+                                                                  (*stream_pool_indices).size())
+                             : handle.get_stream();
         raft::grid_1d_warp_t update_grid((*segment_offsets)[2] - (*segment_offsets)[1],
                                          detail::copy_v_transform_reduce_nbr_for_all_block_size,
                                          handle.get_device_properties().maxGridSize[0]);
         auto segment_output_buffer = output_buffer;
         if constexpr (update_major) { segment_output_buffer += (*segment_offsets)[1]; }
         detail::for_all_major_for_all_nbr_mid_degree<update_major>
-          <<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
+          <<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
             matrix_partition,
             matrix_partition.get_major_first() + (*segment_offsets)[1],
             matrix_partition.get_major_first() + (*segment_offsets)[2],
@@ -596,49 +719,25 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
             e_op,
             major_init);
       }
-      if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) {
-        raft::grid_1d_thread_t update_grid((*segment_offsets)[3] - (*segment_offsets)[2],
-                                           detail::copy_v_transform_reduce_nbr_for_all_block_size,
-                                           handle.get_device_properties().maxGridSize[0]);
-        auto segment_output_buffer = output_buffer;
-        if constexpr (update_major) { segment_output_buffer += (*segment_offsets)[2]; }
-        detail::for_all_major_for_all_nbr_low_degree<update_major>
-          <<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
+      if ((*segment_offsets)[1] > 0) {
+        auto exec_stream = stream_pool_indices
+                             ? handle.get_stream_from_stream_pool((i * max_segments + 3) %
+                                                                  (*stream_pool_indices).size())
+                             : handle.get_stream();
+        raft::grid_1d_block_t update_grid((*segment_offsets)[1],
+                                          detail::copy_v_transform_reduce_nbr_for_all_block_size,
+                                          handle.get_device_properties().maxGridSize[0]);
+        detail::for_all_major_for_all_nbr_high_degree<update_major>
+          <<<update_grid.num_blocks, update_grid.block_size, 0, exec_stream>>>(
             matrix_partition,
-            matrix_partition.get_major_first() + (*segment_offsets)[2],
-            matrix_partition.get_major_first() + (*segment_offsets)[3],
+            matrix_partition.get_major_first(),
+            matrix_partition.get_major_first() + (*segment_offsets)[1],
             matrix_partition_row_value_input,
             matrix_partition_col_value_input,
-            segment_output_buffer,
+            output_buffer,
             e_op,
             major_init);
       }
-      if (matrix_partition.get_dcs_nzd_vertex_count()) {
-        if constexpr (update_major) {  // this is necessary as we don't visit every vertex in the
-                                       // hypersparse segment in
-                                       // for_all_major_for_all_nbr_hypersparse
-          thrust::fill(handle.get_thrust_policy(),
-                       output_buffer + (*segment_offsets)[3],
-                       output_buffer + (*segment_offsets)[4],
-                       major_init);
-        }
-        if (*(matrix_partition.get_dcs_nzd_vertex_count()) > 0) {
-          raft::grid_1d_thread_t update_grid(*(matrix_partition.get_dcs_nzd_vertex_count()),
-                                             detail::copy_v_transform_reduce_nbr_for_all_block_size,
-                                             handle.get_device_properties().maxGridSize[0]);
-          auto segment_output_buffer = output_buffer;
-          if constexpr (update_major) { segment_output_buffer += (*segment_offsets)[3]; }
-          detail::for_all_major_for_all_nbr_hypersparse<update_major>
-            <<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
-              matrix_partition,
-              matrix_partition.get_major_first() + (*segment_offsets)[3],
-              matrix_partition_row_value_input,
-              matrix_partition_col_value_input,
-              segment_output_buffer,
-              e_op,
-              major_init);
-        }
-      }
     } else {
       if (matrix_partition.get_major_size() > 0) {
         raft::grid_1d_thread_t update_grid(matrix_partition.get_major_size(),
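The four launches above map the degree segments of partition `i` to pool streams `(i * max_segments + s) % pool_streams_in_use`, with `s` = 0 for hypersparse, 1 for low, 2 for mid, and 3 for high degree, so all segments of a partition, and segments of up to `num_streams / max_segments` partitions, can execute concurrently. A condensed sketch of that dispatch shape (`launch_segment_kernel` is a hypothetical stand-in for the four kernel launches):

```cpp
// Sketch: segment s of partition i runs on its own pool stream when a pool is
// available, otherwise everything falls back to the handle's main stream.
#include <raft/handle.hpp>

#include <cstddef>
#include <optional>
#include <vector>

template <typename LaunchFn>
void dispatch_segments(raft::handle_t const& handle,
                       std::optional<std::vector<std::size_t>> const& stream_pool_indices,
                       std::size_t i,
                       std::size_t max_segments,
                       LaunchFn launch_segment_kernel)
{
  for (std::size_t s = 0; s < max_segments; ++s) {
    auto exec_stream = stream_pool_indices
                         ? handle.get_stream_from_stream_pool(
                             (i * max_segments + s) % (*stream_pool_indices).size())
                         : handle.get_stream();
    launch_segment_kernel(s, exec_stream);  // 0: hypersparse, 1: low, 2: mid, 3: high
  }
}
```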
@@ -666,16 +765,61 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
       auto const col_comm_rank = col_comm.get_rank();
       auto const col_comm_size = col_comm.get_size();
 
-      device_reduce(col_comm,
-                    major_buffer_first,
-                    vertex_value_output_first,
-                    matrix_partition.get_major_size(),
-                    raft::comms::op_t::SUM,
-                    i,
-                    handle.get_stream());
+      if (segment_offsets && stream_pool_indices) {
+        if ((*segment_offsets).back() - (*segment_offsets)[3] > 0) {
+          device_reduce(
+            col_comm,
+            major_buffer_first + (*segment_offsets)[3],
+            vertex_value_output_first + (*segment_offsets)[3],
+            (*segment_offsets).back() - (*segment_offsets)[3],
+            raft::comms::op_t::SUM,
+            i,
+            handle.get_stream_from_stream_pool((i * max_segments) % (*stream_pool_indices).size()));
+        }
+        if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) {
+          device_reduce(col_comm,
+                        major_buffer_first + (*segment_offsets)[2],
+                        vertex_value_output_first + (*segment_offsets)[2],
+                        (*segment_offsets)[3] - (*segment_offsets)[2],
+                        raft::comms::op_t::SUM,
+                        i,
+                        handle.get_stream_from_stream_pool((i * max_segments + 1) %
+                                                           (*stream_pool_indices).size()));
+        }
+        if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) {
+          device_reduce(col_comm,
+                        major_buffer_first + (*segment_offsets)[1],
+                        vertex_value_output_first + (*segment_offsets)[1],
+                        (*segment_offsets)[2] - (*segment_offsets)[1],
+                        raft::comms::op_t::SUM,
+                        i,
+                        handle.get_stream_from_stream_pool((i * max_segments + 2) %
+                                                           (*stream_pool_indices).size()));
+        }
+        if ((*segment_offsets)[1] > 0) {
+          device_reduce(col_comm,
+                        major_buffer_first,
+                        vertex_value_output_first,
+                        (*segment_offsets)[1],
+                        raft::comms::op_t::SUM,
+                        i,
+                        handle.get_stream_from_stream_pool((i * max_segments + 3) %
+                                                           (*stream_pool_indices).size()));
+        }
+      } else {
+        device_reduce(col_comm,
+                      major_buffer_first,
+                      vertex_value_output_first,
+                      matrix_partition.get_major_size(),
+                      raft::comms::op_t::SUM,
+                      i,
+                      handle.get_stream());
+      }
     }
   }
 
+  if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
+
   if constexpr (GraphViewType::is_multi_gpu && !update_major) {
     auto& comm = handle.get_comms();
     auto const comm_rank = comm.get_rank();
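Each segment's `device_reduce` is enqueued on the same pool stream that ran its kernel, so CUDA stream ordering alone guarantees the reduction sees the kernel's output; the only explicit join is the final `sync_stream_pool`. A toy illustration of that ordering property in plain CUDA (not the cuGraph API):

```cpp
// Toy: operations enqueued on one stream execute in order, so a consumer
// enqueued after a producer on the same stream needs no intermediate sync.
#include <cuda_runtime.h>

__global__ void fill_ones(float* x, int n)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) x[i] = 1.0f;  // producer
}

void ordered_on_one_stream(float* d_x, int n, cudaStream_t s)
{
  fill_ones<<<(n + 255) / 256, 256, 0, s>>>(d_x, n);
  // a reduction enqueued here on `s` would observe the filled values; only
  // before another stream (or the host) consumes the result is a join needed,
  // the analogue of handle.sync_stream_pool(...) above.
  cudaStreamSynchronize(s);
}
```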
diff --git a/cpp/include/cugraph/utilities/host_barrier.hpp b/cpp/include/cugraph/utilities/host_barrier.hpp
deleted file mode 100644
index 6825814eb93..00000000000
--- a/cpp/include/cugraph/utilities/host_barrier.hpp
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include
-
-namespace cugraph {
-
-// FIXME: a temporary hack till UCC is integrated into RAFT (so we can use UCC barrier for DASK and
-// MPI barrier for MPI)
-void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view);
-
-}  // namespace cugraph
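For reference, the helper deleted here (declaration above, implementation below) realized a host barrier as a k-ary tree of point-to-point messages: an "up" phase in which children signal parents (with `comm_size = 4` and `k = 2`: ranks 1→0 and 3→2 at `mod = 1`, then 2→0 at `mod = 2`) and a mirrored "down" phase that releases the ranks. A hypothetical MPI rendering of the same shape for `k = 2` (the deleted code used `raft::comms` isend/irecv plus waitall instead):

```cpp
// Sketch of a binary-tree barrier equivalent to the deleted k-tree code (k = 2).
#include <mpi.h>

void tree_barrier(MPI_Comm comm)
{
  int rank{}, size{};
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);
  char dummy{};

  int mod = 1;
  for (; mod < size; mod *= 2) {  // up: children signal parents
    if (rank % mod == 0) {
      auto level_rank = rank / mod;
      auto peer = (level_rank % 2 == 0) ? (level_rank + 1) * mod : (level_rank - 1) * mod;
      if (level_rank % 2 == 0) {
        if (peer < size) MPI_Recv(&dummy, 1, MPI_CHAR, peer, 0, comm, MPI_STATUS_IGNORE);
      } else {
        MPI_Send(&dummy, 1, MPI_CHAR, peer, 0, comm);
      }
    }
  }
  for (mod /= 2; mod >= 1; mod /= 2) {  // down: parents release children
    if (rank % mod == 0) {
      auto level_rank = rank / mod;
      auto peer = (level_rank % 2 == 0) ? (level_rank + 1) * mod : (level_rank - 1) * mod;
      if (level_rank % 2 == 0) {
        if (peer < size) MPI_Send(&dummy, 1, MPI_CHAR, peer, 0, comm);
      } else {
        MPI_Recv(&dummy, 1, MPI_CHAR, peer, 0, comm, MPI_STATUS_IGNORE);
      }
    }
  }
}
```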
diff --git a/cpp/src/utilities/host_barrier.cpp b/cpp/src/utilities/host_barrier.cpp
deleted file mode 100644
index 2887350ad4d..00000000000
--- a/cpp/src/utilities/host_barrier.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include
-
-#include
-
-namespace cugraph {
-
-// FIXME: a temporary hack till UCC is integrated into RAFT (so we can use UCC barrier for DASK and
-// MPI barrier for MPI)
-void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view)
-{
-  stream_view.synchronize();
-
-  auto const comm_size = comm.get_size();
-  auto const comm_rank = comm.get_rank();
-
-  // k-tree barrier
-
-  int constexpr k = 2;
-  static_assert(k >= 2);
-  std::vector<raft::comms::request_t> requests(k - 1);
-  std::vector<std::byte> dummies(k - 1);
-
-  // up
-
-  int mod = 1;
-  while (mod < comm_size) {
-    if (comm_rank % mod == 0) {
-      auto level_rank = comm_rank / mod;
-      if (level_rank % k == 0) {
-        auto num_irecvs = 0;
-        ;
-        for (int i = 1; i < k; ++i) {
-          auto src_rank = (level_rank + i) * mod;
-          if (src_rank < comm_size) {
-            comm.irecv(dummies.data() + (i - 1),
-                       sizeof(std::byte),
-                       src_rank,
-                       int{0} /* tag */,
-                       requests.data() + (i - 1));
-            ++num_irecvs;
-          }
-        }
-        comm.waitall(num_irecvs, requests.data());
-      } else {
-        comm.isend(dummies.data(),
-                   sizeof(std::byte),
-                   (level_rank - (level_rank % k)) * mod,
-                   int{0} /* tag */,
-                   requests.data());
-        comm.waitall(1, requests.data());
-      }
-    }
-    mod *= k;
-  }
-
-  // down
-
-  mod /= k;
-  while (mod >= 1) {
-    if (comm_rank % mod == 0) {
-      auto level_rank = comm_rank / mod;
-      if (level_rank % k == 0) {
-        auto num_isends = 0;
-        for (int i = 1; i < k; ++i) {
-          auto dst_rank = (level_rank + i) * mod;
-          if (dst_rank < comm_size) {
-            comm.isend(dummies.data() + (i - 1),
-                       sizeof(std::byte),
-                       dst_rank,
-                       int{0} /* tag */,
-                       requests.data() + (i - 1));
-            ++num_isends;
-          }
-        }
-        comm.waitall(num_isends, requests.data());
-      } else {
-        comm.irecv(dummies.data(),
-                   sizeof(std::byte),
-                   (level_rank - (level_rank % k)) * mod,
-                   int{0} /* tag */,
-                   requests.data());
-        comm.waitall(1, requests.data());
-      }
-    }
-    mod /= k;
-  }
-}
-
-}  // namespace cugraph
diff --git a/cpp/tests/link_analysis/mg_pagerank_test.cpp b/cpp/tests/link_analysis/mg_pagerank_test.cpp
index adcd0c94a8f..1f199668d6f 100644
--- a/cpp/tests/link_analysis/mg_pagerank_test.cpp
+++ b/cpp/tests/link_analysis/mg_pagerank_test.cpp
@@ -61,7 +61,9 @@ class Tests_MGPageRank
   {
     // 1. initialize handle
 
-    raft::handle_t handle{};
+    auto constexpr pool_size = 64;  // FIXME: tuning parameter
+    raft::handle_t handle(rmm::cuda_stream_per_thread,
+                          std::make_shared<rmm::cuda_stream_pool>(pool_size));
     HighResClock hr_clock{};
 
     raft::comms::initialize_mpi_comms(&handle, MPI_COMM_WORLD);
@@ -73,6 +75,7 @@ class Tests_MGPageRank
     while (comm_size % row_comm_size != 0) {
       --row_comm_size;
     }
+
     cugraph::partition_2d::subcomm_factory_t<cugraph::partition_2d::key_naming_t, vertex_t>
       subcomm_factory(handle, row_comm_size);
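The test now builds its `raft::handle_t` with an RMM stream pool so the primitives above can pull extra streams. A minimal standalone sketch of that setup (`pool_size = 64` is the FIXME'd tuning knob from the diff; the primitives fall back to `handle.get_stream()` whenever the pool is smaller than `max_segments`, so any pool size is safe):

```cpp
// Sketch: construct a RAFT handle backed by a CUDA stream pool.
#include <raft/handle.hpp>
#include <rmm/cuda_stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <cstddef>
#include <memory>

void example(std::size_t pool_size = 64)
{
  raft::handle_t handle(rmm::cuda_stream_per_thread,
                        std::make_shared<rmm::cuda_stream_pool>(pool_size));

  // callers can probe the pool and grab additional streams for concurrency
  auto n = handle.get_stream_pool_size();
  auto s = (n > 0) ? handle.get_stream_from_stream_pool(0) : handle.get_stream();
  (void)s;
}
```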