From 5614211cb2f218566d72b426d72e3fea0867be1f Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 16 May 2024 00:25:41 +0200 Subject: [PATCH 01/17] feat: add `Universe` class to handle KokkosComm initialization/finalization --- src/KokkosComm.hpp | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/KokkosComm.hpp b/src/KokkosComm.hpp index 84ab90ea..19504127 100644 --- a/src/KokkosComm.hpp +++ b/src/KokkosComm.hpp @@ -28,6 +28,42 @@ namespace KokkosComm { +class Universe { + public: + Universe(int &argc, char *argv[]) : Universe(argc, argv, 0) {} + + Universe(int &argc, char *argv[], size_t buf_size) : _buffer(buf_size) { + int is_initialized; + MPI_Initialized(&is_initialized); + if (0 == is_initialized) { + int required = MPI_THREAD_MULTIPLE, provided; + MPI_Init_thread(&argc, &argv, required, &provided); + } + } + + ~Universe() { + detach_buffer(); + int is_finalized; + MPI_Finalized(&is_finalized); + if (0 == is_finalized) { + MPI_Finalize(); + } + } + + auto set_buffer_size(size_t size) -> void { + detach_buffer(); + if (0 <= size) { + _buffer.resize(size); + MPI_Buffer_attach(_buffer.data(), _buffer.size()); + } + } + + auto detach_buffer(void) -> void { MPI_Buffer_detach(&_buffer.data(), &_buffer.size()); } + + private: + std::vector _buffer; +}; + template Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) { return Impl::isend(space, sv, dest, tag, comm); From 45c695f5a96e65df605785fad8003783b42ce0b1 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 16 May 2024 00:26:24 +0200 Subject: [PATCH 02/17] feat: add support for buffered mode send operations --- src/KokkosComm_comm_mode.hpp | 29 ++++++++++++++++------------- src/impl/KokkosComm_isend.hpp | 2 ++ src/impl/KokkosComm_send.hpp | 2 ++ 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/KokkosComm_comm_mode.hpp b/src/KokkosComm_comm_mode.hpp index 51dee1ae..ccdf91b0 100644 --- 
a/src/KokkosComm_comm_mode.hpp +++ b/src/KokkosComm_comm_mode.hpp @@ -21,22 +21,25 @@ namespace KokkosComm { // Scoped enumeration to specify the communication mode of a sending operation. // See section 3.4 of the MPI standard for a complete specification. enum class CommMode { - // Default mode: lets the user override the send operations behavior at - // compile-time. E.g., this can be set to mode "Synchronous" for debug - // builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE. + // Default mode: lets the user override the send operations behavior at compile-time. E.g. this can be set to mode + // "Synchronous" for debug builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE. Default, - // Standard mode: MPI implementation decides whether outgoing messages will - // be buffered. Send operations can be started whether or not a matching - // receive has been started. They may complete before a matching receive is - // started. Standard mode is non-local: successful completion of the send - // operation may depend on the occurrence of a matching receive. + // Standard mode: MPI implementation decides whether outgoing messages will be buffered. Send operations can be + // started whether or not a matching receive has been started. They may complete before a matching receive is started. + // Standard mode is non-local: successful completion of the send operation may depend on the occurrence of a matching + // receive. Standard, - // Ready mode: Send operations may be started only if the matching receive is - // already started. + // Buffered mode: Send operation can be started whether or not a matching receive has been started. It may complete + // before a matching receive is started. However, unlike the standard send, this operation is local, and its + // completion does not depend on the occurrence of a matching receive. 
Thus, if a send is executed and no matching + // receive is started, then MPI must buffer the outgoing message, so as to allow the send call to complete. An error + // will occur if there is insufficient buffer space. The amount of available buffer space is controlled by the user + // (see Section 3.6). Buffer allocation by the user may be required for the buffered mode to be effective. + Buffered, + // Ready mode: Send operations may be started only if the matching receive is already started. Ready, - // Synchronous mode: Send operations complete successfully only if a matching - // receive is started, and the receive operation has started to receive the - // message sent. + // Synchronous mode: Send operations complete successfully only if a matching receive is started, and the receive + // operation has started to receive the message sent. Synchronous, }; diff --git a/src/impl/KokkosComm_isend.hpp b/src/impl/KokkosComm_isend.hpp index 1e2a8ba4..8d819193 100644 --- a/src/impl/KokkosComm_isend.hpp +++ b/src/impl/KokkosComm_isend.hpp @@ -44,6 +44,8 @@ KokkosComm::Req isend(const ExecSpace &space, const SendView &sv, int dest, int MPI_Comm mpi_comm, MPI_Request *mpi_req) { if constexpr (SendMode == CommMode::Standard) { MPI_Isend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm, mpi_req); + } else if constexpr (SendMode == CommMode::Buffered) { + MPI_Ibsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm, mpi_req); } else if constexpr (SendMode == CommMode::Ready) { MPI_Irsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm, mpi_req); } else if constexpr (SendMode == CommMode::Synchronous) { diff --git a/src/impl/KokkosComm_send.hpp b/src/impl/KokkosComm_send.hpp index 46d209d8..806eb199 100644 --- a/src/impl/KokkosComm_send.hpp +++ b/src/impl/KokkosComm_send.hpp @@ -36,6 +36,8 @@ void send(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Com MPI_Comm mpi_comm) { if constexpr (SendMode == 
CommMode::Standard) { MPI_Send(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm); + } else if constexpr (SendMode == CommMode::Buffered) { + MPI_Bsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm); } else if constexpr (SendMode == CommMode::Ready) { MPI_Rsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm); } else if constexpr (SendMode == CommMode::Synchronous) { From bc3128937cd6523e117f8d8ce13417b4bcdbe9f1 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 16 May 2024 13:20:09 +0200 Subject: [PATCH 03/17] fix: correctly detach buffer and fix size check --- src/KokkosComm.hpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/KokkosComm.hpp b/src/KokkosComm.hpp index 19504127..2ba1c93e 100644 --- a/src/KokkosComm.hpp +++ b/src/KokkosComm.hpp @@ -23,6 +23,7 @@ #include "KokkosComm_send.hpp" #include "KokkosComm_concepts.hpp" #include "KokkosComm_comm_mode.hpp" +#include "KokkosComm_communicator.hpp" #include @@ -52,13 +53,17 @@ class Universe { auto set_buffer_size(size_t size) -> void { detach_buffer(); - if (0 <= size) { + if (0 < size) { _buffer.resize(size); MPI_Buffer_attach(_buffer.data(), _buffer.size()); } } - auto detach_buffer(void) -> void { MPI_Buffer_detach(&_buffer.data(), &_buffer.size()); } + auto detach_buffer(void) -> void { + int size; + MPI_Buffer_detach(_buffer.data(), &size); + assert(static_cast(size) == _buffer.size()); // safety check + } private: std::vector _buffer; From 4852d9cf4b54eb6708cee7bda50e968e32ec2079 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 16 May 2024 13:20:28 +0200 Subject: [PATCH 04/17] feat: first draft of `Communicator` class --- src/KokkosComm_communicator.hpp | 125 ++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 src/KokkosComm_communicator.hpp diff --git a/src/KokkosComm_communicator.hpp b/src/KokkosComm_communicator.hpp new file mode 100644 index 00000000..ad4e7d65 --- 
/dev/null +++ b/src/KokkosComm_communicator.hpp @@ -0,0 +1,125 @@ +//@HEADER +// ************************************************************************ +// +// Kokkos v. 4.0 +// Copyright (2022) National Technology & Engineering +// Solutions of Sandia, LLC (NTESS). +// +// Under the terms of Contract DE-NA0003525 with NTESS, +// the U.S. Government retains certain rights in this software. +// +// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions. +// See https://kokkos.org/LICENSE for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//@HEADER + +#pragma once + +#include "KokkosComm_concepts.hpp" + +#include +#include + +namespace KokkosComm { + +using Color = int; +using Key = int; + +template +class Communicator { + public: + ~Communicator() { + switch (_comm_kind) { + case CommunicatorKind::User: MPI_Comm_free(&_comm); break; + // case CommunicatorKind::Inter: MPI_Comm_disconnect(&_comm); break; + default: break; + } + } + + static auto from_raw(MPI_Comm raw) -> Communicator { + assert(MPI_COMM_NULL != raw); + + CommunicatorKind comm_kind; + if (MPI_COMM_SELF == raw) { + comm_kind = CommunicatorKind::Self; + } else if (MPI_COMM_WORLD == raw) { + comm_kind = CommunicatorKind::World; + } else { + int flag; + MPI_Comm_test_inter(raw, &flag); + if (0 != flag) { + fprintf(stderr, "[KokkosComm] error: intercommunicators are not supported (yet).\n"); + std::terminate(); + // MPI_Comm parent_comm = MPI_COMM_NULL; + // MPI_Comm_get_parent(&parent_comm); + // if (raw == parent_comm) { + // comm_kind = CommunicatorKind::Parent; + // } else { + // comm_kind = CommunicatorKind::Inter; + // } + } else { + comm_kind = CommunicatorKind::User; + } + } + + return Communicator(raw, comm_kind); + } + + inline static auto from_raw_unchecked(MPI_Comm comm) -> Communicator { + return Communicator(comm, CommunicatorKind::User); + } + + static auto dup_raw(MPI_Comm raw) -> Communicator { + MPI_Comm new_comm; + MPI_Comm_dup(raw, 
&new_comm); + return Communicator(new_comm, CommunicatorKind::User); + } + + static auto dup(const Communicator &other) -> Communicator { return Communicator::dup_raw(other.as_raw()); } + + static auto split_raw(MPI_Comm raw, Color color, Key key) -> Communicator { + MPI_Comm new_comm; + MPI_Comm_split(raw, color, key, &new_comm); + return Communicator(new_comm, CommunicatorKind::User); + } + + static auto split(const Communicator &other, Color color, Key key) -> Communicator { + return Communicator::split_raw(other.as_raw(), color, key); + } + + inline auto as_raw() const -> MPI_Comm { return _comm; } + + inline static auto self(void) -> Communicator { return Communicator::from_raw_unchecked(MPI_COMM_SELF); } + + inline static auto world(void) -> Communicator { return Communicator::from_raw_unchecked(MPI_COMM_WORLD); } + + inline auto size(void) const -> int { + int size; + MPI_Comm_size(_comm, &size); + return size; + } + + inline auto rank(void) const -> int { + int rank; + MPI_Comm_rank(_comm, &rank); + return rank; + } + + private: + enum class CommunicatorKind { + Self, // MPI_COMM_SELF + World, // MPI_COMM_WORLD + User, // User-defined communicator + // Parent, + // Inter, + }; + + Communicator(MPI_Comm comm, CommunicatorKind comm_kind) : _comm(comm), _comm_kind(comm_kind) {} + + MPI_Comm _comm; + CommunicatorKind _comm_kind; + ExecSpace _exec_space; +}; + +} // namespace KokkosComm From e37173bc04ff0225cd814d7e292c67785c935165 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 16 May 2024 16:44:19 +0200 Subject: [PATCH 05/17] feat(session): implement a first draft of session-based init --- src/KokkosComm.hpp | 52 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/src/KokkosComm.hpp b/src/KokkosComm.hpp index 2ba1c93e..1646123b 100644 --- a/src/KokkosComm.hpp +++ b/src/KokkosComm.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include "KokkosComm_collective.hpp" #include 
"KokkosComm_version.hpp" #include "KokkosComm_isend.hpp" @@ -28,22 +29,15 @@ #include namespace KokkosComm { +auto initialize(int &argc, char *argv[]) -> Universe {} class Universe { public: - Universe(int &argc, char *argv[]) : Universe(argc, argv, 0) {} - - Universe(int &argc, char *argv[], size_t buf_size) : _buffer(buf_size) { - int is_initialized; - MPI_Initialized(&is_initialized); - if (0 == is_initialized) { - int required = MPI_THREAD_MULTIPLE, provided; - MPI_Init_thread(&argc, &argv, required, &provided); - } - } - ~Universe() { detach_buffer(); + + MPI_Session_finalize(&_shandle); + int is_finalized; MPI_Finalized(&is_finalized); if (0 == is_finalized) { @@ -62,10 +56,44 @@ class Universe { auto detach_buffer(void) -> void { int size; MPI_Buffer_detach(_buffer.data(), &size); - assert(static_cast(size) == _buffer.size()); // safety check + assert(static_cast(size) == _buffer.size()); // safety check } private: + enum class Threading { + Single = MPI_THREAD_SINGLE, + Funneled = MPI_THREAD_FUNNELED, + Serialized = MPI_THREAD_SERIALIZED, + Multiple = MPI_THREAD_MULTIPLE, + }; + + Universe(int &argc, char *argv[]) : Universe(argc, argv, 0) {} + + Universe(int &argc, char *argv[], size_t buf_size) : _buffer(buf_size) { + int is_initialized, provided; + + MPI_Initialized(&is_initialized); + if (0 == is_initialized) { + int required = MPI_THREAD_MULTIPLE; + MPI_Init_thread(&argc, &argv, required, &provided); + } else { + MPI_Query_thread(&provided); + } + _threading = static_cast(provided); + + const char comm_key[] = "mpi_communication_pattern"; + const char comm_val[] = "MPI_CPU_TO_GPU"; + MPI_Info lib_info = MPI_INFO_NULL; + MPI_Info_create(&lib_info); + MPI_Info_set(lib_info, comm_key, comm_val); + + MPI_Session lib_handle = MPI_SESSION_NULL; + MPI_Session_init(lib_info, MPI_ERRORS_RETURN, &lib_handle); + _shandle = lib_handle; + } + + MPI_Session _shandle; + Threading _threading; std::vector _buffer; }; From 4bc55e2cb1b57a8147f88dbf3c9180710787e840 Mon 
Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Sat, 18 May 2024 21:19:11 +0200 Subject: [PATCH 06/17] feat: initialization and finalization routines for KokkosComm --- src/KokkosComm.hpp | 129 ++++++++++++++++++++++++++++++--------------- 1 file changed, 86 insertions(+), 43 deletions(-) diff --git a/src/KokkosComm.hpp b/src/KokkosComm.hpp index 1646123b..170e1a40 100644 --- a/src/KokkosComm.hpp +++ b/src/KokkosComm.hpp @@ -29,74 +29,117 @@ #include namespace KokkosComm { -auto initialize(int &argc, char *argv[]) -> Universe {} +template class Universe { public: - ~Universe() { + static auto create(MPI_Session shandle, MPI_Comm comm) -> Universe { return Universe(shandle, comm); } + + auto free(void) -> void { + // Detach the buffer from MPI. It's ok to destroy at scope exit as it won't have anything to do with MPI anymore. detach_buffer(); - MPI_Session_finalize(&_shandle); + // Eagerly destruct the communicator: we cannot rely on the universe going out of scope as `MPI_Finalize` will be + // called earlier than that. 
+ MPI_Comm comm = _comm.as_raw(); + MPI_Comm_free(&comm); - int is_finalized; - MPI_Finalized(&is_finalized); - if (0 == is_finalized) { - MPI_Finalize(); - } + MPI_Session_finalize(&_shandle); } auto set_buffer_size(size_t size) -> void { detach_buffer(); if (0 < size) { - _buffer.resize(size); + _buffer.resize(size + MPI_BSEND_OVERHEAD); MPI_Buffer_attach(_buffer.data(), _buffer.size()); + _is_buffer_attached = true; } } auto detach_buffer(void) -> void { - int size; - MPI_Buffer_detach(_buffer.data(), &size); - assert(static_cast(size) == _buffer.size()); // safety check + if (_is_buffer_attached) { + int size; + MPI_Buffer_detach(_buffer.data(), &size); + assert(static_cast(size) == _buffer.size()); // safety check + _is_buffer_attached = false; + } } - private: - enum class Threading { - Single = MPI_THREAD_SINGLE, - Funneled = MPI_THREAD_FUNNELED, - Serialized = MPI_THREAD_SERIALIZED, - Multiple = MPI_THREAD_MULTIPLE, - }; - - Universe(int &argc, char *argv[]) : Universe(argc, argv, 0) {} - - Universe(int &argc, char *argv[], size_t buf_size) : _buffer(buf_size) { - int is_initialized, provided; - - MPI_Initialized(&is_initialized); - if (0 == is_initialized) { - int required = MPI_THREAD_MULTIPLE; - MPI_Init_thread(&argc, &argv, required, &provided); - } else { - MPI_Query_thread(&provided); - } - _threading = static_cast(provided); + auto comm(void) -> Communicator { return _comm; } - const char comm_key[] = "mpi_communication_pattern"; - const char comm_val[] = "MPI_CPU_TO_GPU"; - MPI_Info lib_info = MPI_INFO_NULL; - MPI_Info_create(&lib_info); - MPI_Info_set(lib_info, comm_key, comm_val); + private: + Universe(MPI_Session shandle, MPI_Comm comm) : Universe(shandle, comm, 0) {} - MPI_Session lib_handle = MPI_SESSION_NULL; - MPI_Session_init(lib_info, MPI_ERRORS_RETURN, &lib_handle); - _shandle = lib_handle; - } + Universe(MPI_Session shandle, MPI_Comm comm, size_t buf_size) + : _shandle(shandle), + _comm(Communicator::from_raw_unchecked(comm)), + 
_buffer(buf_size), + _is_buffer_attached(false) {} MPI_Session _shandle; - Threading _threading; + Communicator _comm; std::vector _buffer; + bool _is_buffer_attached; }; +template +auto initialize(void) -> Universe { + MPI_Info kokkoscomm_info = MPI_INFO_NULL; + MPI_Info_create(&kokkoscomm_info); + + // Set threading level for our session + constexpr char thrd_lvl_key[] = "thread_level"; + constexpr char thrd_lvl_val[] = "MPI_THREAD_MULTIPLE"; + MPI_Info_set(kokkoscomm_info, thrd_lvl_key, thrd_lvl_val); + // TODO: error handling + +#ifdef KOKKOSCOMM_CUDA_AWARE_MPI + // Disable CUDA pointer attribute checks from MPI + constexpr char cu_ptr_attr_key[] = "mpi_communication_pattern"; + constexpr char cu_ptr_attr_val[] = "MPI_CPU_TO_GPU"; + MPI_Info_set(kokkoscomm_info, cu_ptr_attr_key, cu_ptr_attr_val); + // TODO: error handling +#endif + + MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL; + MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle); + // TODO: error handling + + MPI_Group kokkoscomm_group = MPI_GROUP_NULL; + constexpr char pset_name[] = "mpi://WORLD"; + MPI_Group_from_session_pset(kokkoscomm_shandle, pset_name, &kokkoscomm_group); + // TODO: error handling + + MPI_Comm kokkoscomm_comm = MPI_COMM_NULL; + MPI_Comm_create_from_group(kokkoscomm_group, "kokkos-comm.default_session", MPI_INFO_NULL, MPI_ERRORS_RETURN, + &kokkoscomm_comm); + // TODO: error handling + + // Resource release + MPI_Group_free(&kokkoscomm_group); + MPI_Info_free(&kokkoscomm_info); + + return Universe::create(kokkoscomm_shandle, kokkoscomm_comm); +} + +template +auto initialize(int &argc, char *argv[]) -> Universe { + // Check that MPI was initiliazed and init if it wasn't + int is_initialized; + MPI_Initialized(&is_initialized); + if (0 == is_initialized) { + int required = MPI_THREAD_MULTIPLE, provided; + MPI_Init_thread(&argc, &argv, required, &provided); + } + + return initialize(); +} + +template +auto finalize(Universe &universe) -> void { + universe.free(); 
+} + template Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) { return Impl::isend(space, sv, dest, tag, comm); From afbb017e0bc5d23505d365af4bcf28f81823e597 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Sat, 18 May 2024 21:19:54 +0200 Subject: [PATCH 07/17] test: `send_recv` with KokkosComm init and finalize --- unit_tests/test_sendrecv.cpp | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/unit_tests/test_sendrecv.cpp b/unit_tests/test_sendrecv.cpp index 6c4c4ced..885e07bb 100644 --- a/unit_tests/test_sendrecv.cpp +++ b/unit_tests/test_sendrecv.cpp @@ -15,6 +15,7 @@ //@HEADER #include +#include #include "KokkosComm.hpp" @@ -37,9 +38,11 @@ void send_comm_mode_1d_contig() { Kokkos::View a("a", 1000); - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } @@ -48,15 +51,17 @@ void send_comm_mode_1d_contig() { int dst = 1; Kokkos::parallel_for( a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm.as_raw()); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm.as_raw()); int errs; Kokkos::parallel_reduce( a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); ASSERT_EQ(errs, 0); } + + KokkosComm::finalize(universe); } template From dcc4584db57bda5773c22ca7a502866c61a98567 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 16:17:08 +0200 Subject: [PATCH 08/17] feat(sessions): move init & finalize w/ `MPI_Session` 
to its own file --- src/KokkosComm.hpp | 125 ++------------------------- src/impl/KokkosComm_MPI_instance.hpp | 99 +++++++++++++++++++++ 2 files changed, 107 insertions(+), 117 deletions(-) create mode 100644 src/impl/KokkosComm_MPI_instance.hpp diff --git a/src/KokkosComm.hpp b/src/KokkosComm.hpp index 170e1a40..a4e24d75 100644 --- a/src/KokkosComm.hpp +++ b/src/KokkosComm.hpp @@ -18,141 +18,32 @@ #include #include "KokkosComm_collective.hpp" +#include "KokkosComm_communicator.hpp" #include "KokkosComm_version.hpp" #include "KokkosComm_isend.hpp" #include "KokkosComm_recv.hpp" #include "KokkosComm_send.hpp" #include "KokkosComm_concepts.hpp" #include "KokkosComm_comm_mode.hpp" -#include "KokkosComm_communicator.hpp" +#include "KokkosComm_MPI_instance.hpp" #include namespace KokkosComm { -template -class Universe { - public: - static auto create(MPI_Session shandle, MPI_Comm comm) -> Universe { return Universe(shandle, comm); } - - auto free(void) -> void { - // Detach the buffer from MPI. It's ok to destroy at scope exit as it won't have anything to do with MPI anymore. - detach_buffer(); - - // Eagerly destruct the communicator: we cannot rely on the universe going out of scope as `MPI_Finalize` will be - // called earlier than that. 
- MPI_Comm comm = _comm.as_raw(); - MPI_Comm_free(&comm); - - MPI_Session_finalize(&_shandle); - } - - auto set_buffer_size(size_t size) -> void { - detach_buffer(); - if (0 < size) { - _buffer.resize(size + MPI_BSEND_OVERHEAD); - MPI_Buffer_attach(_buffer.data(), _buffer.size()); - _is_buffer_attached = true; - } - } - - auto detach_buffer(void) -> void { - if (_is_buffer_attached) { - int size; - MPI_Buffer_detach(_buffer.data(), &size); - assert(static_cast(size) == _buffer.size()); // safety check - _is_buffer_attached = false; - } - } - - auto comm(void) -> Communicator { return _comm; } - - private: - Universe(MPI_Session shandle, MPI_Comm comm) : Universe(shandle, comm, 0) {} - - Universe(MPI_Session shandle, MPI_Comm comm, size_t buf_size) - : _shandle(shandle), - _comm(Communicator::from_raw_unchecked(comm)), - _buffer(buf_size), - _is_buffer_attached(false) {} - - MPI_Session _shandle; - Communicator _comm; - std::vector _buffer; - bool _is_buffer_attached; -}; - -template -auto initialize(void) -> Universe { - MPI_Info kokkoscomm_info = MPI_INFO_NULL; - MPI_Info_create(&kokkoscomm_info); - - // Set threading level for our session - constexpr char thrd_lvl_key[] = "thread_level"; - constexpr char thrd_lvl_val[] = "MPI_THREAD_MULTIPLE"; - MPI_Info_set(kokkoscomm_info, thrd_lvl_key, thrd_lvl_val); - // TODO: error handling - -#ifdef KOKKOSCOMM_CUDA_AWARE_MPI - // Disable CUDA pointer attribute checks from MPI - constexpr char cu_ptr_attr_key[] = "mpi_communication_pattern"; - constexpr char cu_ptr_attr_val[] = "MPI_CPU_TO_GPU"; - MPI_Info_set(kokkoscomm_info, cu_ptr_attr_key, cu_ptr_attr_val); - // TODO: error handling -#endif - - MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL; - MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle); - // TODO: error handling - - MPI_Group kokkoscomm_group = MPI_GROUP_NULL; - constexpr char pset_name[] = "mpi://WORLD"; - MPI_Group_from_session_pset(kokkoscomm_shandle, pset_name, &kokkoscomm_group); - 
// TODO: error handling - - MPI_Comm kokkoscomm_comm = MPI_COMM_NULL; - MPI_Comm_create_from_group(kokkoscomm_group, "kokkos-comm.default_session", MPI_INFO_NULL, MPI_ERRORS_RETURN, - &kokkoscomm_comm); - // TODO: error handling - - // Resource release - MPI_Group_free(&kokkoscomm_group); - MPI_Info_free(&kokkoscomm_info); - - return Universe::create(kokkoscomm_shandle, kokkoscomm_comm); -} - -template -auto initialize(int &argc, char *argv[]) -> Universe { - // Check that MPI was initiliazed and init if it wasn't - int is_initialized; - MPI_Initialized(&is_initialized); - if (0 == is_initialized) { - int required = MPI_THREAD_MULTIPLE, provided; - MPI_Init_thread(&argc, &argv, required, &provided); - } - - return initialize(); -} - -template -auto finalize(Universe &universe) -> void { - universe.free(); -} - template -Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) { - return Impl::isend(space, sv, dest, tag, comm); +Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator &comm) { + return Impl::isend(space, sv, dest, tag, comm.as_raw()); } template -void send(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) { - return Impl::send(space, sv, dest, tag, comm); +void send(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator &comm) { + return Impl::send(space, sv, dest, tag, comm.as_raw()); } template -void recv(const ExecSpace &space, RecvView &sv, int src, int tag, MPI_Comm comm) { - return Impl::recv(space, sv, src, tag, comm); +void recv(const ExecSpace &space, RecvView &sv, int src, int tag, const Communicator &comm) { + return Impl::recv(space, sv, src, tag, comm.as_raw()); } } // namespace KokkosComm diff --git a/src/impl/KokkosComm_MPI_instance.hpp b/src/impl/KokkosComm_MPI_instance.hpp new file mode 100644 index 00000000..ccccad9d --- /dev/null +++ b/src/impl/KokkosComm_MPI_instance.hpp @@ -0,0 +1,99 @@ +//@HEADER +// 
************************************************************************ +// +// Kokkos v. 4.0 +// Copyright (2022) National Technology & Engineering +// Solutions of Sandia, LLC (NTESS). +// +// Under the terms of Contract DE-NA0003525 with NTESS, +// the U.S. Government retains certain rights in this software. +// +// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions. +// See https://kokkos.org/LICENSE for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//@HEADER + +#pragma once + +#include "KokkosComm_communicator.hpp" +#include "KokkosComm_concepts.hpp" + +#include +#include + +namespace KokkosComm { + +template +class Universe { + private: + Communicator _comm; + MPI_Session _shandle; + + public: + Universe(MPI_Session shandle, MPI_Comm comm) + : _comm(Communicator::from_raw_unchecked(comm)), _shandle(shandle) {} + + ~Universe() { + // FIXME: find out how to properly finalize the session + // MPI_Session_finalize(&_shandle); + } + + auto comm(void) -> Communicator { return _comm; } +}; + +template +auto initialize(void) -> Universe { + MPI_Info kokkoscomm_info = MPI_INFO_NULL; + MPI_Info_create(&kokkoscomm_info); + // TODO: error handling + + // Set threading level for our session + constexpr char thrd_lvl_key[] = "thread_level"; + constexpr char thrd_lvl_val[] = "MPI_THREAD_MULTIPLE"; + MPI_Info_set(kokkoscomm_info, thrd_lvl_key, thrd_lvl_val); + // TODO: error handling + +#ifdef KOKKOSCOMM_CUDA_AWARE_MPI + // Disable CUDA pointer attribute checks from MPI + constexpr char cu_ptr_attr_key[] = "mpi_communication_pattern"; + constexpr char cu_ptr_attr_val[] = "MPI_CPU_TO_GPU"; + MPI_Info_set(kokkoscomm_info, cu_ptr_attr_key, cu_ptr_attr_val); + // TODO: error handling +#endif + + MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL; + MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle); + // TODO: error handling + + MPI_Group kokkoscomm_group = MPI_GROUP_NULL; + constexpr char pset_name[] 
= "mpi://WORLD"; + MPI_Group_from_session_pset(kokkoscomm_shandle, pset_name, &kokkoscomm_group); + // TODO: error handling + + MPI_Comm kokkoscomm_comm = MPI_COMM_NULL; + MPI_Comm_create_from_group(kokkoscomm_group, "kokkos-comm.default_session", MPI_INFO_NULL, MPI_ERRORS_RETURN, + &kokkoscomm_comm); + // TODO: error handling + + // Resource release + MPI_Group_free(&kokkoscomm_group); + MPI_Info_free(&kokkoscomm_info); + + return Universe(kokkoscomm_shandle, kokkoscomm_comm); +} + +template +auto initialize(int &argc, char *argv[]) -> Universe { + // Check that MPI was initialized and init if it wasn't + int is_initialized; + MPI_Initialized(&is_initialized); + if (0 == is_initialized) { + int required = MPI_THREAD_MULTIPLE, provided; + MPI_Init_thread(&argc, &argv, required, &provided); + } + + return initialize(); +} + +} // namespace KokkosComm From de176b6d955e110d490def2898eb50cd7d8fbf80 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 16:20:51 +0200 Subject: [PATCH 09/17] revert: removed Buffered communication mode --- src/KokkosComm_comm_mode.hpp | 7 ------- src/impl/KokkosComm_isend.hpp | 3 +-- src/impl/KokkosComm_send.hpp | 2 -- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/KokkosComm_comm_mode.hpp b/src/KokkosComm_comm_mode.hpp index ccdf91b0..52a21c5a 100644 --- a/src/KokkosComm_comm_mode.hpp +++ b/src/KokkosComm_comm_mode.hpp @@ -29,13 +29,6 @@ enum class CommMode { // Standard mode is non-local: successful completion of the send operation may depend on the occurrence of a matching // receive. Standard, - // Buffered mode: Send operation can be started whether or not a matching receive has been started. It may complete - // before a matching receive is started. However, unlike the standard send, this operation is local, and its - // completion does not depend on the occurrence of a matching receive. 
Thus, if a send is executed and no matching - // receive is started, then MPI must buffer the outgoing message, so as to allow the send call to complete. An error - // will occur if there is insufficient buffer space. The amount of available buffer space is controlled by the user - // (see Section 3.6). Buffer allocation by the user may be required for the buffered mode to be effective. - Buffered, // Ready mode: Send operations may be started only if the matching receive is already started. Ready, // Synchronous mode: Send operations complete successfully only if a matching receive is started, and the receive diff --git a/src/impl/KokkosComm_isend.hpp b/src/impl/KokkosComm_isend.hpp index 8d819193..aaf34450 100644 --- a/src/impl/KokkosComm_isend.hpp +++ b/src/impl/KokkosComm_isend.hpp @@ -25,6 +25,7 @@ #include "KokkosComm_request.hpp" #include "KokkosComm_traits.hpp" #include "KokkosComm_comm_mode.hpp" +#include "KokkosComm_communicator.hpp" // impl #include "KokkosComm_include_mpi.hpp" @@ -44,8 +45,6 @@ KokkosComm::Req isend(const ExecSpace &space, const SendView &sv, int dest, int MPI_Comm mpi_comm, MPI_Request *mpi_req) { if constexpr (SendMode == CommMode::Standard) { MPI_Isend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm, mpi_req); - } else if constexpr (SendMode == CommMode::Buffered) { - MPI_Ibsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm, mpi_req); } else if constexpr (SendMode == CommMode::Ready) { MPI_Irsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm, mpi_req); } else if constexpr (SendMode == CommMode::Synchronous) { diff --git a/src/impl/KokkosComm_send.hpp b/src/impl/KokkosComm_send.hpp index 3f957b40..09186305 100644 --- a/src/impl/KokkosComm_send.hpp +++ b/src/impl/KokkosComm_send.hpp @@ -50,8 +50,6 @@ void send(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Com MPI_Comm mpi_comm) { if constexpr (SendMode == CommMode::Standard) { MPI_Send(mpi_view, mpi_count, 
mpi_datatype, mpi_dest, mpi_tag, mpi_comm); - } else if constexpr (SendMode == CommMode::Buffered) { - MPI_Bsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm); } else if constexpr (SendMode == CommMode::Ready) { MPI_Rsend(mpi_view, mpi_count, mpi_datatype, mpi_dest, mpi_tag, mpi_comm); } else if constexpr (SendMode == CommMode::Synchronous) { From bb58138982f6543b89bb3fbd42098cde730ba76c Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 16:21:16 +0200 Subject: [PATCH 10/17] feat(sessions): removed dead code in `Communicator` class --- src/KokkosComm_communicator.hpp | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/src/KokkosComm_communicator.hpp b/src/KokkosComm_communicator.hpp index ad4e7d65..238c2973 100644 --- a/src/KokkosComm_communicator.hpp +++ b/src/KokkosComm_communicator.hpp @@ -31,8 +31,8 @@ class Communicator { public: ~Communicator() { switch (_comm_kind) { - case CommunicatorKind::User: MPI_Comm_free(&_comm); break; - // case CommunicatorKind::Inter: MPI_Comm_disconnect(&_comm); break; + // FIXME: find out how to properly free a session-associated communicator + // case CommunicatorKind::User: MPI_Comm_free(&_comm); break; default: break; } } @@ -48,18 +48,11 @@ class Communicator { } else { int flag; MPI_Comm_test_inter(raw, &flag); - if (0 != flag) { + if (0 == flag) { + comm_kind = CommunicatorKind::User; + } else { fprintf(stderr, "[KokkosComm] error: intercommunicators are not supported (yet).\n"); std::terminate(); - // MPI_Comm parent_comm = MPI_COMM_NULL; - // MPI_Comm_get_parent(&parent_comm); - // if (raw == parent_comm) { - // comm_kind = CommunicatorKind::Parent; - // } else { - // comm_kind = CommunicatorKind::Inter; - // } - } else { - comm_kind = CommunicatorKind::User; } } @@ -111,8 +104,6 @@ class Communicator { Self, // MPI_COMM_SELF World, // MPI_COMM_WORLD User, // User-defined communicator - // Parent, - // Inter, }; Communicator(MPI_Comm comm, 
CommunicatorKind comm_kind) : _comm(comm), _comm_kind(comm_kind) {} From c2812bdba1b528bcfe7a4f3bd9f8b44cf02dd48f Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 16:21:46 +0200 Subject: [PATCH 11/17] test(sessions): update tests with new `init`/`finalize` API --- perf_tests/test_2dhalo.cpp | 21 ++++++++++---------- perf_tests/test_sendrecv.cpp | 15 +++++++++------ perf_tests/test_utils.hpp | 8 +++++--- unit_tests/test_isendirecv.cpp | 35 +++++++++++++++++----------------- unit_tests/test_isendrecv.cpp | 33 ++++++++++++++++---------------- unit_tests/test_sendrecv.cpp | 29 ++++++++++++---------------- 6 files changed, 71 insertions(+), 70 deletions(-) diff --git a/perf_tests/test_2dhalo.cpp b/perf_tests/test_2dhalo.cpp index 8b34381b..e8761e29 100644 --- a/perf_tests/test_2dhalo.cpp +++ b/perf_tests/test_2dhalo.cpp @@ -18,13 +18,14 @@ #include "KokkosComm.hpp" +#include #include -void noop(benchmark::State, MPI_Comm) {} +void noop(benchmark::State, const KokkosComm::Communicator &comm) {} template -void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int nx, int ny, int rx, int ry, int rs, - const View &v) { +void send_recv(benchmark::State &, const KokkosComm::Communicator &comm, + const Space &space, int nx, int ny, int rx, int ry, int rs, const View &v) { // 2D index of nbrs in minus and plus direction (periodic) const int xm1 = (rx + rs - 1) % rs; const int ym1 = (ry + rs - 1) % rs; @@ -73,10 +74,11 @@ void benchmark_2dhalo(benchmark::State &state) { int ny = 512; int nprops = 3; - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + int rank = comm.rank(); + int size = comm.size(); const int rs = std::sqrt(size); const int rx = rank % rs; const int ry = rank / rs; @@ -86,12 +88,11 @@ void benchmark_2dhalo(benchmark::State &state) { // grid of elements, each with 3 properties, and a radius-1 halo 
grid_type grid("", nx + 2, ny + 2, nprops); while (state.KeepRunning()) { - do_iteration(state, MPI_COMM_WORLD, send_recv, space, nx, ny, rx, ry, - rs, grid); + do_iteration(state, comm, send_recv, space, nx, ny, rx, ry, rs, grid); } } else { while (state.KeepRunning()) { - do_iteration(state, MPI_COMM_WORLD, noop); // do nothing... + do_iteration(state, comm, noop); // do nothing... } } @@ -113,4 +114,4 @@ void benchmark_2dhalo(benchmark::State &state) { // clang-format on } -BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond); \ No newline at end of file +BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond); diff --git a/perf_tests/test_sendrecv.cpp b/perf_tests/test_sendrecv.cpp index c8de8e00..63defd6b 100644 --- a/perf_tests/test_sendrecv.cpp +++ b/perf_tests/test_sendrecv.cpp @@ -19,7 +19,8 @@ #include "KokkosComm.hpp" template -void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank, const View &v) { +void send_recv(benchmark::State &, const KokkosComm::Communicator &comm, const Space &space, int rank, + const View &v) { if (0 == rank) { KokkosComm::send(space, v, 1, 0, comm); KokkosComm::recv(space, v, 1, 0, comm); @@ -30,9 +31,11 @@ void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank, } void benchmark_sendrecv(benchmark::State &state) { - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { state.SkipWithError("benchmark_sendrecv needs at least 2 ranks"); } @@ -44,10 +47,10 @@ void benchmark_sendrecv(benchmark::State &state) { view_type a("", 1000000); while (state.KeepRunning()) { - do_iteration(state, MPI_COMM_WORLD, send_recv, space, rank, a); + do_iteration(state, comm, send_recv, space, rank, a); } state.SetBytesProcessed(sizeof(Scalar) * state.iterations() * a.size() * 
2); } -BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond); \ No newline at end of file +BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond); diff --git a/perf_tests/test_utils.hpp b/perf_tests/test_utils.hpp index 00e1061c..f26e1eca 100644 --- a/perf_tests/test_utils.hpp +++ b/perf_tests/test_utils.hpp @@ -16,15 +16,17 @@ #pragma once +#include #include #include +#include "KokkosComm_communicator.hpp" #include "KokkosComm_include_mpi.hpp" // F is a function that takes (state, MPI_Comm, args...) template -void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args) { +void do_iteration(benchmark::State &state, KokkosComm::Communicator& comm, F &&func, Args... args) { using Clock = std::chrono::steady_clock; using Duration = std::chrono::duration; @@ -34,6 +36,6 @@ void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args double max_elapsed_second; double elapsed_seconds = elapsed.count(); - MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm); + MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm.as_raw()); state.SetIterationTime(max_elapsed_second); -} \ No newline at end of file +} diff --git a/unit_tests/test_isendirecv.cpp b/unit_tests/test_isendirecv.cpp index 1a351e85..68735f6a 100644 --- a/unit_tests/test_isendirecv.cpp +++ b/unit_tests/test_isendirecv.cpp @@ -38,27 +38,27 @@ void test_1d(const View1D &a) { static_assert(View1D::rank == 1, ""); using Scalar = typename View1D::non_const_value_type; - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } if (0 == rank) { int dst = 1; - Kokkos::parallel_for( - a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = 
i; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; MPI_Request req; - KokkosComm::Impl::irecv(a, src, 0, MPI_COMM_WORLD, req); + KokkosComm::Impl::irecv(a, src, 0, comm.as_raw(), req); MPI_Wait(&req, MPI_STATUS_IGNORE); int errs; - Kokkos::parallel_reduce( - a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); + Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); ASSERT_EQ(errs, 0); } } @@ -68,9 +68,11 @@ void test_2d(const View2D &a) { static_assert(View2D::rank == 2, ""); using Scalar = typename View2D::non_const_value_type; - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } @@ -80,14 +82,13 @@ void test_2d(const View2D &a) { if (0 == rank) { int dst = 1; - Kokkos::parallel_for( - policy, KOKKOS_LAMBDA(int i, int j) { a(i, j) = i * a.extent(0) + j; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + Kokkos::parallel_for(policy, KOKKOS_LAMBDA(int i, int j) { a(i, j) = i * a.extent(0) + j; }); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; MPI_Request req; - KokkosComm::Impl::irecv(a, src, 0, MPI_COMM_WORLD, req); + KokkosComm::Impl::irecv(a, src, 0, comm.as_raw(), req); MPI_Wait(&req, MPI_STATUS_IGNORE); int errs; Kokkos::parallel_reduce( @@ -106,4 +107,4 @@ TYPED_TEST(IsendIrecv, 
2D_contig) { test_2d(a); } -} // namespace \ No newline at end of file +} // namespace diff --git a/unit_tests/test_isendrecv.cpp b/unit_tests/test_isendrecv.cpp index 68bd3bf6..faa94ed7 100644 --- a/unit_tests/test_isendrecv.cpp +++ b/unit_tests/test_isendrecv.cpp @@ -38,25 +38,25 @@ void isend_comm_mode_1d_contig() { Kokkos::View a("a", 1000); - int rank, size; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &size); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + + int rank = comm.rank(); + int size = comm.size(); if (size < 2) { GTEST_SKIP() << "Requires >= 2 ranks (" << size << " provided)"; } if (0 == rank) { int dst = 1; - Kokkos::parallel_for( - a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce( - a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); + Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); ASSERT_EQ(errs, 0); } } @@ -71,21 +71,20 @@ void isend_comm_mode_1d_noncontig() { Kokkos::View b("a", 10, 10); auto a = Kokkos::subview(b, Kokkos::ALL, 2); // take column 2 (non-contiguous) - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + int rank = comm.rank(); if (0 == rank) { int dst = 1; - Kokkos::parallel_for( - a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::Req req = 
KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce( - a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); + Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); ASSERT_EQ(errs, 0); } } diff --git a/unit_tests/test_sendrecv.cpp b/unit_tests/test_sendrecv.cpp index 885e07bb..79a01030 100644 --- a/unit_tests/test_sendrecv.cpp +++ b/unit_tests/test_sendrecv.cpp @@ -39,7 +39,7 @@ void send_comm_mode_1d_contig() { Kokkos::View a("a", 1000); auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto comm = universe.comm(); int rank = comm.rank(); int size = comm.size(); @@ -49,19 +49,15 @@ void send_comm_mode_1d_contig() { if (0 == rank) { int dst = 1; - Kokkos::parallel_for( - a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm.as_raw()); + Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm.as_raw()); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce( - a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); + Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); ASSERT_EQ(errs, 0); } - - KokkosComm::finalize(universe); } 
template @@ -74,20 +70,19 @@ void send_comm_mode_1d_noncontig() { Kokkos::View b("b", 10, 10); auto a = Kokkos::subview(b, Kokkos::ALL, 2); // take column 2 (non-contiguous) - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); + auto universe = KokkosComm::initialize(); + auto comm = universe.comm(); + int rank = comm.rank(); if (0 == rank) { int dst = 1; - Kokkos::parallel_for( - a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); - KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, MPI_COMM_WORLD); + Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); } else if (1 == rank) { int src = 0; - KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, MPI_COMM_WORLD); + KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce( - a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); + Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); ASSERT_EQ(errs, 0); } } From 4ceee5fd0818aee182819eea8d9cbd33af4490b1 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 16:29:41 +0200 Subject: [PATCH 12/17] chore: format --- perf_tests/test_utils.hpp | 3 ++- src/impl/KokkosComm_MPI_instance.hpp | 2 +- unit_tests/test_alltoall.cpp | 6 ++---- unit_tests/test_reduce.cpp | 3 +-- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/perf_tests/test_utils.hpp b/perf_tests/test_utils.hpp index f26e1eca..5ea485d2 100644 --- a/perf_tests/test_utils.hpp +++ b/perf_tests/test_utils.hpp @@ -26,7 +26,8 @@ // F is a function that takes (state, MPI_Comm, args...) template -void do_iteration(benchmark::State &state, KokkosComm::Communicator& comm, F &&func, Args... args) { +void do_iteration(benchmark::State &state, KokkosComm::Communicator &comm, F &&func, + Args... 
args) { using Clock = std::chrono::steady_clock; using Duration = std::chrono::duration; diff --git a/src/impl/KokkosComm_MPI_instance.hpp b/src/impl/KokkosComm_MPI_instance.hpp index ccccad9d..6ccba7a7 100644 --- a/src/impl/KokkosComm_MPI_instance.hpp +++ b/src/impl/KokkosComm_MPI_instance.hpp @@ -32,7 +32,7 @@ class Universe { public: Universe(MPI_Session shandle, MPI_Comm comm) - : _comm(Communicator::from_raw_unchecked(comm)), _shandle(shandle) {} + : _comm(Communicator::from_raw_unchecked(comm)), _shandle(shandle) {} ~Universe() { // FIXME: find out how to properly finalize the session diff --git a/unit_tests/test_alltoall.cpp b/unit_tests/test_alltoall.cpp index 61397734..3205b9ad 100644 --- a/unit_tests/test_alltoall.cpp +++ b/unit_tests/test_alltoall.cpp @@ -42,8 +42,7 @@ TYPED_TEST(Alltoall, 1D_contig) { Kokkos::View rv("rv", size * nContrib); // fill send buffer - Kokkos::parallel_for( - sv.extent(0), KOKKOS_LAMBDA(const int i) { sv(i) = rank + i; }); + Kokkos::parallel_for(sv.extent(0), KOKKOS_LAMBDA(const int i) { sv(i) = rank + i; }); KokkosComm::Impl::alltoall(Kokkos::DefaultExecutionSpace(), sv, nContrib, rv, nContrib, MPI_COMM_WORLD); @@ -71,8 +70,7 @@ TYPED_TEST(Alltoall, 1D_inplace_contig) { Kokkos::View rv("rv", size * nContrib); // fill send buffer - Kokkos::parallel_for( - rv.extent(0), KOKKOS_LAMBDA(const int i) { rv(i) = rank + i; }); + Kokkos::parallel_for(rv.extent(0), KOKKOS_LAMBDA(const int i) { rv(i) = rank + i; }); KokkosComm::Impl::alltoall(Kokkos::DefaultExecutionSpace(), rv, nContrib, MPI_COMM_WORLD); diff --git a/unit_tests/test_reduce.cpp b/unit_tests/test_reduce.cpp index f538f64a..f6dd0ed3 100644 --- a/unit_tests/test_reduce.cpp +++ b/unit_tests/test_reduce.cpp @@ -47,8 +47,7 @@ TYPED_TEST(Reduce, 1D_contig) { } // fill send buffer - Kokkos::parallel_for( - sendv.extent(0), KOKKOS_LAMBDA(const int i) { sendv(i) = rank + i; }); + Kokkos::parallel_for(sendv.extent(0), KOKKOS_LAMBDA(const int i) { sendv(i) = rank + i; }); 
KokkosComm::reduce(Kokkos::DefaultExecutionSpace(), sendv, recvv, MPI_SUM, 0, MPI_COMM_WORLD); From 40ba7e78bed2d0db6d2fe4514aadd6aed959c77d Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 18:37:45 +0200 Subject: [PATCH 13/17] fix(sessions): remove code related to MPI's WPM --- src/KokkosComm_communicator.hpp | 69 +++++++-------------------------- 1 file changed, 14 insertions(+), 55 deletions(-) diff --git a/src/KokkosComm_communicator.hpp b/src/KokkosComm_communicator.hpp index 238c2973..af1b6dc0 100644 --- a/src/KokkosComm_communicator.hpp +++ b/src/KokkosComm_communicator.hpp @@ -18,6 +18,7 @@ #include "KokkosComm_concepts.hpp" +#include #include #include @@ -26,67 +27,38 @@ namespace KokkosComm { using Color = int; using Key = int; -template +template class Communicator { - public: - ~Communicator() { - switch (_comm_kind) { - // FIXME: find out how to properly free a session-associated communicator - // case CommunicatorKind::User: MPI_Comm_free(&_comm); break; - default: break; - } - } - - static auto from_raw(MPI_Comm raw) -> Communicator { - assert(MPI_COMM_NULL != raw); - - CommunicatorKind comm_kind; - if (MPI_COMM_SELF == raw) { - comm_kind = CommunicatorKind::Self; - } else if (MPI_COMM_WORLD == raw) { - comm_kind = CommunicatorKind::World; - } else { - int flag; - MPI_Comm_test_inter(raw, &flag); - if (0 == flag) { - comm_kind = CommunicatorKind::User; - } else { - fprintf(stderr, "[KokkosComm] error: intercommunicators are not supported (yet).\n"); - std::terminate(); - } - } - - return Communicator(raw, comm_kind); - } + private: + MPI_Comm _comm; + ExecSpace _exec_space; - inline static auto from_raw_unchecked(MPI_Comm comm) -> Communicator { - return Communicator(comm, CommunicatorKind::User); - } + public: + Communicator(MPI_Comm comm) : _comm(comm) {} + Communicator(const Communicator& other) = delete; + Communicator(const Communicator&& other) { _comm = std::move(other._comm); } + ~Communicator() { 
/*MPI_Comm_free(&_comm);*/ } static auto dup_raw(MPI_Comm raw) -> Communicator { MPI_Comm new_comm; MPI_Comm_dup(raw, &new_comm); - return Communicator(new_comm, CommunicatorKind::User); + return Communicator(new_comm); } - static auto dup(const Communicator &other) -> Communicator { return Communicator::dup_raw(other.as_raw()); } + static auto dup(const Communicator& other) -> Communicator { return Communicator::dup_raw(other.as_raw()); } static auto split_raw(MPI_Comm raw, Color color, Key key) -> Communicator { MPI_Comm new_comm; MPI_Comm_split(raw, color, key, &new_comm); - return Communicator(new_comm, CommunicatorKind::User); + return Communicator(new_comm); } - static auto split(const Communicator &other, Color color, Key key) -> Communicator { + static auto split(const Communicator& other, Color color, Key key) -> Communicator { return Communicator::split_raw(other.as_raw(), color, key); } inline auto as_raw() const -> MPI_Comm { return _comm; } - inline static auto self(void) -> Communicator { return Communicator::from_raw_unchecked(MPI_COMM_SELF); } - - inline static auto world(void) -> Communicator { return Communicator::from_raw_unchecked(MPI_COMM_WORLD); } - inline auto size(void) const -> int { int size; MPI_Comm_size(_comm, &size); @@ -98,19 +70,6 @@ class Communicator { MPI_Comm_rank(_comm, &rank); return rank; } - - private: - enum class CommunicatorKind { - Self, // MPI_COMM_SELF - World, // MPI_COMM_WORLD - User, // User-defined communicator - }; - - Communicator(MPI_Comm comm, CommunicatorKind comm_kind) : _comm(comm), _comm_kind(comm_kind) {} - - MPI_Comm _comm; - CommunicatorKind _comm_kind; - ExecSpace _exec_space; }; } // namespace KokkosComm From 822722de567a8b29572c241953cdcd245f6cf922 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 18:38:51 +0200 Subject: [PATCH 14/17] refactor(sessions): rename as `Context`, remove `initialize` w/ WPM & cleanup --- src/impl/KokkosComm_MPI_instance.hpp | 54 
++++++++-------------------- 1 file changed, 15 insertions(+), 39 deletions(-) diff --git a/src/impl/KokkosComm_MPI_instance.hpp b/src/impl/KokkosComm_MPI_instance.hpp index 6ccba7a7..bfde3d34 100644 --- a/src/impl/KokkosComm_MPI_instance.hpp +++ b/src/impl/KokkosComm_MPI_instance.hpp @@ -25,75 +25,51 @@ namespace KokkosComm { template -class Universe { +class Context { private: - Communicator _comm; MPI_Session _shandle; + Communicator _comm; public: - Universe(MPI_Session shandle, MPI_Comm comm) - : _comm(Communicator::from_raw_unchecked(comm)), _shandle(shandle) {} + Context(MPI_Session shandle, MPI_Comm comm) : _shandle(shandle), _comm(Communicator(comm)) {} - ~Universe() { - // FIXME: find out how to properly finalize the session - // MPI_Session_finalize(&_shandle); - } + ~Context() { MPI_Session_finalize(&_shandle); } - auto comm(void) -> Communicator { return _comm; } + auto comm(void) -> const Communicator& { return _comm; } }; template -auto initialize(void) -> Universe { - MPI_Info kokkoscomm_info = MPI_INFO_NULL; +auto initialize(void) -> Context { + int rc; + + MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL; + MPI_Group kokkoscomm_group = MPI_GROUP_NULL; + MPI_Comm kokkoscomm_comm = MPI_COMM_NULL; + MPI_Info kokkoscomm_info = MPI_INFO_NULL; + MPI_Info_create(&kokkoscomm_info); - // TODO: error handling // Set threading level for our session constexpr char thrd_lvl_key[] = "thread_level"; constexpr char thrd_lvl_val[] = "MPI_THREAD_MULTIPLE"; MPI_Info_set(kokkoscomm_info, thrd_lvl_key, thrd_lvl_val); - // TODO: error handling #ifdef KOKKOSCOMM_CUDA_AWARE_MPI // Disable CUDA pointer attribute checks from MPI constexpr char cu_ptr_attr_key[] = "mpi_communication_pattern"; constexpr char cu_ptr_attr_val[] = "MPI_CPU_TO_GPU"; MPI_Info_set(kokkoscomm_info, cu_ptr_attr_key, cu_ptr_attr_val); - // TODO: error handling #endif - MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL; - MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle); - // 
TODO: error handling + rc = MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle); - MPI_Group kokkoscomm_group = MPI_GROUP_NULL; constexpr char pset_name[] = "mpi://WORLD"; MPI_Group_from_session_pset(kokkoscomm_shandle, pset_name, &kokkoscomm_group); - // TODO: error handling - MPI_Comm kokkoscomm_comm = MPI_COMM_NULL; MPI_Comm_create_from_group(kokkoscomm_group, "kokkos-comm.default_session", MPI_INFO_NULL, MPI_ERRORS_RETURN, &kokkoscomm_comm); - // TODO: error handling - - // Resource release - MPI_Group_free(&kokkoscomm_group); - MPI_Info_free(&kokkoscomm_info); - return Universe(kokkoscomm_shandle, kokkoscomm_comm); -} - -template -auto initialize(int &argc, char *argv[]) -> Universe { - // Check that MPI was initiliazed and init if it wasn't - int is_initialized; - MPI_Initialized(&is_initialized); - if (0 == is_initialized) { - int required = MPI_THREAD_MULTIPLE, provided; - MPI_Init_thread(&argc, &argv, required, &provided); - } - - return initialize(); + return Context(kokkoscomm_shandle, kokkoscomm_comm); } } // namespace KokkosComm From 8faf2056ee26cc1e92c0a71540f5ac8d7069260a Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Thu, 23 May 2024 18:39:07 +0200 Subject: [PATCH 15/17] test(sessions): update test accordingly --- perf_tests/test_2dhalo.cpp | 4 ++-- perf_tests/test_sendrecv.cpp | 4 ++-- perf_tests/test_utils.hpp | 4 ++-- unit_tests/test_isendirecv.cpp | 8 ++++---- unit_tests/test_isendrecv.cpp | 8 ++++---- unit_tests/test_sendrecv.cpp | 8 ++++---- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/perf_tests/test_2dhalo.cpp b/perf_tests/test_2dhalo.cpp index e8761e29..a82723a5 100644 --- a/perf_tests/test_2dhalo.cpp +++ b/perf_tests/test_2dhalo.cpp @@ -74,8 +74,8 @@ void benchmark_2dhalo(benchmark::State &state) { int ny = 512; int nprops = 3; - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = 
comm.rank(); int size = comm.size(); diff --git a/perf_tests/test_sendrecv.cpp b/perf_tests/test_sendrecv.cpp index 63defd6b..cd6ff1a2 100644 --- a/perf_tests/test_sendrecv.cpp +++ b/perf_tests/test_sendrecv.cpp @@ -31,8 +31,8 @@ void send_recv(benchmark::State &, const KokkosComm::Communicator &comm, } void benchmark_sendrecv(benchmark::State &state) { - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); int size = comm.size(); diff --git a/perf_tests/test_utils.hpp b/perf_tests/test_utils.hpp index 5ea485d2..3208fea0 100644 --- a/perf_tests/test_utils.hpp +++ b/perf_tests/test_utils.hpp @@ -26,8 +26,8 @@ // F is a function that takes (state, MPI_Comm, args...) template -void do_iteration(benchmark::State &state, KokkosComm::Communicator &comm, F &&func, - Args... args) { +void do_iteration(benchmark::State &state, const KokkosComm::Communicator &comm, + F &&func, Args... 
args) { using Clock = std::chrono::steady_clock; using Duration = std::chrono::duration; diff --git a/unit_tests/test_isendirecv.cpp b/unit_tests/test_isendirecv.cpp index 68735f6a..64fb4a74 100644 --- a/unit_tests/test_isendirecv.cpp +++ b/unit_tests/test_isendirecv.cpp @@ -38,8 +38,8 @@ void test_1d(const View1D &a) { static_assert(View1D::rank == 1, ""); using Scalar = typename View1D::non_const_value_type; - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); int size = comm.size(); @@ -68,8 +68,8 @@ void test_2d(const View2D &a) { static_assert(View2D::rank == 2, ""); using Scalar = typename View2D::non_const_value_type; - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); int size = comm.size(); diff --git a/unit_tests/test_isendrecv.cpp b/unit_tests/test_isendrecv.cpp index faa94ed7..8304dab7 100644 --- a/unit_tests/test_isendrecv.cpp +++ b/unit_tests/test_isendrecv.cpp @@ -38,8 +38,8 @@ void isend_comm_mode_1d_contig() { Kokkos::View a("a", 1000); - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); int size = comm.size(); @@ -71,8 +71,8 @@ void isend_comm_mode_1d_noncontig() { Kokkos::View b("a", 10, 10); auto a = Kokkos::subview(b, Kokkos::ALL, 2); // take column 2 (non-contiguous) - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); if (0 == rank) { diff --git a/unit_tests/test_sendrecv.cpp b/unit_tests/test_sendrecv.cpp index 79a01030..593d2999 100644 --- a/unit_tests/test_sendrecv.cpp +++ b/unit_tests/test_sendrecv.cpp @@ -38,8 +38,8 @@ void send_comm_mode_1d_contig() { Kokkos::View a("a", 
1000); - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); int size = comm.size(); @@ -70,8 +70,8 @@ void send_comm_mode_1d_noncontig() { Kokkos::View b("b", 10, 10); auto a = Kokkos::subview(b, Kokkos::ALL, 2); // take column 2 (non-contiguous) - auto universe = KokkosComm::initialize(); - auto comm = universe.comm(); + auto ctx = KokkosComm::initialize(); + const auto &comm = ctx.comm(); int rank = comm.rank(); if (0 == rank) { From d0644696ceec62bf8efab61ee423007b13c233bb Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Fri, 24 May 2024 13:36:38 +0200 Subject: [PATCH 16/17] chore: format --- unit_tests/test_alltoall.cpp | 6 ++++-- unit_tests/test_isendirecv.cpp | 9 ++++++--- unit_tests/test_isendrecv.cpp | 12 ++++++++---- unit_tests/test_reduce.cpp | 3 ++- unit_tests/test_sendrecv.cpp | 12 ++++++++---- 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/unit_tests/test_alltoall.cpp b/unit_tests/test_alltoall.cpp index 3205b9ad..61397734 100644 --- a/unit_tests/test_alltoall.cpp +++ b/unit_tests/test_alltoall.cpp @@ -42,7 +42,8 @@ TYPED_TEST(Alltoall, 1D_contig) { Kokkos::View rv("rv", size * nContrib); // fill send buffer - Kokkos::parallel_for(sv.extent(0), KOKKOS_LAMBDA(const int i) { sv(i) = rank + i; }); + Kokkos::parallel_for( + sv.extent(0), KOKKOS_LAMBDA(const int i) { sv(i) = rank + i; }); KokkosComm::Impl::alltoall(Kokkos::DefaultExecutionSpace(), sv, nContrib, rv, nContrib, MPI_COMM_WORLD); @@ -70,7 +71,8 @@ TYPED_TEST(Alltoall, 1D_inplace_contig) { Kokkos::View rv("rv", size * nContrib); // fill send buffer - Kokkos::parallel_for(rv.extent(0), KOKKOS_LAMBDA(const int i) { rv(i) = rank + i; }); + Kokkos::parallel_for( + rv.extent(0), KOKKOS_LAMBDA(const int i) { rv(i) = rank + i; }); KokkosComm::Impl::alltoall(Kokkos::DefaultExecutionSpace(), rv, nContrib, MPI_COMM_WORLD); diff --git a/unit_tests/test_isendirecv.cpp 
b/unit_tests/test_isendirecv.cpp index 64fb4a74..ad58a766 100644 --- a/unit_tests/test_isendirecv.cpp +++ b/unit_tests/test_isendirecv.cpp @@ -49,7 +49,8 @@ void test_1d(const View1D &a) { if (0 == rank) { int dst = 1; - Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + Kokkos::parallel_for( + a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { @@ -58,7 +59,8 @@ void test_1d(const View1D &a) { KokkosComm::Impl::irecv(a, src, 0, comm.as_raw(), req); MPI_Wait(&req, MPI_STATUS_IGNORE); int errs; - Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); + Kokkos::parallel_reduce( + a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); ASSERT_EQ(errs, 0); } } @@ -82,7 +84,8 @@ void test_2d(const View2D &a) { if (0 == rank) { int dst = 1; - Kokkos::parallel_for(policy, KOKKOS_LAMBDA(int i, int j) { a(i, j) = i * a.extent(0) + j; }); + Kokkos::parallel_for( + policy, KOKKOS_LAMBDA(int i, int j) { a(i, j) = i * a.extent(0) + j; }); KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { diff --git a/unit_tests/test_isendrecv.cpp b/unit_tests/test_isendrecv.cpp index 8304dab7..d2b0591c 100644 --- a/unit_tests/test_isendrecv.cpp +++ b/unit_tests/test_isendrecv.cpp @@ -49,14 +49,16 @@ void isend_comm_mode_1d_contig() { if (0 == rank) { int dst = 1; - Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + Kokkos::parallel_for( + a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - 
Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); + Kokkos::parallel_reduce( + a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); ASSERT_EQ(errs, 0); } } @@ -77,14 +79,16 @@ void isend_comm_mode_1d_noncontig() { int rank = comm.rank(); if (0 == rank) { int dst = 1; - Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + Kokkos::parallel_for( + a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); KokkosComm::Req req = KokkosComm::isend(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); req.wait(); } else if (1 == rank) { int src = 0; KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); + Kokkos::parallel_reduce( + a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != Scalar(i); }, errs); ASSERT_EQ(errs, 0); } } diff --git a/unit_tests/test_reduce.cpp b/unit_tests/test_reduce.cpp index f6dd0ed3..f538f64a 100644 --- a/unit_tests/test_reduce.cpp +++ b/unit_tests/test_reduce.cpp @@ -47,7 +47,8 @@ TYPED_TEST(Reduce, 1D_contig) { } // fill send buffer - Kokkos::parallel_for(sendv.extent(0), KOKKOS_LAMBDA(const int i) { sendv(i) = rank + i; }); + Kokkos::parallel_for( + sendv.extent(0), KOKKOS_LAMBDA(const int i) { sendv(i) = rank + i; }); KokkosComm::reduce(Kokkos::DefaultExecutionSpace(), sendv, recvv, MPI_SUM, 0, MPI_COMM_WORLD); diff --git a/unit_tests/test_sendrecv.cpp b/unit_tests/test_sendrecv.cpp index 593d2999..74696482 100644 --- a/unit_tests/test_sendrecv.cpp +++ b/unit_tests/test_sendrecv.cpp @@ -49,13 +49,15 @@ void send_comm_mode_1d_contig() { if (0 == rank) { int dst = 1; - Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + Kokkos::parallel_for( + a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); 
KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); } else if (1 == rank) { int src = 0; KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); + Kokkos::parallel_reduce( + a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); ASSERT_EQ(errs, 0); } } @@ -76,13 +78,15 @@ void send_comm_mode_1d_noncontig() { int rank = comm.rank(); if (0 == rank) { int dst = 1; - Kokkos::parallel_for(a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); + Kokkos::parallel_for( + a.extent(0), KOKKOS_LAMBDA(const int i) { a(i) = i; }); KokkosComm::send(Kokkos::DefaultExecutionSpace(), a, dst, 0, comm); } else if (1 == rank) { int src = 0; KokkosComm::recv(Kokkos::DefaultExecutionSpace(), a, src, 0, comm); int errs; - Kokkos::parallel_reduce(a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); + Kokkos::parallel_reduce( + a.extent(0), KOKKOS_LAMBDA(const int &i, int &lsum) { lsum += a(i) != i; }, errs); ASSERT_EQ(errs, 0); } } From f0d52852faed440869524379df9021985c4ab2a2 Mon Sep 17 00:00:00 2001 From: Gabriel Dos Santos Date: Fri, 24 May 2024 13:37:29 +0200 Subject: [PATCH 17/17] fix(sessions): only free communicator if it isn't already null --- src/KokkosComm_communicator.hpp | 8 +++++++- src/impl/KokkosComm_MPI_instance.hpp | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/KokkosComm_communicator.hpp b/src/KokkosComm_communicator.hpp index af1b6dc0..c3ecd7c1 100644 --- a/src/KokkosComm_communicator.hpp +++ b/src/KokkosComm_communicator.hpp @@ -37,7 +37,13 @@ class Communicator { Communicator(MPI_Comm comm) : _comm(comm) {} Communicator(const Communicator& other) = delete; Communicator(const Communicator&& other) { _comm = std::move(other._comm); } - ~Communicator() { /*MPI_Comm_free(&_comm);*/ } + ~Communicator() { + // Only free the communicator if 
it hasn't been set to `MPI_COMM_NULL` before. This is to prevent double freeing + // when we explicitly call the communicator's dtor in the `Context` dtor. + if (MPI_COMM_NULL != _comm) { + MPI_Comm_free(&_comm); + } + } static auto dup_raw(MPI_Comm raw) -> Communicator { MPI_Comm new_comm; diff --git a/src/impl/KokkosComm_MPI_instance.hpp b/src/impl/KokkosComm_MPI_instance.hpp index bfde3d34..df86776a 100644 --- a/src/impl/KokkosComm_MPI_instance.hpp +++ b/src/impl/KokkosComm_MPI_instance.hpp @@ -33,7 +33,11 @@ class Context { public: Context(MPI_Session shandle, MPI_Comm comm) : _shandle(shandle), _comm(Communicator(comm)) {} - ~Context() { MPI_Session_finalize(&_shandle); } + ~Context() { + // Ensure the session-associated communicator is destroyed before the session is finalized. + _comm.~Communicator(); + MPI_Session_finalize(&_shandle); + } auto comm(void) -> const Communicator& { return _comm; } };