From 708deb73dab12fd2db6197c3bdac9d0718ee8735 Mon Sep 17 00:00:00 2001 From: Nicolas Morales Date: Thu, 27 Feb 2020 16:10:03 -0800 Subject: [PATCH] #702: reduce: make reduce a delayed call by returning PendingSend for collective reduce, collections, and objgroups This makes returning a sequentialID impossible. In order for collections to order properly, add a reduceImmediate public overload set that takes the same parameters as reduce but returns the sequential id instead of a PendingSend. This is used internally by the new reduce and collection reduceMsgExprImpl, but can be called by the user. --- src/vt/collective/reduce/reduce.h | 2 ++ src/vt/objgroup/manager.h | 6 +++++- src/vt/objgroup/manager.impl.h | 2 +- src/vt/objgroup/proxy/proxy_objgroup.h | 15 ++++++++++++--- src/vt/objgroup/proxy/proxy_objgroup.impl.h | 6 +++--- src/vt/vrt/collection/manager.h | 16 ++++++++++++---- src/vt/vrt/collection/reducable/reducable.h | 12 ++++++------ src/vt/vrt/collection/reducable/reducable.impl.h | 12 ++++++------ 8 files changed, 47 insertions(+), 24 deletions(-) diff --git a/src/vt/collective/reduce/reduce.h b/src/vt/collective/reduce/reduce.h index aeeebb3e54..e38f18888b 100644 --- a/src/vt/collective/reduce/reduce.h +++ b/src/vt/collective/reduce/reduce.h @@ -59,6 +59,7 @@ #include "vt/messaging/message.h" #include "vt/collective/tree/tree.h" #include "vt/utils/hash/hash_tuple.h" +#include "vt/messaging/pending_send.h" #include #include @@ -80,6 +81,7 @@ namespace vt { namespace collective { namespace reduce { struct Reduce : virtual collective::tree::Tree { using ReduceStateType = ReduceState; using ReduceNumType = typename ReduceStateType::ReduceNumType; + using PendingSendType = messaging::PendingSend; /** * \internal \brief Construct a new reducer instance diff --git a/src/vt/objgroup/manager.h b/src/vt/objgroup/manager.h index 0214fb2293..0082dcd510 100644 --- a/src/vt/objgroup/manager.h +++ b/src/vt/objgroup/manager.h @@ -56,6 +56,7 @@ #include "vt/objgroup/dispatch/dispatch.h" #include "vt/messaging/message/message.h" #include "vt/messaging/message/smart_ptr.h" +#include "vt/messaging/pending_send.h" #include #include @@ -93,6 +94,7 @@ struct ObjGroupManager : runtime::component::Component { using DispatchBasePtrType = std::unique_ptr; using MsgContainerType = std::vector>; using BaseProxyListType = std::set; + using PendingSendType = messaging::PendingSend; /** * \internal \brief Construct the ObjGroupManager @@ -231,9 +233,11 @@ struct ObjGroupManager : runtime::component::Component { * \param[in] proxy proxy to the object group * \param[in] msg reduction message * \param[in] stamp stamp to identify reduction across nodes + * + * \return the PendingSend corresponding to the reduce */ template *f> - void reduce( + PendingSendType reduce( ProxyType proxy, MsgSharedPtr msg, collective::reduce::ReduceStamp const& stamp ); diff --git a/src/vt/objgroup/manager.impl.h b/src/vt/objgroup/manager.impl.h index edfb9f1b9f..93ee429032 100644 --- a/src/vt/objgroup/manager.impl.h +++ b/src/vt/objgroup/manager.impl.h @@ -287,7 +287,7 @@ void ObjGroupManager::broadcast(MsgSharedPtr msg, HandlerType han) { } template *f> -void ObjGroupManager::reduce( +ObjGroupManager::PendingSendType ObjGroupManager::reduce( ProxyType proxy, MsgSharedPtr msg, collective::reduce::ReduceStamp const& stamp ) { diff --git a/src/vt/objgroup/proxy/proxy_objgroup.h b/src/vt/objgroup/proxy/proxy_objgroup.h index ea167ca232..7b8c2e36a4 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.h @@ -58,6 +58,7 @@ #include "vt/utils/static_checks/msg_ptr.h" #include "vt/rdmahandle/handle.fwd.h" #include "vt/rdmahandle/handle_set.fwd.h" +#include "vt/messaging/pending_send.h" namespace vt { namespace objgroup { namespace proxy { @@ -76,6 +77,8 @@ template struct Proxy { using ReduceStamp = collective::reduce::ReduceStamp; + using PendingSendType = messaging::PendingSend; + Proxy() = default; Proxy(Proxy const&) = default; Proxy(Proxy&&) = default; @@ -126,6 +129,8 @@ struct Proxy { * \param[in] msg the reduction message * \param[in] cb the callback to trigger after the reduction is finished * \param[in] stamp the stamp to identify the reduction + * + * \return the PendingSend associated with the reduce */ template < typename OpT = collective::None, @@ -135,7 +140,7 @@ struct Proxy { MsgT, OpT, collective::reduce::operators::ReduceCallback > > - void reduce( + PendingSendType reduce( MsgPtrT msg, Callback cb, ReduceStamp stamp = ReduceStamp{} ) const; @@ -145,6 +150,8 @@ struct Proxy { * * \param[in] msg the reduction message * \param[in] stamp the stamp to identify the reduction + * + * \return the PendingSend associated with the reduce */ template < typename OpT = collective::None, @@ -153,7 +160,7 @@ struct Proxy { typename MsgT = typename util::MsgPtrType::MsgType, ActiveTypedFnType *f = MsgT::template msgHandler > - void reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const; + PendingSendType reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const; /** * \brief Reduce over the objgroup instance on each node with target specified @@ -161,13 +168,15 @@ struct Proxy { * * \param[in] msg the reduction message * \param[in] stamp the stamp to identify the reduction + * + * \return the PendingSend associated with the reduce */ template < typename MsgPtrT, typename MsgT = typename util::MsgPtrType::MsgType, ActiveTypedFnType *f > - void reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const; + PendingSendType reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const; /** * \brief Get raw pointer to the local object instance residing on the current diff --git a/src/vt/objgroup/proxy/proxy_objgroup.impl.h b/src/vt/objgroup/proxy/proxy_objgroup.impl.h index cd19d975f2..1a44d77887 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.impl.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.impl.h @@ -80,7 +80,7 @@ template template < typename OpT, typename MsgPtrT, typename MsgT, ActiveTypedFnType *f > -void Proxy::reduce( +typename Proxy::PendingSendType Proxy::reduce( MsgPtrT inmsg, Callback cb, ReduceStamp stamp ) const { auto proxy = Proxy(*this); @@ -97,7 +97,7 @@ template < typename OpT, typename FunctorT, typename MsgPtrT, typename MsgT, ActiveTypedFnType *f > -void Proxy::reduce(MsgPtrT inmsg, ReduceStamp stamp) const { +typename Proxy::PendingSendType Proxy::reduce(MsgPtrT inmsg, ReduceStamp stamp) const { auto proxy = Proxy(*this); MsgPtr msg = promoteMsg(static_cast(inmsg)); return theObjGroup()->reduce(proxy,msg,stamp); @@ -105,7 +105,7 @@ void Proxy::reduce(MsgPtrT inmsg, ReduceStamp stamp) const { template template *f> -void Proxy::reduce(MsgPtrT inmsg, ReduceStamp stamp) const { +typename Proxy::PendingSendType Proxy::reduce(MsgPtrT inmsg, ReduceStamp stamp) const { auto proxy = Proxy(*this); MsgPtr msg = promoteMsg(inmsg); return theObjGroup()->reduce(proxy,msg,stamp); diff --git a/src/vt/vrt/collection/manager.h b/src/vt/vrt/collection/manager.h index f58f8a6743..a750fca793 100644 --- a/src/vt/vrt/collection/manager.h +++ b/src/vt/vrt/collection/manager.h @@ -753,9 +753,11 @@ struct CollectionManager * the associated handler (if a callback is specified on a particular node, * the root will run the handler that triggers the callback at the appropriate * location) + * + * \return a PendingSend corresponding to the reduce */ template *f> - void reduceMsg( + messaging::PendingSend reduceMsg( CollectionProxyWrapType const& proxy, MsgT *const msg, ReduceStamp stamp = ReduceStamp{}, NodeType root_node = uninitialized_destination @@ -768,9 +770,11 @@ struct CollectionManager * \param[in] msg the reduce message * \param[in] stamp the reduce stamp * \param[in] idx the index of collection element being reduced + * + * \return a PendingSend corresponding to the reduce */ template *f> - void reduceMsg( + messaging::PendingSend reduceMsg( CollectionProxyWrapType const& proxy, MsgT *const msg, ReduceStamp stamp, typename ColT::IndexType const& idx ); @@ -787,9 +791,11 @@ struct CollectionManager * the associated handler (if a callback is specified on a particular node, * the root will run the handler that triggers the callback at the appropriate * location) + * + * \return a PendingSend corresponding to the reduce */ template *f> - void reduceMsgExpr( + messaging::PendingSend reduceMsgExpr( CollectionProxyWrapType const& proxy, MsgT *const msg, ReduceIdxFuncType expr_fn, ReduceStamp stamp = ReduceStamp{}, @@ -805,9 +811,11 @@ struct CollectionManager * \param[in] expr_fn expression function to pick indices * \param[in] stamp the reduce stamp * \param[in] idx the index of collection element being reduced + * + * \return a PendingSend corresponding to the reduce */ template *f> - void reduceMsgExpr( + messaging::PendingSend reduceMsgExpr( CollectionProxyWrapType const& proxy, MsgT *const msg, ReduceIdxFuncType expr_fn, ReduceStamp stamp, typename ColT::IndexType const& idx diff --git a/src/vt/vrt/collection/reducable/reducable.h b/src/vt/vrt/collection/reducable/reducable.h index ababfaa00e..40a2f7709f 100644 --- a/src/vt/vrt/collection/reducable/reducable.h +++ b/src/vt/vrt/collection/reducable/reducable.h @@ -75,7 +75,7 @@ struct Reducable : BaseProxyT { MsgT, OpT, collective::reduce::operators::ReduceCallback > > - void reduce( + messaging::PendingSend reduce( MsgT *const msg, Callback cb, ReduceStamp stamp = ReduceStamp{} ) const; @@ -85,25 +85,25 @@ struct Reducable : BaseProxyT { typename MsgT, ActiveTypedFnType *f = MsgT::template msgHandler > - void reduce(MsgT *const msg, ReduceStamp stamp = ReduceStamp{}) const; + messaging::PendingSend reduce(MsgT *const msg, ReduceStamp stamp = ReduceStamp{}) const; template *f> - void reduce( + messaging::PendingSend reduce( MsgT *const msg, ReduceStamp stamp = ReduceStamp{}, NodeType const& node = uninitialized_destination ) const; template *f> - void reduceExpr( + messaging::PendingSend reduceExpr( MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp = ReduceStamp{}, NodeType const& node = uninitialized_destination ) const; template *f> - void reduce(MsgT *const msg, ReduceStamp stamp, IndexT const& idx) const; + messaging::PendingSend reduce(MsgT *const msg, ReduceStamp stamp, IndexT const& idx) const; template *f> - void reduceExpr( + messaging::PendingSend reduceExpr( MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp, IndexT const& idx ) const; }; diff --git a/src/vt/vrt/collection/reducable/reducable.impl.h b/src/vt/vrt/collection/reducable/reducable.impl.h index 13fea99f34..51d5387fac 100644 --- a/src/vt/vrt/collection/reducable/reducable.impl.h +++ b/src/vt/vrt/collection/reducable/reducable.impl.h @@ -60,7 +60,7 @@ Reducable::Reducable(VirtualProxyType const in_proxy) template template *f> -void Reducable::reduce( +messaging::PendingSend Reducable::reduce( MsgT *const msg, Callback cb, ReduceStamp stamp ) const { auto const proxy = this->getProxy(); @@ -76,7 +76,7 @@ void Reducable::reduce( template template *f> -void Reducable::reduce( +messaging::PendingSend Reducable::reduce( MsgT *const msg, ReduceStamp stamp ) const { auto const proxy = this->getProxy(); @@ -86,7 +86,7 @@ void Reducable::reduce( template template *f> -void Reducable::reduce( +messaging::PendingSend Reducable::reduce( MsgT *const msg, ReduceStamp stamp, NodeType const& node ) const { auto const proxy = this->getProxy(); @@ -95,7 +95,7 @@ void Reducable::reduce( template template *f> -void Reducable::reduceExpr( +messaging::PendingSend Reducable::reduceExpr( MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp, NodeType const& node ) const { auto const proxy = this->getProxy(); @@ -104,7 +104,7 @@ void Reducable::reduceExpr( template template *f> -void Reducable::reduce( +messaging::PendingSend Reducable::reduce( MsgT *const msg, ReduceStamp stamp, IndexT const& idx ) const { auto const proxy = this->getProxy(); @@ -113,7 +113,7 @@ void Reducable::reduce( template template *f> -void Reducable::reduceExpr( +messaging::PendingSend Reducable::reduceExpr( MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp, IndexT const& idx ) const { auto const proxy = this->getProxy();