From 2563d132c1770b92c36222a72ac9e174a94fe339 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Tue, 27 Sep 2022 18:25:59 -0700 Subject: [PATCH] #1983: runnable: optimize objgroup dispatch --- src/vt/messaging/active.cc | 17 +-- src/vt/objgroup/dispatch/dispatch.h | 77 -------------- src/vt/objgroup/dispatch/dispatch.impl.h | 68 ------------ src/vt/objgroup/dispatch/dispatch_base.h | 86 --------------- src/vt/objgroup/manager.cc | 38 ++----- src/vt/objgroup/manager.fwd.h | 8 +- src/vt/objgroup/manager.h | 23 +--- src/vt/objgroup/manager.impl.h | 29 +---- src/vt/objgroup/manager.static.h | 100 +++++++++++++++--- src/vt/runnable/make_runnable.h | 4 +- src/vt/runnable/runnable.cc | 3 + .../messaging/serialized_messenger.impl.h | 55 +++++++--- 12 files changed, 166 insertions(+), 342 deletions(-) delete mode 100644 src/vt/objgroup/dispatch/dispatch.h delete mode 100644 src/vt/objgroup/dispatch/dispatch.impl.h delete mode 100644 src/vt/objgroup/dispatch/dispatch_base.h diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc index 5559605f8f..9ab3235191 100644 --- a/src/vt/messaging/active.cc +++ b/src/vt/messaging/active.cc @@ -495,10 +495,16 @@ EventType ActiveMessenger::doMessageSend( } else { recordLBDataCommForSend(dest, base, base.size()); - runnable::makeRunnable(base, true, envelopeGetHandler(msg->env), dest) - .withTDEpochFromMsg(is_term) - .withLBData(&bare_handler_lb_data_, bare_handler_dummy_elm_id_for_lb_data_) - .enqueue(); + auto han = envelopeGetHandler(msg->env); + bool const is_obj = HandlerManagerType::isHandlerObjGroup(han); + if (is_obj) { + objgroup::dispatchObjGroup(base, han, dest, nullptr); + } else { + runnable::makeRunnable(base, true, envelopeGetHandler(msg->env), dest) + .withTDEpochFromMsg(is_term) + .withLBData(&bare_handler_lb_data_, bare_handler_dummy_elm_id_for_lb_data_) + .enqueue(); + } } return no_event; } @@ -955,8 +961,7 @@ void ActiveMessenger::prepareActiveMsgToRun( bool const is_obj = HandlerManagerType::isHandlerObjGroup(handler); if (is_obj) { - vtAbortIf(cont != nullptr, "Must be nullptr"); - objgroup::dispatchObjGroup(base, handler); + objgroup::dispatchObjGroup(base, handler, from_node, cont); } else { runnable::makeRunnable(base, not is_term, handler, from_node) .withContinuation(cont) diff --git a/src/vt/objgroup/dispatch/dispatch.h b/src/vt/objgroup/dispatch/dispatch.h deleted file mode 100644 index 417640fe50..0000000000 --- a/src/vt/objgroup/dispatch/dispatch.h +++ /dev/null @@ -1,77 +0,0 @@ -/* -//@HEADER -// ***************************************************************************** -// -// dispatch.h -// DARMA/vt => Virtual Transport -// -// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC -// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. -// Government retains certain rights in this software. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright notice, -// this list of conditions and the following disclaimer in the documentation -// and/or other materials provided with the distribution. -// -// * Neither the name of the copyright holder nor the names of its -// contributors may be used to endorse or promote products derived from this -// software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. -// -// Questions? Contact darma@sandia.gov -// -// ***************************************************************************** -//@HEADER -*/ - -#if !defined INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_H -#define INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_H - -#include "vt/config.h" -#include "vt/objgroup/common.h" -#include "vt/objgroup/dispatch/dispatch_base.h" -#include "vt/messaging/message/smart_ptr.h" - -namespace vt { namespace objgroup { namespace dispatch { - -template -struct Dispatch final : DispatchBase { - Dispatch() = delete; - Dispatch(Dispatch const&) = delete; - Dispatch(Dispatch&&) = default; - - Dispatch(ObjGroupProxyType in_proxy, ObjT* in_obj) - : DispatchBase(in_proxy), obj_(in_obj) - { } - - virtual ~Dispatch() = default; - - void run(HandlerType han, BaseMessage* msg) override; - void* objPtr() const override { return obj_; } - -private: - ObjT* obj_ = nullptr; -}; - -}}} /* end namespace vt::objgroup::dispatch */ - -#include "vt/objgroup/dispatch/dispatch.impl.h" - -#endif /*INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_H*/ diff --git a/src/vt/objgroup/dispatch/dispatch.impl.h b/src/vt/objgroup/dispatch/dispatch.impl.h deleted file mode 100644 index 4623cad82f..0000000000 --- a/src/vt/objgroup/dispatch/dispatch.impl.h +++ /dev/null @@ -1,68 +0,0 @@ -/* -//@HEADER -// ***************************************************************************** -// -// dispatch.impl.h -// DARMA/vt => Virtual Transport -// -// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC -// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. -// Government retains certain rights in this software. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright notice, -// this list of conditions and the following disclaimer in the documentation -// and/or other materials provided with the distribution. -// -// * Neither the name of the copyright holder nor the names of its -// contributors may be used to endorse or promote products derived from this -// software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. -// -// Questions? Contact darma@sandia.gov -// -// ***************************************************************************** -//@HEADER -*/ - -#if !defined INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_IMPL_H -#define INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_IMPL_H - -#include "vt/config.h" -#include "vt/objgroup/common.h" -#include "vt/registry/auto/auto_registry.h" - -namespace vt { namespace objgroup { namespace dispatch { - -template -void Dispatch::run(HandlerType han, BaseMessage* msg) { - //using ActiveFnType = void(ObjT::*)(vt::BaseMessage*); - vtAssert(obj_ != nullptr, "Must have a valid object"); - - auto tmsg = static_cast(msg); - auto m = promoteMsg(tmsg); - runnable::makeRunnable(m, true, han, theContext()->getNode()) - .withObjGroup(obj_) - .withTDEpochFromMsg() - .enqueue(); -} - -}}} /* end namespace vt::objgroup::dispatch */ - -#endif /*INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_IMPL_H*/ diff --git a/src/vt/objgroup/dispatch/dispatch_base.h b/src/vt/objgroup/dispatch/dispatch_base.h deleted file mode 100644 index 42bc9b63ab..0000000000 --- a/src/vt/objgroup/dispatch/dispatch_base.h +++ /dev/null @@ -1,86 +0,0 @@ -/* -//@HEADER -// ***************************************************************************** -// -// dispatch_base.h -// DARMA/vt => Virtual Transport -// -// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC -// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. -// Government retains certain rights in this software. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright notice, -// this list of conditions and the following disclaimer in the documentation -// and/or other materials provided with the distribution. -// -// * Neither the name of the copyright holder nor the names of its -// contributors may be used to endorse or promote products derived from this -// software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. -// -// Questions? Contact darma@sandia.gov -// -// ***************************************************************************** -//@HEADER -*/ - -#if !defined INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_BASE_H -#define INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_BASE_H - -#include "vt/config.h" -#include "vt/objgroup/common.h" -#include "vt/messaging/message/smart_ptr.h" - -namespace vt { namespace objgroup { namespace dispatch { - -/* - * DispatchBase implements type erasure to dispatch to a obj group without - * encoding the message directly in the message (as an alternative to using a - * std::function) - */ - -struct DispatchBase { - explicit DispatchBase(ObjGroupProxyType in_proxy) - : proxy_(in_proxy) - { } - - virtual ~DispatchBase() = default; - - /* - * Dispatch to the handler; the base is closed around the proper object - * pointer that is type-erased here - */ - virtual void run(HandlerType han, BaseMessage* msg) = 0; - virtual void* objPtr() const = 0; - - ObjGroupProxyType proxy() const { return proxy_; } - - template - void serialize(Serializer& s) { - s | proxy_; - } - -private: - ObjGroupProxyType proxy_ = no_obj_group; -}; - -}}} /* end namespace vt::objgroup::dispatch */ - -#endif /*INCLUDED_VT_OBJGROUP_DISPATCH_DISPATCH_BASE_H*/ diff --git a/src/vt/objgroup/manager.cc b/src/vt/objgroup/manager.cc index a3932db0bb..18888093dc 100644 --- a/src/vt/objgroup/manager.cc +++ b/src/vt/objgroup/manager.cc @@ -83,32 +83,6 @@ ObjGroupProxyType ObjGroupManager::getProxy(ObjGroupProxyType proxy) { return proxy; } -void ObjGroupManager::dispatch(MsgSharedPtr msg, HandlerType han) { - // Extract the control-bit sequence from the handler - auto const ctrl = HandlerManager::getHandlerControl(han); - vt_debug_print( - verbose, objgroup, - "dispatch: ctrl={:x}, han={:x}\n", ctrl, han - ); - auto const node = 0; - auto const proxy = proxy::ObjGroupProxy::create(ctrl, node, true); - auto dispatch_iter = dispatch_.find(proxy); - vt_debug_print( - normal, objgroup, - "dispatch: try ctrl={:x}, han={:x}, has dispatch={}\n", - ctrl, han, dispatch_iter != dispatch_.end() - ); - if (dispatch_iter == dispatch_.end()) { - auto const epoch = envelopeGetEpoch(msg->env); - if (epoch != no_epoch and epoch != term::any_epoch_sentinel) { - theTerm()->produce(epoch); - } - pending_[proxy].push_back(msg); - } else { - dispatch_iter->second->run(han,msg.get()); - } -} - ObjGroupProxyType ObjGroupManager::makeCollectiveImpl( std::string const& label, HolderBasePtrType base, void* obj_ptr ) { @@ -163,12 +137,12 @@ elm::ElementIDStruct ObjGroupManager::getNextElm(ObjGroupProxyType proxy) { } } -void dispatchObjGroup(MsgSharedPtr msg, HandlerType han) { - vt_debug_print( - verbose, objgroup, - "dispatchObjGroup: han={:x}\n", han - ); - return theObjGroup()->dispatch(msg,han); +std::unordered_map>& getObjs() { + return theObjGroup()->objs_; +} + +std::unordered_map>& getPending() { + return theObjGroup()->pending_; } }} /* end namespace vt::objgroup */ diff --git a/src/vt/objgroup/manager.fwd.h b/src/vt/objgroup/manager.fwd.h index 9b67f7e5d2..c95045240c 100644 --- a/src/vt/objgroup/manager.fwd.h +++ b/src/vt/objgroup/manager.fwd.h @@ -60,7 +60,13 @@ namespace detail { holder::HolderBase* getHolderBase(HandlerType handler); } /* end namespace detail */ -void dispatchObjGroup(MsgSharedPtr msg, HandlerType han); +template +void dispatchObjGroup( + MsgSharedPtr msg, HandlerType han, NodeType from_node, ActionType cont +); + +std::unordered_map>& getObjs(); +std::unordered_map>& getPending(); template messaging::PendingSend send(MsgSharedPtr msg, HandlerType han, NodeType node); diff --git a/src/vt/objgroup/manager.h b/src/vt/objgroup/manager.h index a43c107501..571d258593 100644 --- a/src/vt/objgroup/manager.h +++ b/src/vt/objgroup/manager.h @@ -52,7 +52,6 @@ #include "vt/objgroup/holder/holder.h" #include "vt/objgroup/holder/holder_user.h" #include "vt/objgroup/holder/holder_basic.h" -#include "vt/objgroup/dispatch/dispatch.h" #include "vt/messaging/message/message.h" #include "vt/messaging/message/smart_ptr.h" #include "vt/messaging/pending_send.h" @@ -89,11 +88,9 @@ struct ObjGroupManager : runtime::component::Component { using MakeFnType = std::function()>; using HolderBaseType = holder::HolderBase; using HolderBasePtrType = std::unique_ptr; - using DispatchBaseType = dispatch::DispatchBase; - using DispatchBasePtrType = std::unique_ptr; - using MsgContainerType = std::vector>; using PendingSendType = messaging::PendingSend; +public: /** * \internal \brief Construct the ObjGroupManager */ @@ -330,17 +327,6 @@ struct ObjGroupManager : runtime::component::Component { template std::string getLabel(ProxyType proxy) const; - /* - * Dispatch to a live obj group pointer with a handler - */ - /** - * \internal \brief Dispatch message to objgroup - * - * \param[in] msg the message - * \param[in] han the handler to invoke - */ - void dispatch(MsgSharedPtr msg, HandlerType han); - /** * \internal \brief Send a message to an objgroup * @@ -383,7 +369,6 @@ struct ObjGroupManager : runtime::component::Component { template void serialize(SerializerT& s) { s | cur_obj_id_ - | dispatch_ | objs_ | obj_to_proxy_ | pending_ @@ -392,6 +377,8 @@ struct ObjGroupManager : runtime::component::Component { // Friend function to access the holder without including this header file friend holder::HolderBase* detail::getHolderBase(HandlerType handler); + friend std::unordered_map& getObjs(); + friend std::unordered_map>& getPending(); private: /** @@ -451,14 +438,12 @@ struct ObjGroupManager : runtime::component::Component { private: /// The current obj ID, sequential on each node for collective construction ObjGroupIDType cur_obj_id_ = fst_obj_group_id; - /// Function to dispatch to the base class for type-erasure to run handler - std::unordered_map dispatch_; /// Type-erased pointers to the objects held on this node std::unordered_map objs_; /// Reverse lookup map from an object pointer to the proxy std::unordered_map obj_to_proxy_; /// Messages that are pending creation for delivery - std::unordered_map pending_; + std::unordered_map> pending_; /// Map of object groups' labels std::unordered_map labels_; }; diff --git a/src/vt/objgroup/manager.impl.h b/src/vt/objgroup/manager.impl.h index dffb5f7917..dc38c39239 100644 --- a/src/vt/objgroup/manager.impl.h +++ b/src/vt/objgroup/manager.impl.h @@ -52,7 +52,6 @@ #include "vt/objgroup/holder/holder.h" #include "vt/objgroup/holder/holder_user.h" #include "vt/objgroup/holder/holder_basic.h" -#include "vt/objgroup/dispatch/dispatch.h" #include "vt/objgroup/type_registry/registry.h" #include "vt/registry/auto/auto_registry.h" #include "vt/collective/collective_alg.h" @@ -130,15 +129,6 @@ ObjGroupManager::makeCollective(MakeFnType fn, std::string const& label) { template void ObjGroupManager::destroyCollective(ProxyType proxy) { auto const proxy_bits = proxy.getProxy(); - auto iter = dispatch_.find(proxy_bits); - if (iter != dispatch_.end()) { - auto ptr = iter->second->objPtr(); - auto obj_iter = obj_to_proxy_.find(ptr); - if (obj_iter != obj_to_proxy_.end()) { - obj_to_proxy_.erase(obj_iter); - } - dispatch_.erase(iter); - } auto obj_iter = objs_.find(proxy_bits); if (obj_iter != objs_.end()) { objs_.erase(obj_iter); @@ -152,30 +142,15 @@ void ObjGroupManager::destroyCollective(ProxyType proxy) { template void ObjGroupManager::regObjProxy(ObjT* obj, ObjGroupProxyType proxy) { - auto iter = dispatch_.find(proxy); - vtAssertExpr(iter == dispatch_.end()); vt_debug_print( normal, objgroup, "regObjProxy: obj={}, proxy={:x}\n", print_ptr(obj), proxy ); - DispatchBasePtrType b = std::make_unique>(proxy,obj); - dispatch_.emplace( - std::piecewise_construct, - std::forward_as_tuple(proxy), - std::forward_as_tuple(std::move(b)) - ); auto pending_iter = pending_.find(proxy); if (pending_iter != pending_.end()) { - for (auto&& msg : pending_iter->second) { - theSched()->enqueue([msg]{ - auto const handler = envelopeGetHandler(msg->env); - auto const epoch = envelopeGetEpoch(msg->env); - theObjGroup()->dispatch(msg,handler); - if (epoch != no_epoch) { - theTerm()->consume(epoch); - } - }); + for (auto&& pending : pending_iter->second) { + pending(); } pending_.erase(pending_iter); } diff --git a/src/vt/objgroup/manager.static.h b/src/vt/objgroup/manager.static.h index faf6aaa5b3..c7df5e30d5 100644 --- a/src/vt/objgroup/manager.static.h +++ b/src/vt/objgroup/manager.static.h @@ -46,6 +46,7 @@ #include "vt/config.h" #include "vt/objgroup/common.h" +#include "vt/objgroup/proxy/proxy_bits.h" #include "vt/objgroup/holder/holder_base.h" #include "vt/messaging/active.h" #include "vt/runnable/make_runnable.h" @@ -60,18 +61,15 @@ messaging::PendingSend send(MsgSharedPtr msg, HandlerType han, NodeType de if (dest_node != this_node) { return theMsg()->sendMsg(dest_node, han,msg, no_tag); } else { - // Get the current epoch for the message - auto const cur_epoch = theMsg()->setupEpochMsg(msg); - - return messaging::PendingSend{cur_epoch, [msg, han, cur_epoch, this_node](){ - auto holder = detail::getHolderBase(han); - auto const& elm_id = holder->getElmID(); - auto lb_data = &holder->getLBData(); - - runnable::makeRunnable(msg, true, han, this_node) - .withTDEpoch(cur_epoch) - .withLBData(lb_data, elm_id) - .enqueue(); + theMsg()->setupEpochMsg(msg); + envelopeSetHandler(msg->env, han); + return messaging::PendingSend{msg, [](MsgSharedPtr& inner_msg){ + dispatchObjGroup( + inner_msg.template to(), + envelopeGetHandler(inner_msg->env), + theContext()->getNode(), + nullptr + ); }}; } } @@ -89,8 +87,14 @@ void invoke(messaging::MsgPtrThief msg, HandlerType han, NodeType dest_nod ); // this is a local invocation.. no thread required + auto holder = detail::getHolderBase(han); + auto const& elm_id = holder->getElmID(); + auto elm = holder->getPtr(); + auto lb_data = &holder->getLBData(); runnable::makeRunnable(msg.msg_, false, han, this_node) + .withObjGroup(elm) .withTDEpochFromMsg() + .withLBData(lb_data, elm_id) .run(); } @@ -99,6 +103,78 @@ messaging::PendingSend broadcast(MsgSharedPtr msg, HandlerType han) { return theMsg()->broadcastMsg(han, msg); } +namespace detail { + +template +void dispatchImpl( + MsgSharedPtr const& msg, HandlerType han, NodeType from_node, + ActionType cont, ObjT* obj +) { + auto holder = detail::getHolderBase(han); + auto const& elm_id = holder->getElmID(); + auto lb_data = &holder->getLBData(); + runnable::makeRunnable(msg, true, han, from_node) + .withContinuation(cont) + .withObjGroup(obj) + .withLBData(lb_data, elm_id) + .withTDEpochFromMsg() + .enqueue(); +} + +template +void dispatch( + MsgSharedPtr msg, HandlerType han, NodeType from_node, + ActionType cont +) { + // Extract the control-bit sequence from the handler + auto const ctrl = HandlerManager::getHandlerControl(han); + vt_debug_print( + verbose, objgroup, + "dispatch: ctrl={:x}, han={:x}\n", ctrl, han + ); + auto const node = 0; + auto const proxy = proxy::ObjGroupProxy::create(ctrl, node, true); + auto& objs = getObjs(); + auto obj_iter = objs.find(proxy); + vt_debug_print( + normal, objgroup, + "dispatch: try ctrl={:x}, han={:x}, has dispatch={}\n", + ctrl, han, obj_iter != objs.end() + ); + if (obj_iter == objs.end()) { + auto const epoch = envelopeGetEpoch(msg->env); + if (epoch != no_epoch) { + theTerm()->produce(epoch); + } + auto& pending = getPending(); + pending[proxy].emplace_back([=]{ + auto& objs2 = getObjs(); + auto obj_iter2 = objs2.find(proxy); + vtAssert(obj_iter2 != objs2.end(), "Obj must exist"); + detail::dispatchImpl(msg, han, from_node, cont, obj_iter2->second->getPtr()); + if (epoch != no_epoch) { + theTerm()->consume(epoch); + } + }); + } else { + detail::dispatchImpl(msg, han, from_node, cont, obj_iter->second->getPtr()); + } +} + +} /* end namespace detail */ + +template +void dispatchObjGroup( + MsgSharedPtr msg, HandlerType han, NodeType from_node, + ActionType cont +) { + vt_debug_print( + verbose, objgroup, + "dispatchObjGroup: han={:x}\n", han + ); + return detail::dispatch(msg, han, from_node, cont); +} + }} /* end namespace vt::objgroup */ #endif /*INCLUDED_VT_OBJGROUP_MANAGER_STATIC_H*/ diff --git a/src/vt/runnable/make_runnable.h b/src/vt/runnable/make_runnable.h index f370e3f252..499f4d10de 100644 --- a/src/vt/runnable/make_runnable.h +++ b/src/vt/runnable/make_runnable.h @@ -91,7 +91,9 @@ struct RunnableMaker { * \param[in] cont the continuation */ RunnableMaker&& withContinuation(ActionType cont) { - impl_->addContextCont(cont); + if (cont != nullptr) { + impl_->addContextCont(cont); + } return std::move(*this); } diff --git a/src/vt/runnable/runnable.cc b/src/vt/runnable/runnable.cc index 27f68c1abd..bd9193f83c 100644 --- a/src/vt/runnable/runnable.cc +++ b/src/vt/runnable/runnable.cc @@ -56,6 +56,9 @@ namespace vt { namespace runnable { void RunnableNew::setupHandler(HandlerType handler) { using HandlerManagerType = HandlerManager; + bool const is_obj = HandlerManagerType::isHandlerObjGroup(handler); + vtAssert(not is_obj, "Must not be object"); + bool const is_auto = HandlerManagerType::isHandlerAuto(handler); bool const is_functor = HandlerManagerType::isHandlerFunctor(handler); diff --git a/src/vt/serialization/messaging/serialized_messenger.impl.h b/src/vt/serialization/messaging/serialized_messenger.impl.h index fa65da93d4..5b9c296ae0 100644 --- a/src/vt/serialization/messaging/serialized_messenger.impl.h +++ b/src/vt/serialization/messaging/serialized_messenger.impl.h @@ -53,6 +53,7 @@ #include "vt/serialization/messaging/serialized_messenger.h" #include "vt/messaging/envelope/envelope_set.h" // envelopeSetRef #include "vt/runnable/make_runnable.h" +#include "vt/objgroup/manager.fwd.h" #include #include @@ -88,9 +89,16 @@ template auto msg_data = ptr_offset; auto user_msg = deserializeFullMessage(msg_data); - runnable::makeRunnable(user_msg, true, handler, sys_msg->from_node) - .withTDEpochFromMsg() - .enqueue(); + bool const is_obj = HandlerManager::isHandlerObjGroup(handler); + if (is_obj) { + objgroup::dispatchObjGroup( + user_msg.template to(), handler, sys_msg->from_node, nullptr + ); + } else { + runnable::makeRunnable(user_msg, true, handler, sys_msg->from_node) + .withTDEpochFromMsg() + .enqueue(); + } } template @@ -132,10 +140,17 @@ template handler, recv_tag, envelopeGetEpoch(msg->env) ); - runnable::makeRunnable(msg, true, handler, node) - .withTDEpoch(epoch, not is_valid_epoch) - .withContinuation(action) - .enqueue(); + bool const is_obj = HandlerManager::isHandlerObjGroup(handler); + if (is_obj) { + objgroup::dispatchObjGroup( + msg.template to(), handler, node, action + ); + } else { + runnable::makeRunnable(msg, true, handler, node) + .withTDEpoch(epoch, not is_valid_epoch) + .withContinuation(action) + .enqueue(); + } if (is_valid_epoch) { theTerm()->consume(epoch); @@ -174,9 +189,16 @@ template print_ptr(user_msg.get()), envelopeGetEpoch(sys_msg->env) ); - runnable::makeRunnable(user_msg, true, handler, sys_msg->from_node) - .withTDEpochFromMsg() - .enqueue(); + bool const is_obj = HandlerManager::isHandlerObjGroup(handler); + if (is_obj) { + objgroup::dispatchObjGroup( + user_msg.template to(), handler, sys_msg->from_node, nullptr + ); + } else { + runnable::makeRunnable(user_msg, true, handler, sys_msg->from_node) + .withTDEpochFromMsg() + .enqueue(); + } } template @@ -407,9 +429,16 @@ template auto base_msg = user_msg.template to(); return messaging::PendingSend(base_msg, [=](MsgPtr in) { - runnable::makeRunnable(user_msg, true, typed_handler, node) - .withTDEpochFromMsg() - .enqueue(); + bool const is_obj = HandlerManager::isHandlerObjGroup(typed_handler); + if (is_obj) { + objgroup::dispatchObjGroup( + user_msg.template to(), typed_handler, node, nullptr + ); + } else { + runnable::makeRunnable(user_msg, true, typed_handler, node) + .withTDEpochFromMsg() + .enqueue(); + } }); } };