Skip to content

Commit

Permalink
Merge pull request #818 from DARMA-tasking/816-use-mpi-collective-api
Browse files Browse the repository at this point in the history
816 Use safe MPI collective API for all blocking MPI calls
  • Loading branch information
lifflander authored May 28, 2020
2 parents ab8cc47 + 33382fa commit 0be45cd
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 41 deletions.
8 changes: 3 additions & 5 deletions src/vt/context/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ Context::Context(int argc, char** argv, bool const is_interop, MPI_Comm* comm) {
communicator_ = MPI_COMM_WORLD;
}

MPI_Barrier(communicator_);

if (is_interop) {
MPI_Comm_split(communicator_, 0, 0, &communicator_);
MPI_Comm vt_comm;
MPI_Comm_dup(communicator_, &vt_comm);
communicator_ = vt_comm;
}

MPI_Barrier(communicator_);

is_comm_world_ = communicator_ == MPI_COMM_WORLD;

int numNodesLocal = uninitialized_destination;
Expand Down
22 changes: 16 additions & 6 deletions src/vt/group/collective/group_info_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ MPI_Comm InfoColl::getComm() const {

void InfoColl::freeComm() {
if (mpi_group_comm != MPI_COMM_WORLD) {
MPI_Comm_free(&mpi_group_comm);
mpi_group_comm = MPI_COMM_WORLD;
theGroup()->collective_scope_.mpiCollectiveWait([this]{
MPI_Comm_free(&mpi_group_comm);
mpi_group_comm = MPI_COMM_WORLD;
});
}
}

void InfoColl::setupCollective() {
auto const& this_node = theContext()->getNode();
auto const& num_nodes = theContext()->getNumNodes();
auto const& group_ = getGroupID();

Expand Down Expand Up @@ -137,9 +138,18 @@ void InfoColl::setupCollective() {
);

if (make_mpi_group_) {
auto const cur_comm = theContext()->getComm();
int32_t const group_color = in_group;
MPI_Comm_split(cur_comm, group_color, this_node, &mpi_group_comm);
// Create the MPI group, and wait for all nodes to get here. In theory, this
// might be overlapable with VT's group construction, but for now just wait
// on it here. These should be ordered within this scope, as the collective
// group creation is a collective invocation.
theGroup()->collective_scope_.mpiCollectiveWait(
[in_group,this]{
auto const this_node_impl = theContext()->getNode();
auto const cur_comm = theContext()->getComm();
int32_t const group_color = in_group;
MPI_Comm_split(cur_comm, group_color, this_node_impl, &mpi_group_comm);
}
);
}

up_tree_cont_ = makeCollectiveContinuation(group_);
Expand Down
4 changes: 3 additions & 1 deletion src/vt/group/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ EventType GroupManager::sendGroup(
return ret_event;
}

GroupManager::GroupManager() {
GroupManager::GroupManager()
: collective_scope_(theCollective()->makeCollectiveScope())
{
global::DefaultGroup::setupDefaultTree();
}

Expand Down
3 changes: 3 additions & 0 deletions src/vt/group/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "vt/activefn/activefn.h"
#include "vt/collective/tree/tree.h"
#include "vt/collective/reduce/reduce.h"
#include "vt/collective/collective_scope.h"
#include "vt/runtime/component/component_pack.h"

#include <memory>
Expand All @@ -87,6 +88,7 @@ struct GroupManager : runtime::component::Component<GroupManager> {
>;
using ReduceType = collective::reduce::Reduce;
using ReducePtrType = ReduceType*;
using CollectiveScopeType = collective::CollectiveScope;

GroupManager();

Expand Down Expand Up @@ -190,6 +192,7 @@ struct GroupManager : runtime::component::Component<GroupManager> {
RemoteOperationIDType cur_collective_id_ = 0xFFFFFFFF00000000;
ActionContainerType continuation_actions_ = {};
ActionListType cleanup_actions_ = {};
CollectiveScopeType collective_scope_;
};

// This is a separate template class because Intel 18 didn't like
Expand Down
48 changes: 27 additions & 21 deletions src/vt/rdma/channel/rdma_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
*/

#include "vt/rdma/channel/rdma_channel.h"
#include "vt/rdma/rdma.h"
#include "vt/collective/collective_alg.h"

#define PRINT_RDMA_OP_TYPE(OP) ((OP) == RDMA_TypeType::Get ? "GET" : "PUT")

Expand Down Expand Up @@ -114,14 +116,16 @@ Channel::initChannelGroup() {
group_incl_ret == MPI_SUCCESS, "MPI_Group_incl: Should be successful"
);

auto const& comm_create_ret = MPI_Comm_create_group(
theContext()->getComm(), channel_group_, channel_group_tag_, &channel_comm_
);
theRDMA()->collective_scope_.mpiCollectiveWait([&]{
auto const& comm_create_ret = MPI_Comm_create_group(
theContext()->getComm(), channel_group_, channel_group_tag_, &channel_comm_
);

vtAssert(
comm_create_ret == MPI_SUCCESS,
"MPI_Comm_create_group: Should be successful"
);
vtAssert(
comm_create_ret == MPI_SUCCESS,
"MPI_Comm_create_group: Should be successful"
);
});

debug_print(
rdma_channel, node,
Expand Down Expand Up @@ -299,21 +303,23 @@ Channel::initChannelWindow() {
"channel: create window: num_bytes={}\n", num_bytes_
);

int win_create_ret = 0;

if (is_target_) {
win_create_ret = MPI_Win_create(
ptr_, num_bytes_, rdma_elm_size, MPI_INFO_NULL, channel_comm_, &window_
theRDMA()->collective_scope_.mpiCollectiveWait([&]{
int win_create_ret = 0;

if (is_target_) {
win_create_ret = MPI_Win_create(
ptr_, num_bytes_, rdma_elm_size, MPI_INFO_NULL, channel_comm_, &window_
);
} else {
win_create_ret = MPI_Win_create(
nullptr, 0, 1, MPI_INFO_NULL, channel_comm_, &window_
);
}

vtAssert(
win_create_ret == MPI_SUCCESS, "MPI_Win_create: Should be successful"
);
} else {
win_create_ret = MPI_Win_create(
nullptr, 0, 1, MPI_INFO_NULL, channel_comm_, &window_
);
}

vtAssert(
win_create_ret == MPI_SUCCESS, "MPI_Win_create: Should be successful"
);
});

initialized_ = true;

Expand Down
5 changes: 5 additions & 0 deletions src/vt/rdma/rdma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@
#include "vt/rdma/rdma.h"
#include "vt/messaging/active.h"
#include "vt/pipe/pipe_headers.h"
#include "vt/collective/collective_alg.h"

#include <cstring>

namespace vt { namespace rdma {

/// Construct the RDMA manager. Acquires a dedicated collective scope from
/// the CollectiveAlg component (theCollective()), which this manager uses to
/// order blocking MPI collective calls (e.g. communicator/window creation)
/// safely via mpiCollectiveWait.
RDMAManager::RDMAManager()
: collective_scope_(theCollective()->makeCollectiveScope())
{ }

/*static*/ void RDMAManager::getRDMAMsg(GetMessage* msg) {
auto const msg_tag = envelopeGetTag(msg->env);
auto const op_id = msg->op_id;
Expand Down
9 changes: 9 additions & 0 deletions src/vt/rdma/rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@

#include "vt/runtime/component/component_pack.h"

#include "vt/collective/collective_scope.h"

#include <unordered_map>
#include <cassert>

Expand Down Expand Up @@ -98,6 +100,9 @@ struct RDMAManager : runtime::component::Component<RDMAManager> {
template <typename MsgType>
using RDMA_PutTypedFunctionType =
RDMA_StateType::RDMA_PutTypedFunctionType<MsgType>;
using CollectiveScopeType = collective::CollectiveScope;

RDMAManager();

std::string name() override { return "RDMAManager"; }

Expand Down Expand Up @@ -557,6 +562,7 @@ struct RDMAManager : runtime::component::Component<RDMAManager> {

public:
friend struct RDMACollectionManager;
friend struct Channel;

public:
RDMA_HandlerType allocateNewRdmaHandler();
Expand Down Expand Up @@ -597,6 +603,9 @@ struct RDMAManager : runtime::component::Component<RDMAManager> {

// Current RDMA channel tag
TagType next_channel_tag_ = first_rdma_channel_tag;

// Collective scope for issuing MPI collective operations
CollectiveScopeType collective_scope_;
};

}} //end namespace vt::rdma
Expand Down
5 changes: 5 additions & 0 deletions src/vt/rdmahandle/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,14 @@
#include "vt/config.h"
#include "vt/rdmahandle/manager.h"
#include "vt/objgroup/manager.h"
#include "vt/collective/collective_alg.h"

namespace vt { namespace rdma {

/// Construct the RDMA handle manager. Acquires a collective scope from the
/// CollectiveAlg component (theCollective()) so that blocking MPI operations
/// (e.g. window allocation in finishMake) can be issued through
/// mpiCollectiveWait in a safely ordered, collective fashion.
Manager::Manager()
: collective_scope_(theCollective()->makeCollectiveScope())
{ }

void Manager::finalize() {
vt::theObjGroup()->destroyCollective(proxy_);
}
Expand Down
7 changes: 6 additions & 1 deletion src/vt/rdmahandle/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "vt/topos/mapping/dense/dense.h"
#include "vt/rdmahandle/handle_set.h"
#include "vt/runtime/component/component_pack.h"
#include "vt/collective/collective_scope.h"

namespace vt { namespace rdma {

Expand All @@ -83,8 +84,9 @@ struct Manager : runtime::component::Component<Manager> {
using ProxyType = vt::objgroup::proxy::Proxy<Manager>;
using ElemToHandle = std::unordered_map<int64_t, RDMA_HandleType>;
using HandleToManager = std::unordered_map<RDMA_HandleType, ObjGroupProxyType>;
using CollectiveScopeType = collective::CollectiveScope;

Manager() = default;
Manager();

std::string name() override { return "HandleRDMA"; }

Expand Down Expand Up @@ -241,6 +243,9 @@ struct Manager : runtime::component::Component<Manager> {
/// Holder for RDMA control data
template <typename T, HandleEnum E>
static std::unordered_map<HandleKey, Holder<T,E>> holder_;

// Collective scope for MPI operations
CollectiveScopeType collective_scope_;
};

template <typename T, HandleEnum E>
Expand Down
7 changes: 3 additions & 4 deletions src/vt/rdmahandle/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ void Manager::finishMake(impl::ConstructMsg<T, E, ProxyT>* msg) {
key.handle_, size, count
);
auto& entry = getEntry<T,E>(key);
// Need barrier here so all nodes arrive before blocking in the MPI window
// creation
theCollective()->barrier();
entry.allocateDataWindow();
collective_scope_.mpiCollectiveWait([&entry]{
entry.allocateDataWindow();
});
}

template <typename T, HandleEnum E, typename ProxyT>
Expand Down
8 changes: 5 additions & 3 deletions src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1298,8 +1298,9 @@ void Runtime::initializeComponents() {

p_->registerComponent<rdma::RDMAManager>(
&theRDMA, Deps<
ctx::Context, // Everything depends on theContext
messaging::ActiveMessenger // Depends on active messenger for RDMA
ctx::Context, // Everything depends on theContext
messaging::ActiveMessenger, // Depends on active messenger for RDMA
collective::CollectiveAlg // Depends on collective scope
>{}
);

Expand Down Expand Up @@ -1357,7 +1358,8 @@ void Runtime::initializeComponents() {
ctx::Context, // Everything depends on theContext
messaging::ActiveMessenger, // Depends on active messenger for messaging
vrt::collection::CollectionManager, // For RDMA on collection elements
objgroup::ObjGroupManager // For RDMA on objgroups
objgroup::ObjGroupManager, // For RDMA on objgroups
collective::CollectiveAlg // Depends on collective scope
>{}
);

Expand Down

0 comments on commit 0be45cd

Please sign in to comment.