Skip to content

Commit

Permalink
#702: PendingSend: add new action type for epoch actions. Create a ne…
Browse files Browse the repository at this point in the history
…w constructor that allows construction with the new EpochActionType. Additionally move some messages that are not used externally to private to preserve class invariant. Move the definitions of some non-templated functions to the source file. Make the move constructor noexcept.
  • Loading branch information
nmm0 committed Aug 25, 2020
1 parent 306d713 commit 6dca1da
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 37 deletions.
43 changes: 40 additions & 3 deletions src/vt/messaging/pending_send.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@

namespace vt { namespace messaging {

PendingSend::PendingSend(
MsgSharedPtr<BaseMsgType> const& in_msg, ByteType const& in_msg_size)
: msg_(in_msg.toVirtual<BaseMsgType>()), msg_size_(in_msg_size) {
produceMsg();
}

PendingSend::PendingSend(EpochType ep, EpochActionType const& in_action)
: epoch_action_{in_action}, epoch_produced_(ep) {
if (epoch_produced_ != no_epoch) {
theTerm()->produce(epoch_produced_, 1);
}
}

PendingSend::PendingSend(PendingSend&& in) noexcept
: msg_size_(std::move(in.msg_size_)),
epoch_produced_(std::move(in.epoch_produced_))
{
std::swap(msg_, in.msg_);
std::swap(epoch_action_, in.epoch_action_);
std::swap(send_action_, in.send_action_);
}

void PendingSend::sendMsg() {
if (send_action_ == nullptr) {
theMsg()->doMessageSend(msg_, msg_size_);
Expand All @@ -58,7 +80,7 @@ void PendingSend::sendMsg() {
send_action_ = nullptr;
}

EpochType PendingSend::getProduceEpoch() const {
EpochType PendingSend::getProduceEpochFromMsg() const {
if (msg_ == nullptr or envelopeIsTerm(msg_->env) or
not envelopeIsEpochType(msg_->env)) {
return no_epoch;
Expand All @@ -67,9 +89,8 @@ EpochType PendingSend::getProduceEpoch() const {
return envelopeGetEpoch(msg_->env);
}


void PendingSend::produceMsg() {
epoch_produced_ = getProduceEpoch();
epoch_produced_ = getProduceEpochFromMsg();
if (epoch_produced_ != no_epoch) {
theTerm()->produce(epoch_produced_, 1);
}
Expand All @@ -81,4 +102,20 @@ void PendingSend::consumeMsg() {
}
}

void PendingSend::release() {
bool send_msg = msg_ != nullptr || send_action_ != nullptr;
vtAssert(!send_msg || !epoch_action_, "cannot have both a message and epoch action");
if (epoch_produced_ != no_epoch) {
theMsg()->pushEpoch(epoch_produced_);
}
if (send_msg) {
sendMsg();
} else if ( epoch_action_ ) {
epoch_action_();
}
if (epoch_produced_ != no_epoch) {
theMsg()->popEpoch(epoch_produced_);
}
}

}}
62 changes: 28 additions & 34 deletions src/vt/messaging/pending_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ namespace vt { namespace messaging {
struct PendingSend final {
/// Function for complex action on send---takes a message to operate on
using SendActionType = std::function<void(MsgSharedPtr<BaseMsgType>&)>;
using EpochActionType = std::function<void()>;

/**
* \brief Construct a pending send.
Expand Down Expand Up @@ -137,36 +138,11 @@ struct PendingSend final {
produceMsg();
}

/**
* \brief Get the epoch produced when holder was created
*
* This is required because the epoch on the envelope can change in some cases
* in between when this is created and actually released.
*
* \return the produce epoch
*/
EpochType getProduceEpoch() const;

/**
* \brief Produce on the messages epoch to inhibit early termination
*/
void produceMsg();

/**
* \brief Consume on the messages epoch to inhibit early termination
*/
void consumeMsg();
PendingSend(EpochType ep, EpochActionType const& in_action);

explicit PendingSend(std::nullptr_t) { }
PendingSend(PendingSend&& in)
: msg_(std::move(in.msg_)),
msg_size_(std::move(in.msg_size_)),
send_action_(std::move(in.send_action_)),
epoch_produced_(std::move(in.epoch_produced_))
{
in.msg_ = nullptr;
in.send_action_ = nullptr;
}
PendingSend(PendingSend&& in) noexcept;

PendingSend(const PendingSend&) = delete;
PendingSend& operator=(PendingSend&& in) = delete;
PendingSend& operator=(PendingSend& in) = delete;
Expand All @@ -189,21 +165,39 @@ struct PendingSend final {
/**
* \brief Release the message, run action if needed
*/
void release() {
if (msg_ != nullptr || send_action_ != nullptr) {
sendMsg();
}
}
void release();

private:

/**
* \brief Get the epoch produced when holder was created
*
* This is required because the epoch on the envelope can change in some cases
* in between when this is created and actually released.
*
* \return the produce epoch
*/
EpochType getProduceEpochFromMsg() const;

/**
* \brief Produce on the messages epoch to inhibit early termination
*/
void produceMsg();

/**
* \brief Consume on the messages epoch to inhibit early termination
*/
void consumeMsg();

/// Send the message saved directly or trigger the lambda for
/// specialized sends from the pending holder
void sendMsg();

private:
MsgPtr<BaseMsgType> msg_ = nullptr;
ByteType msg_size_ = no_byte;
SendActionType send_action_ = nullptr;
SendActionType send_action_ = {};
EpochActionType epoch_action_ = {};
EpochType epoch_produced_ = no_epoch;
};

Expand Down

0 comments on commit 6dca1da

Please sign in to comment.