Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft kokkos-comm initialization/finalization using MPI_Sessions #68

Draft
wants to merge 20 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5614211
feat: add `Universe` class to handle KokkosComm initialization/finali…
dssgabriel May 15, 2024
45c695f
feat: add support for buffered mode send operations
dssgabriel May 15, 2024
ea667db
Merge branch 'kokkos:develop' into initialize-finalize
dssgabriel May 16, 2024
bc31289
fix: correctly detach buffer and fix size check
dssgabriel May 16, 2024
4852d9c
feat: first draft of `Communicator` class
dssgabriel May 16, 2024
e37173b
feat(session): implement a first draft of session-based init
dssgabriel May 16, 2024
aac626e
Merge branch 'kokkos:develop' into initialize-finalize
dssgabriel May 17, 2024
4bc55e2
feat: initialization and finalization routines for KokkosComm
dssgabriel May 18, 2024
afbb017
test: `send_recv` with KokkosComm init and finalize
dssgabriel May 18, 2024
89e3ee3
Merge branch 'kokkos:develop' into initialize-finalize
dssgabriel May 20, 2024
dcc4584
feat(sessions): move init & finalize w/ `MPI_Session` to its own file
dssgabriel May 23, 2024
de176b6
revert: removed Buffered communication mode
dssgabriel May 23, 2024
bb58138
feat(sessions): removed dead code in `Communicator` class
dssgabriel May 23, 2024
c2812bd
test(sessions): update tests with new `init`/`finalize` API
dssgabriel May 23, 2024
4ceee5f
chore: format
dssgabriel May 23, 2024
40ba7e7
fix(sessions): remove code related to MPI's WPM
dssgabriel May 23, 2024
822722d
refactor(sessions): rename as `Context`, remove `initialize` w/ WPM &…
dssgabriel May 23, 2024
8faf205
test(sessions): update test accordingly
dssgabriel May 23, 2024
d064469
chore: format
dssgabriel May 24, 2024
f0d5285
fix(sessions): only free communicator if it isn't already null
dssgabriel May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions perf_tests/test_2dhalo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

#include "KokkosComm.hpp"

#include <Kokkos_Core_fwd.hpp>
#include <iostream>

void noop(benchmark::State, MPI_Comm) {}
void noop(benchmark::State, const KokkosComm::Communicator<Kokkos::DefaultExecutionSpace> &comm) {}

template <typename Space, typename View>
void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int nx, int ny, int rx, int ry, int rs,
const View &v) {
void send_recv(benchmark::State &, const KokkosComm::Communicator<Kokkos::DefaultExecutionSpace> &comm,
const Space &space, int nx, int ny, int rx, int ry, int rs, const View &v) {
// 2D index of nbrs in minus and plus direction (periodic)
const int xm1 = (rx + rs - 1) % rs;
const int ym1 = (ry + rs - 1) % rs;
Expand Down Expand Up @@ -73,10 +74,11 @@ void benchmark_2dhalo(benchmark::State &state) {
int ny = 512;
int nprops = 3;

int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
auto ctx = KokkosComm::initialize<Kokkos::DefaultExecutionSpace>();
const auto &comm = ctx.comm();

int rank = comm.rank();
int size = comm.size();
const int rs = std::sqrt(size);
const int rx = rank % rs;
const int ry = rank / rs;
Expand All @@ -86,12 +88,11 @@ void benchmark_2dhalo(benchmark::State &state) {
// grid of elements, each with 3 properties, and a radius-1 halo
grid_type grid("", nx + 2, ny + 2, nprops);
while (state.KeepRunning()) {
do_iteration(state, MPI_COMM_WORLD, send_recv<Kokkos::DefaultExecutionSpace, grid_type>, space, nx, ny, rx, ry,
rs, grid);
do_iteration(state, comm, send_recv<Kokkos::DefaultExecutionSpace, grid_type>, space, nx, ny, rx, ry, rs, grid);
}
} else {
while (state.KeepRunning()) {
do_iteration(state, MPI_COMM_WORLD, noop); // do nothing...
do_iteration(state, comm, noop); // do nothing...
}
}

Expand All @@ -113,4 +114,4 @@ void benchmark_2dhalo(benchmark::State &state) {
// clang-format on
}

BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond);
BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond);
15 changes: 9 additions & 6 deletions perf_tests/test_sendrecv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#include "KokkosComm.hpp"

template <typename Space, typename View>
void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank, const View &v) {
void send_recv(benchmark::State &, const KokkosComm::Communicator<Space> &comm, const Space &space, int rank,
const View &v) {
if (0 == rank) {
KokkosComm::send(space, v, 1, 0, comm);
KokkosComm::recv(space, v, 1, 0, comm);
Expand All @@ -30,9 +31,11 @@ void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank,
}

void benchmark_sendrecv(benchmark::State &state) {
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
auto ctx = KokkosComm::initialize<Kokkos::DefaultExecutionSpace>();
const auto &comm = ctx.comm();

int rank = comm.rank();
int size = comm.size();
if (size < 2) {
state.SkipWithError("benchmark_sendrecv needs at least 2 ranks");
}
Expand All @@ -44,10 +47,10 @@ void benchmark_sendrecv(benchmark::State &state) {
view_type a("", 1000000);

while (state.KeepRunning()) {
do_iteration(state, MPI_COMM_WORLD, send_recv<Kokkos::DefaultExecutionSpace, view_type>, space, rank, a);
do_iteration(state, comm, send_recv<Kokkos::DefaultExecutionSpace, view_type>, space, rank, a);
}

state.SetBytesProcessed(sizeof(Scalar) * state.iterations() * a.size() * 2);
}

BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond);
BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond);
9 changes: 6 additions & 3 deletions perf_tests/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

#pragma once

#include <Kokkos_Core_fwd.hpp>
#include <chrono>

#include <benchmark/benchmark.h>

#include "KokkosComm_communicator.hpp"
#include "KokkosComm_include_mpi.hpp"

// F is a function that takes (state, MPI_Comm, args...)
template <typename F, typename... Args>
void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args) {
void do_iteration(benchmark::State &state, const KokkosComm::Communicator<Kokkos::DefaultExecutionSpace> &comm,
F &&func, Args... args) {
using Clock = std::chrono::steady_clock;
using Duration = std::chrono::duration<double>;

Expand All @@ -34,6 +37,6 @@ void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args

double max_elapsed_second;
double elapsed_seconds = elapsed.count();
MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm);
MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm.as_raw());
state.SetIterationTime(max_elapsed_second);
}
}
15 changes: 9 additions & 6 deletions src/KokkosComm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,34 @@

#pragma once

#include <mpi.h>
#include "KokkosComm_collective.hpp"
#include "KokkosComm_communicator.hpp"
#include "KokkosComm_version.hpp"
#include "KokkosComm_isend.hpp"
#include "KokkosComm_recv.hpp"
#include "KokkosComm_send.hpp"
#include "KokkosComm_concepts.hpp"
#include "KokkosComm_comm_mode.hpp"
#include "KokkosComm_MPI_instance.hpp"

#include <Kokkos_Core.hpp>

namespace KokkosComm {

template <CommMode SendMode = CommMode::Default, KokkosExecutionSpace ExecSpace, KokkosView SendView>
Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) {
return Impl::isend<SendMode>(space, sv, dest, tag, comm);
Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator<ExecSpace> &comm) {
return Impl::isend<SendMode>(space, sv, dest, tag, comm.as_raw());
}

template <CommMode SendMode = CommMode::Default, KokkosExecutionSpace ExecSpace, KokkosView SendView>
void send(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) {
return Impl::send<SendMode>(space, sv, dest, tag, comm);
void send(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator<ExecSpace> &comm) {
return Impl::send<SendMode>(space, sv, dest, tag, comm.as_raw());
}

template <KokkosExecutionSpace ExecSpace, KokkosView RecvView>
void recv(const ExecSpace &space, RecvView &sv, int src, int tag, MPI_Comm comm) {
return Impl::recv(space, sv, src, tag, comm);
void recv(const ExecSpace &space, RecvView &sv, int src, int tag, const Communicator<ExecSpace> &comm) {
return Impl::recv(space, sv, src, tag, comm.as_raw());
}

} // namespace KokkosComm
22 changes: 9 additions & 13 deletions src/KokkosComm_comm_mode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,18 @@ namespace KokkosComm {
// Scoped enumeration to specify the communication mode of a sending operation.
// See section 3.4 of the MPI standard for a complete specification.
enum class CommMode {
// Default mode: lets the user override the send operations behavior at
// compile-time. E.g., this can be set to mode "Synchronous" for debug
// builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE.
// Default mode: lets the user override the send operations behavior at compile-time. E.g. this can be set to mode
// "Synchronous" for debug builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE.
Default,
// Standard mode: MPI implementation decides whether outgoing messages will
// be buffered. Send operations can be started whether or not a matching
// receive has been started. They may complete before a matching receive is
// started. Standard mode is non-local: successful completion of the send
// operation may depend on the occurrence of a matching receive.
// Standard mode: MPI implementation decides whether outgoing messages will be buffered. Send operations can be
// started whether or not a matching receive has been started. They may complete before a matching receive is started.
// Standard mode is non-local: successful completion of the send operation may depend on the occurrence of a matching
// receive.
Standard,
// Ready mode: Send operations may be started only if the matching receive is
// already started.
// Ready mode: Send operations may be started only if the matching receive is already started.
Ready,
// Synchronous mode: Send operations complete successfully only if a matching
// receive is started, and the receive operation has started to receive the
// message sent.
// Synchronous mode: Send operations complete successfully only if a matching receive is started, and the receive
// operation has started to receive the message sent.
Synchronous,
};

Expand Down
81 changes: 81 additions & 0 deletions src/KokkosComm_communicator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//@HEADER
// ************************************************************************
//
// Kokkos v. 4.0
// Copyright (2022) National Technology & Engineering
// Solutions of Sandia, LLC (NTESS).
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
// See https://kokkos.org/LICENSE for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//@HEADER

#pragma once

#include "KokkosComm_concepts.hpp"

#include <Kokkos_Core_fwd.hpp>
#include <cstdio>
#include <mpi.h>

namespace KokkosComm {

using Color = int;
using Key = int;

template <KokkosExecutionSpace ExecSpace = Kokkos::DefaultExecutionSpace>
class Communicator {
devreal marked this conversation as resolved.
Show resolved Hide resolved
private:
MPI_Comm _comm;
ExecSpace _exec_space;

public:
Communicator(MPI_Comm comm) : _comm(comm) {}
Communicator(const Communicator& other) = delete;
Communicator(const Communicator&& other) { _comm = std::move(other._comm); }
~Communicator() {
// Only free the communicator if it hasn't been set to `MPI_COMM_NULL` before. This is to prevent double freeing
// when we explicitly call the communicator's dtor in the `Context` dtor.
if (MPI_COMM_NULL != _comm) {
MPI_Comm_free(&_comm);
}
}

static auto dup_raw(MPI_Comm raw) -> Communicator {
MPI_Comm new_comm;
MPI_Comm_dup(raw, &new_comm);
return Communicator(new_comm);
}

static auto dup(const Communicator& other) -> Communicator { return Communicator::dup_raw(other.as_raw()); }

static auto split_raw(MPI_Comm raw, Color color, Key key) -> Communicator {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this isn't just an overload of split?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to make it explicit that we're splitting from a raw MPI_Comm handle, not a wrapped KokkosComm::Communicator, but I guess we can also simply overload.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that's a design decision that the Kokkos community should make (to be consistent with the other projects)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can drop the Raw. We typically are happy with overloads.

MPI_Comm new_comm;
MPI_Comm_split(raw, color, key, &new_comm);
return Communicator(new_comm);
}

static auto split(const Communicator& other, Color color, Key key) -> Communicator {
return Communicator::split_raw(other.as_raw(), color, key);
}

inline auto as_raw() const -> MPI_Comm { return _comm; }

inline auto size(void) const -> int {
int size;
MPI_Comm_size(_comm, &size);
return size;
}

inline auto rank(void) const -> int {
int rank;
MPI_Comm_rank(_comm, &rank);
return rank;
}
};

} // namespace KokkosComm
79 changes: 79 additions & 0 deletions src/impl/KokkosComm_MPI_instance.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//@HEADER
// ************************************************************************
//
// Kokkos v. 4.0
// Copyright (2022) National Technology & Engineering
// Solutions of Sandia, LLC (NTESS).
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
// See https://kokkos.org/LICENSE for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//@HEADER

#pragma once

#include "KokkosComm_communicator.hpp"
#include "KokkosComm_concepts.hpp"

#include <Kokkos_Core.hpp>
#include <mpi.h>

namespace KokkosComm {

template <KokkosExecutionSpace ExecSpace>
class Context {
private:
MPI_Session _shandle;
Communicator<ExecSpace> _comm;

public:
Context(MPI_Session shandle, MPI_Comm comm) : _shandle(shandle), _comm(Communicator<ExecSpace>(comm)) {}

~Context() {
// Ensure the session-associated communicator is destroyed before the session is finalized.
_comm.~Communicator();
MPI_Session_finalize(&_shandle);
}

auto comm(void) -> const Communicator<ExecSpace>& { return _comm; }
};

template <KokkosExecutionSpace ExecSpace>
auto initialize(void) -> Context<ExecSpace> {
int rc;

MPI_Session kokkoscomm_shandle = MPI_SESSION_NULL;
MPI_Group kokkoscomm_group = MPI_GROUP_NULL;
MPI_Comm kokkoscomm_comm = MPI_COMM_NULL;
MPI_Info kokkoscomm_info = MPI_INFO_NULL;

MPI_Info_create(&kokkoscomm_info);

// Set threading level for our session
constexpr char thrd_lvl_key[] = "thread_level";
constexpr char thrd_lvl_val[] = "MPI_THREAD_MULTIPLE";
MPI_Info_set(kokkoscomm_info, thrd_lvl_key, thrd_lvl_val);

#ifdef KOKKOSCOMM_CUDA_AWARE_MPI
// Disable CUDA pointer attribute checks from MPI
constexpr char cu_ptr_attr_key[] = "mpi_communication_pattern";
constexpr char cu_ptr_attr_val[] = "MPI_CPU_TO_GPU";
MPI_Info_set(kokkoscomm_info, cu_ptr_attr_key, cu_ptr_attr_val);
#endif

rc = MPI_Session_init(kokkoscomm_info, MPI_ERRORS_RETURN, &kokkoscomm_shandle);

constexpr char pset_name[] = "mpi://WORLD";
MPI_Group_from_session_pset(kokkoscomm_shandle, pset_name, &kokkoscomm_group);

MPI_Comm_create_from_group(kokkoscomm_group, "kokkos-comm.default_session", MPI_INFO_NULL, MPI_ERRORS_RETURN,
&kokkoscomm_comm);

return Context<ExecSpace>(kokkoscomm_shandle, kokkoscomm_comm);
}

} // namespace KokkosComm
1 change: 1 addition & 0 deletions src/impl/KokkosComm_isend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "KokkosComm_request.hpp"
#include "KokkosComm_traits.hpp"
#include "KokkosComm_comm_mode.hpp"
#include "KokkosComm_communicator.hpp"

// impl
#include "KokkosComm_include_mpi.hpp"
Expand Down
Loading
Loading