Skip to content

Commit

Permalink
#1024: Schedule the PendingSend call from collective bcast instead of…
Browse files Browse the repository at this point in the history
… invoking it
  • Loading branch information
JacobDomagala authored and Braden Mailloux committed Oct 15, 2020
1 parent ca3f2a1 commit 3138a72
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 25 deletions.
8 changes: 4 additions & 4 deletions src/vt/vrt/collection/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ struct CollectionManager
>
messaging::PendingSend broadcastMsgCollective(
CollectionProxyWrapType<typename MsgT::CollectionType> const& proxy,
MsgT* msg, bool instrument = true);
messaging::MsgPtrThief<MsgT> msg, bool instrument = true);

/**
* \brief Broadcast collective a message with action member handler
Expand All @@ -982,7 +982,7 @@ struct CollectionManager
>
messaging::PendingSend broadcastMsgCollective(
CollectionProxyWrapType<typename MsgT::CollectionType> const& proxy,
MsgT* msg, bool instrument = true);
messaging::MsgPtrThief<MsgT> msg, bool instrument = true);

/**
* \internal \brief Broadcast collective a message
Expand All @@ -996,8 +996,8 @@ struct CollectionManager
*/
template <typename MsgT, typename ColT>
messaging::PendingSend broadcastMsgCollectiveImpl(
CollectionProxyWrapType<ColT> const& proxy, MsgT* msg,
bool instrument = true);
CollectionProxyWrapType<ColT> const& proxy, MsgPtr<MsgT>& msg,
bool instrument);

/**
* \brief Broadcast a message with action function handler
Expand Down
40 changes: 19 additions & 21 deletions src/vt/vrt/collection/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -854,14 +854,15 @@ template <
>
messaging::PendingSend CollectionManager::broadcastMsgCollective(
CollectionProxyWrapType<typename MsgT::CollectionType> const& proxy,
MsgT* msg, bool instrument
messaging::MsgPtrThief<MsgT> msg, bool instrument
) {
using ColT = typename MsgT::CollectionType;

msg->setVrtHandler(auto_registry::makeAutoHandlerCollection<ColT, MsgT, f>());
msg->setMember(false);
auto& msgPtr = msg.msg_;
msgPtr->setVrtHandler(auto_registry::makeAutoHandlerCollection<ColT, MsgT, f>());
msgPtr->setMember(false);

return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msg, instrument);
return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msgPtr, instrument);
}

template <
Expand All @@ -870,24 +871,24 @@ template <
>
messaging::PendingSend CollectionManager::broadcastMsgCollective(
CollectionProxyWrapType<typename MsgT::CollectionType> const& proxy,
MsgT* msg, bool instrument
messaging::MsgPtrThief<MsgT> msg, bool instrument
) {
using ColT = typename MsgT::CollectionType;

msg->setVrtHandler(
auto& msgPtr = msg.msg_;
msgPtr->setVrtHandler(
auto_registry::makeAutoHandlerCollectionMem<ColT, MsgT, f>()
);
msg->setMember(true);
msgPtr->setMember(true);

return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msg, instrument);
return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msgPtr, instrument);
}

template <typename MsgT, typename ColT>
messaging::PendingSend CollectionManager::broadcastMsgCollectiveImpl(
CollectionProxyWrapType<ColT> const& proxy, MsgT* raw_msg, bool instrument
CollectionProxyWrapType<ColT> const& proxy, MsgPtr<MsgT>& msg, bool instrument
) {
using IndexT = typename ColT::IndexType;
auto msg = promoteMsg(raw_msg);

msg->setFromNode(theContext()->getNode());
msg->setBcastProxy(proxy.getProxy());
Expand All @@ -910,20 +911,17 @@ messaging::PendingSend CollectionManager::broadcastMsgCollectiveImpl(
msg->setCat(balance::CommCategory::NodeToCollection);
#endif

theMsg()->setupEpochMsg(msg);

return messaging::PendingSend(
msg, [proxy](MsgSharedPtr<BaseMsgType>& msgIn){
auto col_msg = reinterpret_cast<MsgT*>(msgIn.get());
auto const cur_epoch = theMsg()->setupEpochMsg(msg);

auto elm_holder = theCollection()->findElmHolder<ColT, IndexT>(proxy);
auto const bcast_epoch = elm_holder->cur_bcast_epoch_++;
col_msg->setBcastEpoch(bcast_epoch);
return schedule(msg, false, cur_epoch, [proxy, msg] {
auto elm_holder = theCollection()->findElmHolder<ColT, IndexT>(proxy);
auto const bcast_epoch = elm_holder->cur_bcast_epoch_++;
msg->setBcastEpoch(bcast_epoch);

theMsg()->markAsCollectionMessage(col_msg);
theMsg()->markAsCollectionMessage(msg);

collectionBcastHandler<ColT, IndexT, MsgT>(col_msg);
}
collectionBcastHandler<ColT, IndexT, MsgT>(msg.get());
}
);
}

Expand Down

0 comments on commit 3138a72

Please sign in to comment.