Skip to content

Commit

Permalink
#1024: Use collectionBcastHandler function to handle collective bcast
Browse files Browse the repository at this point in the history
  • Loading branch information
JacobDomagala committed Sep 22, 2020
1 parent 0e06bdf commit 78c465f
Showing 1 changed file with 32 additions and 22 deletions.
54 changes: 32 additions & 22 deletions src/vt/vrt/collection/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ messaging::PendingSend CollectionManager::broadcastMsgCollective(
msg->setVrtHandler(auto_registry::makeAutoHandlerCollection<ColT, MsgT, f>());
msg->setMember(false);

return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msg);
return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msg, instrument);
}

template <
Expand All @@ -877,39 +877,49 @@ messaging::PendingSend CollectionManager::broadcastMsgCollective(
auto_registry::makeAutoHandlerCollectionMem<ColT, MsgT, f>());
msg->setMember(true);

return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msg);
return broadcastMsgCollectiveImpl<MsgT, ColT>(proxy, msg, instrument);
}

template <typename MsgT, typename ColT>
messaging::PendingSend CollectionManager::broadcastMsgCollectiveImpl(
CollectionProxyWrapType<ColT> const& proxy, MsgT* msg
CollectionProxyWrapType<ColT> const& proxy, MsgT* raw_msg, bool instrument
) {
using IndexT = typename ColT::IndexType;
auto promoMsg = promoteMsg(msg);
auto msg = promoteMsg(raw_msg);

return messaging::PendingSend(
promoMsg, [proxy](MsgSharedPtr<BaseMsgType>& msgIn) {
auto elm_holder = theCollection()->findElmHolder<ColT, IndexT>(proxy);
auto const node = theContext()->getNode();
msg->setFromNode(theContext()->getNode());
msg->setBcastProxy(proxy.getProxy());

#if vt_check_enabled(trace_enabled)
// Create the trace creation event for the broadcast here to connect it a
// higher semantic level
auto reg_type = msg->getMember() ?
auto_registry::RegistryTypeEnum::RegVrtCollectionMember :
auto_registry::RegistryTypeEnum::RegVrtCollection;
auto msg_size = vt::serialization::MsgSizer<MsgT>::get(msg.get());
auto event =
theMsg()->makeTraceCreationSend(msg, handler, reg_type, msg_size, true);
msg->setFromTraceEvent(event);
#endif

#if vt_check_enabled(lblite)
msg->setLBLiteInstrument(instrument);
msg->setCat(balance::CommCategory::NodeToCollection);
#endif

auto const cur_epoch = theMsg()->setupEpochMsg(msg);

return messaging::PendingSend(
msg, [proxy](MsgSharedPtr<BaseMsgType>& msgIn) {
auto col_msg = reinterpret_cast<MsgT*>(msgIn.get());

auto elm_holder = theCollection()->findElmHolder<ColT, IndexT>(proxy);
auto const bcast_epoch = elm_holder->cur_bcast_epoch_++;
col_msg->setBcastEpoch(bcast_epoch);

theMsg()->markAsCollectionMessage(col_msg);

if (elm_holder) {
elm_holder->foreach (
[node,
col_msg](IndexT const& idx, CollectionBase<ColT, IndexT>* base) {
auto const from = col_msg->getFromNode();
auto trace_event = trace::no_trace_event;
auto const hand = col_msg->getVrtHandler();
auto const member = col_msg->getMember();

collectionAutoMsgDeliver<
ColT, IndexT, MsgT, typename MsgT::UserMsgType>(
col_msg, base, hand, member, from, trace_event);
});
}
collectionBcastHandler<ColT, IndexT, MsgT>(col_msg);
});
}

Expand Down

0 comments on commit 78c465f

Please sign in to comment.