Skip to content

Commit

Permalink
#702: chainset: add merging step to chain set
Browse files Browse the repository at this point in the history
  • Loading branch information
nmm0 committed Apr 29, 2020
1 parent 633d3de commit 24f0d39
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
26 changes: 26 additions & 0 deletions src/vt/messaging/collection_chain_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,32 @@ class CollectionChainSet final {
theTerm()->finishedEpoch(epoch);
}


static void mergeStepCollective(CollectionChainSet &a, CollectionChainSet &b,
std::function<PendingSend(Index)> step_action
) {
mergeStepCollective( "", a, b, step_action);
}

static void mergeStepCollective(
std::string const& label,
CollectionChainSet &a, CollectionChainSet &b,
std::function<PendingSend(Index)> step_action
) {
auto epoch = theTerm()->makeEpochCollective(label);
vt::theMsg()->pushEpoch(epoch);

for (auto &entry : a.chains_) {
auto& idx = entry.first;
auto& chaina = entry.second;
auto& chainb = b.chains_[entry.first];
DependentSendChain::mergeChainStep(chaina, chainb, epoch, step_action(idx));
}

vt::theMsg()->popEpoch(epoch);
theTerm()->finishedEpoch(epoch);
}

/**
* \brief The next collective step to execute across all resident elements
* across all nodes.
Expand Down
11 changes: 7 additions & 4 deletions src/vt/messaging/dependent_send_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ struct MergedClosure {
explicit MergedClosure(std::shared_ptr<PendingSend> shared_state)
: shared_state_(shared_state)
{}
MergedClosure(MergedClosure const&) = default;
MergedClosure(MergedClosure&& in) = default;

void operator()() {
shared_state_.release();
shared_state_.reset();
}

private:
Expand Down Expand Up @@ -137,12 +139,13 @@ class DependentSendChain final {
theTerm()->addDependency(a.last_epoch_, new_epoch);
theTerm()->addDependency(b.last_epoch_, new_epoch);

auto closure = MergedClosure(std::make_shared<PendingSend>(std::move(link)));
auto c1 = MergedClosure(std::make_shared<PendingSend>(std::move(link)));
auto c2 = c1;

// closure is intentionally copied here; basically the ref count will go down
// when all actions are completed and execute the PendingSend
theTerm()->addActionUnique(a.last_epoch_, closure);
theTerm()->addActionUnique(b.last_epoch_, closure);
theTerm()->addActionUnique(a.last_epoch_, std::move(c1));
theTerm()->addActionUnique(b.last_epoch_, std::move(c2));

a.last_epoch_ = new_epoch;
b.last_epoch_ = new_epoch;
Expand Down

0 comments on commit 24f0d39

Please sign in to comment.