From 3cc519f01040eb00d1517f3633a3877fb743c9c2 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Mon, 5 Jun 2023 00:33:31 -0500 Subject: [PATCH] Improve the MPI parcelport and change the default zero-copy serialization threshold from 128 to 8192 - Replace the lock-based tag provider with an atomic variable - Make the header message size dynamic --- CMakeLists.txt | 4 +- cmake/toolchains/Cray.cmake | 6 - cmake/toolchains/CrayKNL.cmake | 6 - cmake/toolchains/CrayKNLStatic.cmake | 6 - cmake/toolchains/CrayStatic.cmake | 6 - .../include/hpx/mpi_base/mpi_environment.hpp | 2 + libs/core/mpi_base/src/mpi_environment.cpp | 7 + .../include/hpx/parcelport_lci/header.hpp | 8 - .../include/hpx/parcelport_mpi/header.hpp | 221 +++++++++++++----- .../include/hpx/parcelport_mpi/receiver.hpp | 32 +-- .../parcelport_mpi/receiver_connection.hpp | 120 +++++----- .../include/hpx/parcelport_mpi/sender.hpp | 78 +------ .../hpx/parcelport_mpi/sender_connection.hpp | 16 +- .../hpx/parcelport_mpi/tag_provider.hpp | 36 +-- 14 files changed, 277 insertions(+), 271 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0dee5075ee69..151dccba248f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -655,8 +655,8 @@ endif() hpx_option( HPX_WITH_ZERO_COPY_SERIALIZATION_THRESHOLD STRING - "The threshold in bytes to when perform zero copy optimizations (default: 128)" - "128" + "The threshold in bytes to when perform zero copy optimizations (default: 8192)" + "8192" ADVANCED ) hpx_add_config_define( diff --git a/cmake/toolchains/Cray.cmake b/cmake/toolchains/Cray.cmake index 0b6e6d3eb1cc..83b9c051f133 100644 --- a/cmake/toolchains/Cray.cmake +++ b/cmake/toolchains/Cray.cmake @@ -90,12 +90,6 @@ set(HPX_PARCELPORT_LIBFABRIC_WITH_LOGGING OFF CACHE BOOL "Libfabric parcelport logging on/off flag" ) -set(HPX_WITH_ZERO_COPY_SERIALIZATION_THRESHOLD - "4096" - CACHE - STRING - "The threshold in bytes to when perform zero copy optimizations (default: 128)" -) # We do a cross compilation here ... set(CMAKE_CROSSCOMPILING diff --git a/cmake/toolchains/CrayKNL.cmake b/cmake/toolchains/CrayKNL.cmake index cd9da9c23633..126bcc9a0385 100644 --- a/cmake/toolchains/CrayKNL.cmake +++ b/cmake/toolchains/CrayKNL.cmake @@ -88,12 +88,6 @@ set(HPX_PARCELPORT_LIBFABRIC_WITH_LOGGING OFF CACHE BOOL "Libfabric parcelport logging on/off flag" ) -set(HPX_WITH_ZERO_COPY_SERIALIZATION_THRESHOLD - "4096" - CACHE - STRING - "The threshold in bytes to when perform zero copy optimizations (default: 128)" -) # Set the TBBMALLOC_PLATFORM correctly so that find_package(TBBMalloc) sets the # right hints diff --git a/cmake/toolchains/CrayKNLStatic.cmake b/cmake/toolchains/CrayKNLStatic.cmake index 395b719ce05f..97843059eaa7 100644 --- a/cmake/toolchains/CrayKNLStatic.cmake +++ b/cmake/toolchains/CrayKNLStatic.cmake @@ -72,12 +72,6 @@ set(HPX_PARCELPORT_LIBFABRIC_WITH_LOGGING OFF CACHE BOOL "Libfabric parcelport logging on/off flag" ) -set(HPX_WITH_ZERO_COPY_SERIALIZATION_THRESHOLD - "4096" - CACHE - STRING - "The threshold in bytes to when perform zero copy optimizations (default: 128)" -) # Set the TBBMALLOC_PLATFORM correctly so that find_package(TBBMalloc) sets the # right hints diff --git a/cmake/toolchains/CrayStatic.cmake b/cmake/toolchains/CrayStatic.cmake index 426afb1fb41b..f89757a2e72c 100644 --- a/cmake/toolchains/CrayStatic.cmake +++ b/cmake/toolchains/CrayStatic.cmake @@ -83,9 +83,3 @@ set(HPX_PARCELPORT_LIBFABRIC_WITH_LOGGING OFF CACHE BOOL "Libfabric parcelport logging on/off flag" ) -set(HPX_WITH_ZERO_COPY_SERIALIZATION_THRESHOLD - "4096" - CACHE - STRING - "The threshold in bytes to when perform zero copy optimizations (default: 128)" -) diff --git a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp index c8746f39d59f..97423be36d66 100644 --- a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp +++ b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp @@ -86,6 +86,8 @@ namespace hpx::util { using mutex_type = hpx::spinlock; + static int MPI_MAX_TAG; + private: static mutex_type mtx_; diff --git a/libs/core/mpi_base/src/mpi_environment.cpp b/libs/core/mpi_base/src/mpi_environment.cpp index 29ab87ba3896..5b7e8325c726 100644 --- a/libs/core/mpi_base/src/mpi_environment.cpp +++ b/libs/core/mpi_base/src/mpi_environment.cpp @@ -23,6 +23,8 @@ /////////////////////////////////////////////////////////////////////////////// namespace hpx::util { + int mpi_environment::MPI_MAX_TAG = 32767; + namespace detail { bool detect_mpi_environment( @@ -267,6 +269,11 @@ namespace hpx::util { rtcfg.add_entry("hpx.parcel.mpi.rank", std::to_string(this_rank)); rtcfg.add_entry("hpx.parcel.mpi.processorname", get_processor_name()); + void* max_tag_p; + int flag; + MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &max_tag_p, &flag); + if (flag) + MPI_MAX_TAG = *(int*) max_tag_p; } std::string mpi_environment::get_processor_name() diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp index 4401e0751b2c..2ebb7f18ea76 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp @@ -108,14 +108,6 @@ namespace hpx::parcelset::policies::lci { data_ = header_buffer; } - void reset() noexcept - { - if (data_ != nullptr) - { - free(data_); - } - } - bool valid() const noexcept { return data_ != nullptr && signature() == MAGIC_SIGNATURE; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp index 953609bbb0ac..de76bb9562f8 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/header.hpp @@ -1,5 +1,6 @@ // Copyright (c) 2013-2023 Hartmut Kaiser // Copyright (c) 2013-2015 Thomas Heller +// Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -11,7 +12,7 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_MPI) #include -#include + #include #include @@ -21,120 +22,228 @@ #include namespace hpx::parcelset::policies::mpi { - struct header { using value_type = int; - enum data_pos { - pos_tag = 0 * sizeof(value_type), - pos_size = 1 * sizeof(value_type), - pos_numbytes = 2 * sizeof(value_type), - pos_numchunks_first = 3 * sizeof(value_type), - pos_numchunks_second = 4 * sizeof(value_type), - pos_piggy_back_flag = 5 * sizeof(value_type), - pos_piggy_back_data = 5 * sizeof(value_type) + 1 + // siguature for assert_valid + pos_signature = 0, + // tag + pos_tag = 1 * sizeof(value_type), + // non-zero-copy chunk size + pos_numbytes_nonzero_copy = 2 * sizeof(value_type), + // transmission chunk size + pos_numbytes_tchunk = 3 * sizeof(value_type), + // how many bytes in total (including zero-copy and non-zero-copy chunks) + pos_numbytes = 4 * sizeof(value_type), + // zero-copy chunk number + pos_numchunks_zero_copy = 5 * sizeof(value_type), + // non-zero-copy chunk number + pos_numchunks_nonzero_copy = 6 * sizeof(value_type), + // whether piggyback data + pos_piggy_back_flag_data = 7 * sizeof(value_type), + // whether piggyback transmission chunk + pos_piggy_back_flag_tchunk = 7 * sizeof(value_type) + 1, + pos_piggy_back_address = 7 * sizeof(value_type) + 2 }; - static constexpr std::int32_t data_size_ = 512; - - template - header(Buffer const& buffer, int tag) noexcept + template + static size_t get_header_size( + parcel_buffer const& buffer, + size_t max_header_size) noexcept { - init(buffer, tag); + HPX_ASSERT(max_header_size >= pos_piggy_back_address); + + size_t current_header_size = pos_piggy_back_address; + if (buffer.data_.size() <= (max_header_size - current_header_size)) + { + current_header_size += buffer.data_.size(); + } + int num_zero_copy_chunks = buffer.num_chunks_.first; + [[maybe_unused]] int num_non_zero_copy_chunks = + buffer.num_chunks_.second; + if (num_zero_copy_chunks != 0) + { + HPX_ASSERT(buffer.transmission_chunks_.size() == + size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); + int tchunk_size = + static_cast(buffer.transmission_chunks_.size() * + sizeof(typename parcel_buffer::transmission_chunk_type)); + if (tchunk_size <= int(max_header_size - current_header_size)) + { + current_header_size += tchunk_size; + } + } + return current_header_size; } - template - void init(Buffer const& buffer, int tag) noexcept + template + header(parcel_buffer const& buffer, + char* header_buffer, size_t max_header_size) noexcept { - auto const size = static_cast(buffer.size_); - auto const numbytes = static_cast(buffer.data_size_); - + HPX_ASSERT(max_header_size >= pos_piggy_back_address); + data_ = header_buffer; + memset(data_, 0, pos_piggy_back_address); + std::int64_t size = static_cast(buffer.data_.size()); + std::int64_t numbytes = + static_cast(buffer.data_size_); HPX_ASSERT(size <= (std::numeric_limits::max)()); HPX_ASSERT(numbytes <= (std::numeric_limits::max)()); + int num_zero_copy_chunks = buffer.num_chunks_.first; + int num_non_zero_copy_chunks = buffer.num_chunks_.second; - set(pos_tag, tag); - set(pos_size, static_cast(size)); + set(pos_signature, MAGIC_SIGNATURE); + set(pos_numbytes_nonzero_copy, static_cast(size)); set(pos_numbytes, static_cast(numbytes)); - set(pos_numchunks_first, - static_cast(buffer.num_chunks_.first)); - set(pos_numchunks_second, - static_cast(buffer.num_chunks_.second)); + set(pos_numchunks_zero_copy, + static_cast(num_zero_copy_chunks)); + set(pos_numchunks_nonzero_copy, + static_cast(num_non_zero_copy_chunks)); + data_[pos_piggy_back_flag_data] = 0; + data_[pos_piggy_back_flag_tchunk] = 0; - if (buffer.data_.size() <= (data_size_ - pos_piggy_back_data)) + size_t current_header_size = pos_piggy_back_address; + if (buffer.data_.size() <= (max_header_size - current_header_size)) { - data_[pos_piggy_back_flag] = 1; - std::memcpy(&data_[pos_piggy_back_data], buffer.data_.data(), - buffer.data_.size()); + data_[pos_piggy_back_flag_data] = 1; + std::memcpy( + &data_[current_header_size], &buffer.data_[0], size); + current_header_size += size; } - else + if (num_zero_copy_chunks != 0) { - data_[pos_piggy_back_flag] = 0; + HPX_ASSERT(buffer.transmission_chunks_.size() == + size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); + int tchunk_size = + static_cast(buffer.transmission_chunks_.size() * + sizeof(typename parcel_buffer::transmission_chunk_type)); + set(pos_numbytes_tchunk, static_cast(tchunk_size)); + if (tchunk_size <= int(max_header_size - current_header_size)) + { + data_[pos_piggy_back_flag_tchunk] = 1; + std::memcpy(&data_[current_header_size], + buffer.transmission_chunks_.data(), tchunk_size); + } } } header() noexcept { - reset(); + data_ = nullptr; + } + + explicit header(char* header_buffer) noexcept + { + data_ = header_buffer; } void reset() noexcept { - std::memset(data_.data(), static_cast(-1), data_size_); - data_[pos_piggy_back_flag] = 1; + data_ = nullptr; } - [[nodiscard]] constexpr bool valid() const noexcept + [[nodiscard]] bool valid() const noexcept { - return data_[0] != static_cast(-1); + return data_ != nullptr && signature() == MAGIC_SIGNATURE; } void assert_valid() const noexcept { - HPX_ASSERT(tag() != -1); - HPX_ASSERT(size() != -1); - HPX_ASSERT(numbytes() != -1); - HPX_ASSERT(num_chunks().first != -1); - HPX_ASSERT(num_chunks().second != -1); + HPX_ASSERT(valid()); + } + + [[nodiscard]] char* data() noexcept + { + return data_; + } + + [[nodiscard]] size_t size() noexcept + { + return pos_piggy_back_address + piggy_back_size(); } - constexpr char* data() noexcept + [[nodiscard]] value_type signature() const noexcept { - return data_.data(); + return get(pos_signature); } - [[nodiscard]] constexpr value_type tag() const noexcept + void set_tag(int tag) noexcept + { + set(pos_tag, static_cast(tag)); + } + + [[nodiscard]] value_type get_tag() const noexcept { return get(pos_tag); } - [[nodiscard]] constexpr value_type size() const noexcept + [[nodiscard]] value_type numbytes_nonzero_copy() const noexcept + { + return get(pos_numbytes_nonzero_copy); + } + + [[nodiscard]] value_type numbytes_tchunk() const noexcept { - return get(pos_size); + return get(pos_numbytes_tchunk); } - [[nodiscard]] constexpr value_type numbytes() const noexcept + [[nodiscard]] value_type numbytes() const noexcept { return get(pos_numbytes); } - [[nodiscard]] constexpr std::pair num_chunks() - const noexcept + [[nodiscard]] value_type num_zero_copy_chunks() const noexcept { - return std::make_pair( - get(pos_numchunks_first), get(pos_numchunks_second)); + return get(pos_numchunks_zero_copy); } - [[nodiscard]] constexpr char* piggy_back() noexcept + [[nodiscard]] value_type num_non_zero_copy_chunks() const noexcept { - if (data_[pos_piggy_back_flag]) - return &data_[pos_piggy_back_data]; + return get(pos_numchunks_nonzero_copy); + } + + [[nodiscard]] constexpr char* piggy_back_address() noexcept + { + if (data_[pos_piggy_back_flag_data] || + data_[pos_piggy_back_flag_tchunk]) + return &data_[pos_piggy_back_address]; return nullptr; } + [[nodiscard]] int piggy_back_size() noexcept + { + int result = 0; + if (data_[pos_piggy_back_flag_data]) + result += numbytes_nonzero_copy(); + if (data_[pos_piggy_back_flag_tchunk]) + result += numbytes_tchunk(); + return result; + } + + [[nodiscard]] constexpr char* piggy_back_data() noexcept + { + if (data_[pos_piggy_back_flag_data]) + return &data_[pos_piggy_back_address]; + return nullptr; + } + + [[nodiscard]] constexpr char* piggy_back_tchunk() noexcept + { + size_t current_header_size = pos_piggy_back_address; + if (!data_[pos_piggy_back_flag_tchunk]) + return nullptr; + if (data_[pos_piggy_back_flag_data]) + current_header_size += numbytes_nonzero_copy(); + return &data_[current_header_size]; + } + private: - std::array data_; + // random magic number for assert_valid + static constexpr int MAGIC_SIGNATURE = 19527; + char* data_; constexpr void set(std::size_t Pos, value_type const& t) noexcept { diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp index eb9805513edb..1fb83e7922d4 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver.hpp @@ -1,5 +1,6 @@ // Copyright (c) 2007-2021 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -23,13 +24,13 @@ #include #include #include +#include namespace hpx::parcelset::policies::mpi { template struct receiver { - using header_list = std::list>; using handles_header_type = std::set>; using connection_type = receiver_connection; using connection_ptr = std::shared_ptr; @@ -38,15 +39,14 @@ namespace hpx::parcelset::policies::mpi { explicit constexpr receiver(Parcelport& pp) noexcept : pp_(pp) , hdr_request_(0) + , header_buffer_(pp.get_zero_copy_serialization_threshold()) { } void run() noexcept { util::mpi_environment::scoped_lock l; - - [[maybe_unused]] header h; - new_header(l, h); + post_new_header(l); } bool background_work() noexcept @@ -112,14 +112,20 @@ namespace hpx::parcelset::policies::mpi { MPI_Status status; if (request_done_locked(l, hdr_request_, &status)) { - header h; - new_header(l, h); + int recv_size = 0; + [[maybe_unused]] int ret = + MPI_Get_count(&status, MPI_CHAR, &recv_size); + HPX_ASSERT(ret == MPI_SUCCESS); + std::vector recv_header(header_buffer_.begin(), + header_buffer_.begin() + recv_size); + + post_new_header(l); l.unlock(); header_lock.unlock(); return std::make_shared( - status.MPI_SOURCE, h, pp_); + status.MPI_SOURCE, HPX_MOVE(recv_header), pp_); } } @@ -131,15 +137,11 @@ namespace hpx::parcelset::policies::mpi { } template - void new_header([[maybe_unused]] Lock& l, header& h) noexcept + void post_new_header([[maybe_unused]] Lock& l) noexcept { HPX_ASSERT_OWNS_LOCK(l); - - h = rcv_header_; - rcv_header_.reset(); - - [[maybe_unused]] int const ret = MPI_Irecv(rcv_header_.data(), - header::data_size_, MPI_BYTE, MPI_ANY_SOURCE, 0, + [[maybe_unused]] int const ret = MPI_Irecv(header_buffer_.data(), + header_buffer_.size(), MPI_BYTE, MPI_ANY_SOURCE, 0, util::mpi_environment::communicator(), &hdr_request_); HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); } @@ -148,7 +150,7 @@ namespace hpx::parcelset::policies::mpi { hpx::spinlock headers_mtx_; MPI_Request hdr_request_; - header rcv_header_; + std::vector header_buffer_; hpx::spinlock handles_header_mtx_; handles_header_type handles_header_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp index 18a9b172dec0..3abba418bf00 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp @@ -1,5 +1,6 @@ // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2007-2023 Hartmut Kaiser +// Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -37,7 +38,6 @@ namespace hpx::parcelset::policies::mpi { rcvd_transmission_chunks, rcvd_data, rcvd_chunks, - sent_release_tag }; using data_type = std::vector; @@ -45,25 +45,67 @@ namespace hpx::parcelset::policies::mpi { parcel_buffer; public: - receiver_connection(int src, header const& h, Parcelport& pp) noexcept + receiver_connection( + int src, std::vector header_buffer, Parcelport& pp) noexcept : state_(initialized) , src_(src) - , tag_(h.tag()) - , header_(h) , request_(MPI_REQUEST_NULL) , request_ptr_(nullptr) , chunks_idx_(0) , zero_copy_chunks_idx_(0) , pp_(pp) { + header header_ = header(header_buffer.data()); header_.assert_valid(); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point& data = buffer_.data_point_; data.time_ = timer_.elapsed_nanoseconds(); data.bytes_ = static_cast(header_.numbytes()); #endif - buffer_.data_.resize(static_cast(header_.size())); - buffer_.num_chunks_ = header_.num_chunks(); + tag_ = header_.get_tag(); + // decode data + buffer_.data_.resize(header_.numbytes_nonzero_copy()); + char* piggy_back_data = header_.piggy_back_data(); + if (piggy_back_data) + { + need_recv_data = false; + memcpy(buffer_.data_.data(), piggy_back_data, + buffer_.data_.size()); + } + else + { + need_recv_data = true; + } + need_recv_tchunks = false; + if (header_.num_zero_copy_chunks() != 0) + { + // decode transmission chunk + int num_zero_copy_chunks = header_.num_zero_copy_chunks(); + int num_non_zero_copy_chunks = + header_.num_non_zero_copy_chunks(); + buffer_.num_chunks_.first = num_zero_copy_chunks; + buffer_.num_chunks_.second = num_non_zero_copy_chunks; + auto& tchunks = buffer_.transmission_chunks_; + tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); + int tchunks_length = static_cast(tchunks.size() * + sizeof(buffer_type::transmission_chunk_type)); + char* piggy_back_tchunk = header_.piggy_back_tchunk(); + if (piggy_back_tchunk) + { + memcpy(static_cast(tchunks.data()), + piggy_back_tchunk, tchunks_length); + } + else + { + need_recv_tchunks = true; + } + // zero-copy chunks + buffer_.chunks_.resize(num_zero_copy_chunks); + if (!pp_.allow_zero_copy_receive_optimizations()) + { + chunk_buffers_.resize(num_zero_copy_chunks); + } + } } bool receive(std::size_t num_thread = -1) @@ -80,10 +122,7 @@ namespace hpx::parcelset::policies::mpi { return receive_chunks(num_thread); case rcvd_chunks: - return send_release_tag(num_thread); - - case sent_release_tag: - return done(); + return done(num_thread); default: HPX_ASSERT(false); @@ -93,34 +132,19 @@ namespace hpx::parcelset::policies::mpi { bool receive_transmission_chunks(std::size_t num_thread = -1) { - // determine the size of the chunk buffer - auto const num_zero_copy_chunks = static_cast( - static_cast(buffer_.num_chunks_.first)); - auto const num_non_zero_copy_chunks = static_cast( - static_cast(buffer_.num_chunks_.second)); - buffer_.transmission_chunks_.resize( - num_zero_copy_chunks + num_non_zero_copy_chunks); - if (num_zero_copy_chunks != 0) + if (need_recv_tchunks) { - buffer_.chunks_.resize(num_zero_copy_chunks); - if (!pp_.allow_zero_copy_receive_optimizations()) - { - chunk_buffers_.resize(num_zero_copy_chunks); - } - - { - util::mpi_environment::scoped_lock l; + util::mpi_environment::scoped_lock l; - [[maybe_unused]] int const ret = MPI_Irecv( - buffer_.transmission_chunks_.data(), + [[maybe_unused]] int const ret = + MPI_Irecv(buffer_.transmission_chunks_.data(), static_cast(buffer_.transmission_chunks_.size() * sizeof(buffer_type::transmission_chunk_type)), MPI_BYTE, src_, tag_, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); + HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); - request_ptr_ = &request_; - } + request_ptr_ = &request_; } state_ = rcvd_transmission_chunks; @@ -135,12 +159,7 @@ namespace hpx::parcelset::policies::mpi { return false; } - if (char const* piggy_back = header_.piggy_back()) - { - std::memcpy( - buffer_.data_.data(), piggy_back, buffer_.data_.size()); - } - else + if (need_recv_data) { util::mpi_environment::scoped_lock l; @@ -282,10 +301,10 @@ namespace hpx::parcelset::policies::mpi { } state_ = rcvd_chunks; - return send_release_tag(num_thread); + return done(num_thread); } - bool send_release_tag(std::size_t num_thread = -1) + bool done(std::size_t num_thread = -1) noexcept { if (!request_done()) { @@ -296,16 +315,6 @@ namespace hpx::parcelset::policies::mpi { parcelset::data_point& data = buffer_.data_point_; data.time_ = timer_.elapsed_nanoseconds() - data.time_; #endif - { - util::mpi_environment::scoped_lock l; - - [[maybe_unused]] int const ret = MPI_Isend(&tag_, 1, MPI_INT, - src_, 1, util::mpi_environment::communicator(), &request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); - - request_ptr_ = &request_; - } - if (parcels_.empty()) { // decode and handle received data @@ -324,15 +333,7 @@ namespace hpx::parcelset::policies::mpi { handle_received_parcels(HPX_MOVE(parcels_)); buffer_ = buffer_type{}; } - - state_ = sent_release_tag; - - return done(); - } - - bool done() noexcept - { - return request_done(); + return true; } bool request_done() noexcept @@ -368,7 +369,8 @@ namespace hpx::parcelset::policies::mpi { int src_; int tag_; - header header_; + bool need_recv_data; + bool need_recv_tchunks; buffer_type buffer_; MPI_Request request_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp index a3d84a125f11..e1a8d0592ff6 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender.hpp @@ -1,5 +1,6 @@ // Copyright (c) 2007-2021 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -34,20 +35,7 @@ namespace hpx::parcelset::policies::mpi { using connection_ptr = std::shared_ptr; using connection_list = std::deque; - // different versions of clang-format disagree - // clang-format off - sender() noexcept - : next_free_tag_request_((MPI_Request) (-1)) - , next_free_tag_(-1) - { - } - // clang-format on - - void run() noexcept - { - util::mpi_environment::scoped_lock l; - get_next_free_tag(l); - } + void run() noexcept {} connection_ptr create_connection(int dest, parcelset::parcelport* pp) { @@ -62,7 +50,7 @@ namespace hpx::parcelset::policies::mpi { int acquire_tag() noexcept { - return tag_provider_.acquire(); + return tag_provider_.get_next_tag(); } void send_messages(connection_ptr connection) @@ -105,7 +93,6 @@ namespace hpx::parcelset::policies::mpi { send_messages(HPX_MOVE(connection)); has_work = true; } - next_free_tag(); return has_work; } @@ -127,67 +114,8 @@ namespace hpx::parcelset::policies::mpi { private: tag_provider tag_provider_; - - void next_free_tag() noexcept - { - int next_free = -1; - { - std::unique_lock l(next_free_tag_mtx_, std::try_to_lock); - if (l.owns_lock()) - { - next_free = next_free_tag_locked(l); - } - } - - if (next_free != -1) - { - HPX_ASSERT(next_free > 1); - tag_provider_.release(next_free); - } - } - - template - int next_free_tag_locked([[maybe_unused]] Lock& lock) noexcept - { - HPX_ASSERT_OWNS_LOCK(lock); - - util::mpi_environment::scoped_try_lock l; - if (l.locked) - { - int completed = 0; - [[maybe_unused]] int const ret = MPI_Test( - &next_free_tag_request_, &completed, MPI_STATUS_IGNORE); - HPX_ASSERT(ret == MPI_SUCCESS); - - if (completed) - { - return get_next_free_tag(l); - } - } - return -1; - } - - template - int get_next_free_tag([[maybe_unused]] Lock& l) noexcept - { - HPX_ASSERT_OWNS_LOCK(l); - - int const next_free = next_free_tag_; - - [[maybe_unused]] int const ret = MPI_Irecv(&next_free_tag_, 1, - MPI_INT, MPI_ANY_SOURCE, 1, - util::mpi_environment::communicator(), &next_free_tag_request_); - HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); - - return next_free; - } - hpx::spinlock connections_mtx_; connection_list connections_; - - hpx::spinlock next_free_tag_mtx_; - MPI_Request next_free_tag_request_; - int next_free_tag_; }; } // namespace hpx::parcelset::policies::mpi diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp index 02f3f78f13c2..27fd01dda9d8 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp @@ -1,5 +1,6 @@ // Copyright (c) 2007-2021 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -103,7 +104,11 @@ namespace hpx::parcelset::policies::mpi { request_ptr_ = nullptr; chunks_idx_ = 0; tag_ = acquire_tag(sender_); - header_.init(buffer_, tag_); + header_buffer.resize(header::get_header_size( + buffer_, pp_->get_zero_copy_serialization_threshold())); + header_ = header(buffer_, static_cast(header_buffer.data()), + header_buffer.size()); + header_.set_tag(tag_); header_.assert_valid(); state_ = initialized; @@ -156,8 +161,8 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(state_ == initialized); HPX_ASSERT(request_ptr_ == nullptr); - [[maybe_unused]] int const ret = MPI_Isend(header_.data(), - header::data_size_, MPI_BYTE, dst_, 0, + [[maybe_unused]] int const ret = MPI_Isend(header_buffer.data(), + (int) header_buffer.size(), MPI_BYTE, dst_, 0, util::mpi_environment::communicator(), &request_); HPX_ASSERT_LOCKED(l, ret == MPI_SUCCESS); @@ -180,7 +185,7 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(request_ptr_ == nullptr); auto const& chunks = buffer_.transmission_chunks_; - if (!chunks.empty()) + if (!chunks.empty() && !header_.piggy_back_tchunk()) { util::mpi_environment::scoped_lock l; @@ -206,7 +211,7 @@ namespace hpx::parcelset::policies::mpi { return false; } - if (!header_.piggy_back()) + if (!header_.piggy_back_data()) { util::mpi_environment::scoped_lock l; @@ -312,6 +317,7 @@ namespace hpx::parcelset::policies::mpi { handler_type handler_; post_handler_type postprocess_handler_; + std::vector header_buffer; header header_; MPI_Request request_; diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp index 4d0ecede5ff2..dcb83334194a 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/tag_provider.hpp @@ -1,4 +1,5 @@ // Copyright (c) 2014-2015 Thomas Heller +// Copyright (c) 2023 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -12,6 +13,7 @@ #include #include +#include #include #include #include @@ -21,40 +23,20 @@ namespace hpx::parcelset::policies::mpi { struct tag_provider { tag_provider() - : next_tag_(2) + : next_tag(0) { } - [[nodiscard]] int acquire() noexcept + [[nodiscard]] int get_next_tag() noexcept { - int tag = -1; - std::lock_guard l(mtx_); - if (free_tags_.empty()) - { - HPX_ASSERT(next_tag_ < (std::numeric_limits::max)()); - tag = next_tag_++; - } - else - { - tag = free_tags_.front(); - free_tags_.pop_front(); - } - HPX_ASSERT(tag > 1); + // Tag 0 is reserved for header message + int tag = next_tag.fetch_add(1, std::memory_order_relaxed) % + (util::mpi_environment::MPI_MAX_TAG - 1) + + 1; return tag; } - void release(int tag) - { - HPX_ASSERT(tag > 1); - std::lock_guard l(mtx_); - HPX_ASSERT(tag < next_tag_); - - free_tags_.push_back(tag); - } - - hpx::spinlock mtx_; - int next_tag_; - std::deque free_tags_; + std::atomic next_tag; }; } // namespace hpx::parcelset::policies::mpi