Skip to content

Commit

Permalink
#702: reduce: make reduce a delayed call by returning PendingSend for…
Browse files Browse the repository at this point in the history
… collective reduce, collections, and objgroups

This makes returning a sequentialID impossible. In order for collections to order properly, add a reduceImmediate public overload set that takes the same parameters as reduce but returns the sequential id instead of a PendingSend. This is used internally by the new reduce and collection reduceMsgExprImpl, but can be called by the user.
  • Loading branch information
nmm0 committed Jul 15, 2020
1 parent 9571dba commit 708deb7
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 24 deletions.
2 changes: 2 additions & 0 deletions src/vt/collective/reduce/reduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "vt/messaging/message.h"
#include "vt/collective/tree/tree.h"
#include "vt/utils/hash/hash_tuple.h"
#include "vt/messaging/pending_send.h"

#include <tuple>
#include <unordered_map>
Expand All @@ -80,6 +81,7 @@ namespace vt { namespace collective { namespace reduce {
struct Reduce : virtual collective::tree::Tree {
using ReduceStateType = ReduceState;
using ReduceNumType = typename ReduceStateType::ReduceNumType;
using PendingSendType = messaging::PendingSend;

/**
* \internal \brief Construct a new reducer instance
Expand Down
6 changes: 5 additions & 1 deletion src/vt/objgroup/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "vt/objgroup/dispatch/dispatch.h"
#include "vt/messaging/message/message.h"
#include "vt/messaging/message/smart_ptr.h"
#include "vt/messaging/pending_send.h"

#include <memory>
#include <functional>
Expand Down Expand Up @@ -93,6 +94,7 @@ struct ObjGroupManager : runtime::component::Component<ObjGroupManager> {
using DispatchBasePtrType = std::unique_ptr<DispatchBaseType>;
using MsgContainerType = std::vector<MsgSharedPtr<ShortMessage>>;
using BaseProxyListType = std::set<ObjGroupProxyType>;
using PendingSendType = messaging::PendingSend;

/**
* \internal \brief Construct the ObjGroupManager
Expand Down Expand Up @@ -231,9 +233,11 @@ struct ObjGroupManager : runtime::component::Component<ObjGroupManager> {
* \param[in] proxy proxy to the object group
* \param[in] msg reduction message
* \param[in] stamp stamp to identify reduction across nodes
*
* \return the PendingSend corresponding to the reduce
*/
template <typename ObjT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduce(
PendingSendType reduce(
ProxyType<ObjT> proxy, MsgSharedPtr<MsgT> msg,
collective::reduce::ReduceStamp const& stamp
);
Expand Down
2 changes: 1 addition & 1 deletion src/vt/objgroup/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void ObjGroupManager::broadcast(MsgSharedPtr<MsgT> msg, HandlerType han) {
}

template <typename ObjT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void ObjGroupManager::reduce(
ObjGroupManager::PendingSendType ObjGroupManager::reduce(
ProxyType<ObjT> proxy, MsgSharedPtr<MsgT> msg,
collective::reduce::ReduceStamp const& stamp
) {
Expand Down
15 changes: 12 additions & 3 deletions src/vt/objgroup/proxy/proxy_objgroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "vt/utils/static_checks/msg_ptr.h"
#include "vt/rdmahandle/handle.fwd.h"
#include "vt/rdmahandle/handle_set.fwd.h"
#include "vt/messaging/pending_send.h"

namespace vt { namespace objgroup { namespace proxy {

Expand All @@ -76,6 +77,8 @@ template <typename ObjT>
struct Proxy {
using ReduceStamp = collective::reduce::ReduceStamp;

using PendingSendType = messaging::PendingSend;

Proxy() = default;
Proxy(Proxy const&) = default;
Proxy(Proxy&&) = default;
Expand Down Expand Up @@ -126,6 +129,8 @@ struct Proxy {
* \param[in] msg the reduction message
* \param[in] cb the callback to trigger after the reduction is finished
* \param[in] stamp the stamp to identify the reduction
*
* \return the PendingSend associated with the reduce
*/
template <
typename OpT = collective::None,
Expand All @@ -135,7 +140,7 @@ struct Proxy {
MsgT, OpT, collective::reduce::operators::ReduceCallback<MsgT>
>
>
void reduce(
PendingSendType reduce(
MsgPtrT msg, Callback<MsgT> cb, ReduceStamp stamp = ReduceStamp{}
) const;

Expand All @@ -145,6 +150,8 @@ struct Proxy {
*
* \param[in] msg the reduction message
* \param[in] stamp the stamp to identify the reduction
*
* \return the PendingSend associated with the reduce
*/
template <
typename OpT = collective::None,
Expand All @@ -153,21 +160,23 @@ struct Proxy {
typename MsgT = typename util::MsgPtrType<MsgPtrT>::MsgType,
ActiveTypedFnType<MsgT> *f = MsgT::template msgHandler<MsgT, OpT, FunctorT>
>
void reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const;
PendingSendType reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const;

/**
* \brief Reduce over the objgroup instance on each node with target specified
* in reduction message type.
*
* \param[in] msg the reduction message
* \param[in] stamp the stamp to identify the reduction
*
* \return the PendingSend associated with the reduce
*/
template <
typename MsgPtrT,
typename MsgT = typename util::MsgPtrType<MsgPtrT>::MsgType,
ActiveTypedFnType<MsgT> *f
>
void reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const;
PendingSendType reduce(MsgPtrT msg, ReduceStamp stamp = ReduceStamp{}) const;

/**
* \brief Get raw pointer to the local object instance residing on the current
Expand Down
6 changes: 3 additions & 3 deletions src/vt/objgroup/proxy/proxy_objgroup.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ template <typename ObjT>
template <
typename OpT, typename MsgPtrT, typename MsgT, ActiveTypedFnType<MsgT> *f
>
void Proxy<ObjT>::reduce(
typename Proxy<ObjT>::PendingSendType Proxy<ObjT>::reduce(
MsgPtrT inmsg, Callback<MsgT> cb, ReduceStamp stamp
) const {
auto proxy = Proxy<ObjT>(*this);
Expand All @@ -97,15 +97,15 @@ template <
typename OpT, typename FunctorT, typename MsgPtrT, typename MsgT,
ActiveTypedFnType<MsgT> *f
>
void Proxy<ObjT>::reduce(MsgPtrT inmsg, ReduceStamp stamp) const {
typename Proxy<ObjT>::PendingSendType Proxy<ObjT>::reduce(MsgPtrT inmsg, ReduceStamp stamp) const {
auto proxy = Proxy<ObjT>(*this);
MsgPtr<MsgT> msg = promoteMsg(static_cast<MsgT*>(inmsg));
return theObjGroup()->reduce<ObjT, MsgT, f>(proxy,msg,stamp);
}

template <typename ObjT>
template <typename MsgPtrT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void Proxy<ObjT>::reduce(MsgPtrT inmsg, ReduceStamp stamp) const {
typename Proxy<ObjT>::PendingSendType Proxy<ObjT>::reduce(MsgPtrT inmsg, ReduceStamp stamp) const {
auto proxy = Proxy<ObjT>(*this);
MsgPtr<MsgT> msg = promoteMsg(inmsg);
return theObjGroup()->reduce<ObjT, MsgT, f>(proxy,msg,stamp);
Expand Down
16 changes: 12 additions & 4 deletions src/vt/vrt/collection/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,11 @@ struct CollectionManager
* the associated handler (if a callback is specified on a particular node,
* the root will run the handler that triggers the callback at the appropriate
* location)
*
* \return a PendingSend corresponding to the reduce
*/
template <typename ColT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduceMsg(
messaging::PendingSend reduceMsg(
CollectionProxyWrapType<ColT> const& proxy,
MsgT *const msg, ReduceStamp stamp = ReduceStamp{},
NodeType root_node = uninitialized_destination
Expand All @@ -768,9 +770,11 @@ struct CollectionManager
* \param[in] msg the reduce message
* \param[in] stamp the reduce stamp
* \param[in] idx the index of collection element being reduced
*
* \return a PendingSend corresponding to the reduce
*/
template <typename ColT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduceMsg(
messaging::PendingSend reduceMsg(
CollectionProxyWrapType<ColT> const& proxy,
MsgT *const msg, ReduceStamp stamp, typename ColT::IndexType const& idx
);
Expand All @@ -787,9 +791,11 @@ struct CollectionManager
* the associated handler (if a callback is specified on a particular node,
* the root will run the handler that triggers the callback at the appropriate
* location)
*
* \return a PendingSend corresponding to the reduce
*/
template <typename ColT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduceMsgExpr(
messaging::PendingSend reduceMsgExpr(
CollectionProxyWrapType<ColT> const& proxy,
MsgT *const msg, ReduceIdxFuncType<typename ColT::IndexType> expr_fn,
ReduceStamp stamp = ReduceStamp{},
Expand All @@ -805,9 +811,11 @@ struct CollectionManager
* \param[in] expr_fn expression function to pick indices
* \param[in] stamp the reduce stamp
* \param[in] idx the index of collection element being reduced
*
* \return a PendingSend corresponding to the reduce
*/
template <typename ColT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduceMsgExpr(
messaging::PendingSend reduceMsgExpr(
CollectionProxyWrapType<ColT> const& proxy,
MsgT *const msg, ReduceIdxFuncType<typename ColT::IndexType> expr_fn,
ReduceStamp stamp, typename ColT::IndexType const& idx
Expand Down
12 changes: 6 additions & 6 deletions src/vt/vrt/collection/reducable/reducable.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ struct Reducable : BaseProxyT {
MsgT, OpT, collective::reduce::operators::ReduceCallback<MsgT>
>
>
void reduce(
messaging::PendingSend reduce(
MsgT *const msg, Callback<MsgT> cb, ReduceStamp stamp = ReduceStamp{}
) const;

Expand All @@ -85,25 +85,25 @@ struct Reducable : BaseProxyT {
typename MsgT,
ActiveTypedFnType<MsgT> *f = MsgT::template msgHandler<MsgT, OpT, FunctorT>
>
void reduce(MsgT *const msg, ReduceStamp stamp = ReduceStamp{}) const;
messaging::PendingSend reduce(MsgT *const msg, ReduceStamp stamp = ReduceStamp{}) const;

template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduce(
messaging::PendingSend reduce(
MsgT *const msg, ReduceStamp stamp = ReduceStamp{},
NodeType const& node = uninitialized_destination
) const;

template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduceExpr(
messaging::PendingSend reduceExpr(
MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp = ReduceStamp{},
NodeType const& node = uninitialized_destination
) const;

template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduce(MsgT *const msg, ReduceStamp stamp, IndexT const& idx) const;
messaging::PendingSend reduce(MsgT *const msg, ReduceStamp stamp, IndexT const& idx) const;

template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void reduceExpr(
messaging::PendingSend reduceExpr(
MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp, IndexT const& idx
) const;
};
Expand Down
12 changes: 6 additions & 6 deletions src/vt/vrt/collection/reducable/reducable.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Reducable<ColT,IndexT,BaseProxyT>::Reducable(VirtualProxyType const in_proxy)

template <typename ColT, typename IndexT, typename BaseProxyT>
template <typename OpT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void Reducable<ColT,IndexT,BaseProxyT>::reduce(
messaging::PendingSend Reducable<ColT,IndexT,BaseProxyT>::reduce(
MsgT *const msg, Callback<MsgT> cb, ReduceStamp stamp
) const {
auto const proxy = this->getProxy();
Expand All @@ -76,7 +76,7 @@ void Reducable<ColT,IndexT,BaseProxyT>::reduce(

template <typename ColT, typename IndexT, typename BaseProxyT>
template <typename OpT, typename FunctorT, typename MsgT, ActiveTypedFnType<MsgT> *f>
void Reducable<ColT,IndexT,BaseProxyT>::reduce(
messaging::PendingSend Reducable<ColT,IndexT,BaseProxyT>::reduce(
MsgT *const msg, ReduceStamp stamp
) const {
auto const proxy = this->getProxy();
Expand All @@ -86,7 +86,7 @@ void Reducable<ColT,IndexT,BaseProxyT>::reduce(

template <typename ColT, typename IndexT, typename BaseProxyT>
template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void Reducable<ColT,IndexT,BaseProxyT>::reduce(
messaging::PendingSend Reducable<ColT,IndexT,BaseProxyT>::reduce(
MsgT *const msg, ReduceStamp stamp, NodeType const& node
) const {
auto const proxy = this->getProxy();
Expand All @@ -95,7 +95,7 @@ void Reducable<ColT,IndexT,BaseProxyT>::reduce(

template <typename ColT, typename IndexT, typename BaseProxyT>
template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void Reducable<ColT,IndexT,BaseProxyT>::reduceExpr(
messaging::PendingSend Reducable<ColT,IndexT,BaseProxyT>::reduceExpr(
MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp, NodeType const& node
) const {
auto const proxy = this->getProxy();
Expand All @@ -104,7 +104,7 @@ void Reducable<ColT,IndexT,BaseProxyT>::reduceExpr(

template <typename ColT, typename IndexT, typename BaseProxyT>
template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void Reducable<ColT,IndexT,BaseProxyT>::reduce(
messaging::PendingSend Reducable<ColT,IndexT,BaseProxyT>::reduce(
MsgT *const msg, ReduceStamp stamp, IndexT const& idx
) const {
auto const proxy = this->getProxy();
Expand All @@ -113,7 +113,7 @@ void Reducable<ColT,IndexT,BaseProxyT>::reduce(

template <typename ColT, typename IndexT, typename BaseProxyT>
template <typename MsgT, ActiveTypedFnType<MsgT> *f>
void Reducable<ColT,IndexT,BaseProxyT>::reduceExpr(
messaging::PendingSend Reducable<ColT,IndexT,BaseProxyT>::reduceExpr(
MsgT *const msg, ReduceIdxFuncType fn, ReduceStamp stamp, IndexT const& idx
) const {
auto const proxy = this->getProxy();
Expand Down

0 comments on commit 708deb7

Please sign in to comment.