Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
#672: docs: write doxygen for TD
Browse files Browse the repository at this point in the history
lifflander committed Jun 17, 2020
1 parent f516d7b commit e91da82
Showing 1 changed file with 402 additions and 8 deletions.
410 changes: 402 additions & 8 deletions src/vt/termination/termination.h
Original file line number Diff line number Diff line change
@@ -74,6 +74,17 @@ namespace vt { namespace term {

using DijkstraScholtenTerm = term::ds::StateDS;

/**
* \struct TerminationDetector
*
* \brief Detect global termination and of subsets of work
*
* Implements distributed algorithms to termination detection across the entire
* VT runtime and for subset of work, encapsulated in an epoch. Ships with two
* algorithms: 4-counter wave-based termination for large collective epochs;
* and, Dijkstra-Scholten parental responsibility termination for rooted
* epochs. Epochs may have other epochs nested within them, forming a graph.
*/
struct TerminationDetector :
runtime::component::Component<TerminationDetector>,
TermAction, collective::tree::Tree, DijkstraScholtenTerm, TermInterface
@@ -88,7 +99,11 @@ struct TerminationDetector :
using EpochGraph = termination::graph::EpochGraph;
using EpochGraphMsg = termination::graph::EpochGraphMsg<EpochGraph>;

/**
* \internal \brief Construct a termination detector
*/
TerminationDetector();

virtual ~TerminationDetector() {}

std::string name() override { return "TerminationDetector"; }
@@ -99,43 +114,145 @@ struct TerminationDetector :
* termination, send(..) for Dijkstra-Scholten parental responsibility TD
*
***************************************************************************/

/**
* \brief Produce on an epoch
*
* \param[in] epoch the epoch to produce; if empty, produce on global epoch
* \param[in] num_units number of units to produce
* \param[in] node the node where this unit will be consumed (optional)
*/
void produce(
EpochType epoch = any_epoch_sentinel, TermCounterType num_units = 1,
NodeType node = uninitialized_destination
);

/**
* \brief Consume on an epoch
*
* \param[in] epoch the epoch to consume; if empty, consume on global epoch
* \param[in] num_units number of units to consume
* \param[in] node the node where this unit was produced (optional)
*/
void consume(
EpochType epoch = any_epoch_sentinel, TermCounterType num_units = 1,
NodeType node = uninitialized_destination
);

/**
* \internal \brief Special produce for hang detection
*/
inline void hangDetectSend() { hang_.l_prod++; }

/**
* \internal \brief Special consume for hang detection
*/
inline void hangDetectRecv() { hang_.l_cons++; }
/***************************************************************************/

friend struct ds::StateDS;
friend struct TermState;
friend struct EpochDependency;

/**
* \brief Check if an epoch is rooted
*
* \param[in] epoch the epoch to check
*
* \return whether it is rooted
*/
bool isRooted(EpochType epoch);

/**
* \brief Check if the algorithm behind an epoch is Dijkstra-Scholten parental
* responsibility
*
* \param[in] epoch the epoch to check
*
* \return whether is it DS
*/
bool isDS(EpochType epoch);

/**
* \internal \brief Get or create the DS terminator for an epoch
*
* \param[in] epoch the epoch
* \param[in] is_root whether this is the root (relevant when creating)
*
* \return the DS terminator manager
*/
TermStateDSType* getDSTerm(EpochType epoch, bool is_root = false);

/**
* \brief Reset global termination to start producing/consuming again
*/
void resetGlobalTerm();

/**
* \internal \brief Free an epoch after termination
*
* \param[in] epoch the epoch
*/
void freeEpoch(EpochType const& epoch);

public:
/*
* Interface for scoped epochs using C++ lexical scopes to encapsulate epoch
* regimes
/**
* \struct Scoped
*
* \brief Interface for scoped epochs using C++ lexical scopes to encapsulate
* epoch regimes
*/

struct Scoped {
static EpochType rooted(bool small, ActionType closure);
static EpochType rooted(bool small, ActionType closure, ActionType action);
/**
* \brief Run closure in rooted epoch to detect termination of all its work
*
* \param[in] use_ds whether to use DS algorithm or not
* \param[in] closure the closure to wait for termination
*
* \return the epoch created for it
*/
static EpochType rooted(bool use_ds, ActionType closure);

/**
* \brief Run closure in rooted epoch to detect termination of all its work
*
* \param[in] use_ds whether to use DS algorithm or not
* \param[in] closure the closure to wait for termination
* \param[in] action action to execute after closure terminates
*
* \return the epoch created for it
*/
static EpochType rooted(bool use_ds, ActionType closure, ActionType action);

/**
* \brief Collectively run closure in collective epoch to detect termination
* of all its work
*
* \param[in] closure the closure to wait for termination
*
* \return the epoch created for it
*/
static EpochType collective(ActionType closure);

/**
* \brief Collectively run closure in collective epoch to detect termination
* of all its work
*
* \param[in] closure the closure to wait for termination
* \param[in] action action to execute after closure terminates
*
* \return the epoch created for it
*/
static EpochType collective(ActionType closure, ActionType action);

/**
* \brief Run a rooted sequence of actions with termination in-between them
*
* \param[in] use_ds whether to use DS
* \param[in] closures pack of closures to sequence
*/
template <typename... Actions>
static void rootedSeq(bool small, Actions... closures);
static void rootedSeq(bool use_ds, Actions... closures);
} scope;

public:
@@ -212,58 +329,211 @@ struct TerminationDetector :
SuccessorEpochCapture successor = SuccessorEpochCapture{}
);

void activateEpoch(EpochType const& epoch);
/**
* \brief Tell the termination detector that all work has been enqueued for a
* given epoch
*
* \param[in] epoch the finished epoch
*/
void finishedEpoch(EpochType const& epoch);

/**
* \internal \brief Activate an epoch; start detecting on it
*
* \param[in] epoch the epoch to activate
*/
void activateEpoch(EpochType const& epoch);

/**
* \internal \brief Finish an epoch without activation it
*
* \param[in] epoch the epoch that is finished
*/
void finishNoActivateEpoch(EpochType const& epoch);

public:
/*
* Directly call into a specific type of rooted epoch, can not be overridden
*/

/**
* \brief Create a new rooted epoch that uses the 4-counter wave algorithm
*
* \param[in] successor successor epoch that waits for this new epoch
* \param[in] label epoch label for debugging purposes
*
* \return the new epoch
*/
EpochType makeEpochRootedWave(
SuccessorEpochCapture successor, std::string const& label = ""
);

/**
* \brief Create a new rooted epoch that uses the DS algorithm
*
* \param[in] successor successor epoch that waits for this new epoch
* \param[in] label epoch label for debugging purposes
*
* \return the new epoch
*/
EpochType makeEpochRootedDS(
SuccessorEpochCapture successor, std::string const& label = ""
);

private:
enum CallFromEnum { Root, NonRoot };

/**
* \internal \brief Find or create on demand state for a collective wave-based
* epoch
*
* \param[in] epoch the epoch
* \param[in] is_ready whether it is ready
*
* \return termination state for the epoch
*/
TermStateType& findOrCreateState(EpochType const& epoch, bool is_ready);

/**
* \internal \brief Cleanup an epoch after termination
*
* \param[in] epoch the epoch
* \param[in] from the caller
*/
void cleanupEpoch(EpochType const& epoch, CallFromEnum from);

/**
* \internal \brief Produce/consume on an epoch
*
* \param[in] state the epoch state
* \param[in] num_units number of units
* \param[in] produce whether its a produce or consume
* \param[in] node the node producing to or consuming from
*/
void produceConsumeState(
TermStateType& state, TermCounterType const num_units, bool produce,
NodeType node
);

/**
* \internal \brief Produce\consume on an epoch
*
* \param[in] state the epoch state
* \param[in] num_units number of units
* \param[in] produce whether its a produce or consume
* \param[in] node the node producing to or consuming from
*/
void produceConsume(
EpochType epoch = any_epoch_sentinel, TermCounterType num_units = 1,
bool produce = true, NodeType node = uninitialized_destination
);

/**
* \internal \brief Propagate an epoch with state
*
* \param[in] state epoch state
* \param[in] prod num produced
* \param[in] cons num consumed
*/
void propagateEpochExternalState(
TermStateType& state, TermCounterType const& prod, TermCounterType const& cons
);

/**
* \internal \brief Propagate an epoch
*
* \param[in] epoch the epoch
* \param[in] prod num produced
* \param[in] cons num consumed
*/
void propagateEpochExternal(
EpochType const& epoch, TermCounterType const& prod,
TermCounterType const& cons
);

/**
* \internal \brief Get archetype bits embedded in epoch
*
* \param[in] epoch the epoch
*
* \return the bits masked out
*/
EpochType getArchetype(EpochType const& epoch) const;

/**
* \internal \brief Get an epoch's window
*
* \param[in] epoch the epoch
*
* \return the window
*/
EpochWindow* getWindow(EpochType const& epoch);

/**
* \internal \brief Check for and perform actions when a epoch's counts are
* constant.
*
* \param[in] state the epoch state
*/
void countsConstant(TermStateType& state);

public:
/**
* \internal \brief Build the epoch graph. Typically called to output to the
* user due to a failure.
*/
void startEpochGraphBuild();

private:
/**
* \internal \brief Update resolved epochs
*
* \param[in] epoch the epoch
*/
void updateResolvedEpochs(EpochType const& epoch);

/**
* \internal \brief Inquire if an epoch has terminated
*
* \param[in] epoch the epoch
* \param[in] from_node the node inquiring
*/
void inquireTerminated(EpochType const& epoch, NodeType const& from_node);

/**
* \internal \brief Reply to a node whether an epoch has terminated
*
* \param[in] epoch the epoch
* \param[in] is_terminated whether it has terminated
*/
void replyTerminated(EpochType const& epoch, bool const& is_terminated);

public:
/**
* \internal \brief Set whether the scheduler has locally terminated
*
* \param[in] terminated whether it has terminated
* \param[in] no_propagate whether to should propagate state remotely
*/
void setLocalTerminated(bool const terminated, bool const no_propagate = true);

/**
* \internal \brief Progress function to move state forward
*/
void maybePropagate();

/**
* \brief Get number of units produced on global epoch
*
* \return number of produced units
*/
TermCounterType getNumUnits() const;

/**
* \brief Get number of collective epochs that have terminated
*
* \return number of epochs
*/
std::size_t getNumTerminatedCollectiveEpochs() const;

public:
@@ -274,24 +544,94 @@ struct TerminationDetector :
bool isEpochTerminated(EpochType epoch);

public:
/**
* \brief Make the local epoch graph
*
* \return shared pointer to epoch graph
*/
std::shared_ptr<EpochGraph> makeGraph();

private:
/**
* \internal \brief Handler for hang checking
*
* \param[in] msg the message
*/
static void hangCheckHandler(HangCheckMsg* msg);

/**
* \internal \brief Handler for building the local epoch graph
*
* \param[in] msg the message
*/
static void buildLocalGraphHandler(BuildGraphMsg* msg);

/**
* \internal \brief Handler for to call when epoch graph is done building
*
* \param[in] msg the message
*/
static void epochGraphBuiltHandler(EpochGraphMsg* msg);

private:
/**
* \internal \brief Propagate a particular epoch
*
* \param[in] state the state for the epoch
*
* \return whether it made progress
*/
bool propagateEpoch(TermStateType& state);

/**
* \internal \brief Notfiy that an epoch has terminated
*
* \param[in] epoch the epoch
* \param[in] from the caller
*/
void epochTerminated(EpochType const& epoch, CallFromEnum from);

/**
* \internal \brief Do another wave for an epoch
*
* \param[in] epoch the epoch
* \param[in] wave the wave count so far
*/
void epochContinue(EpochType const& epoch, TermWaveType const& wave);

/**
* \internal \brief Setup state for a new epoch
*
* \param[in] epoch the epoch
* \param[in] label the label for debugging
*/
void setupNewEpoch(EpochType const& epoch, std::string const& label);

/**
* \internal \brief Ready an epoch
*
* \param[in] epoch the epoch
*/
void readyNewEpoch(EpochType const& epoch);

/**
* \internal \brief Make an epoch
*
* \param[in] epoch the epoch
* \param[in] is_root whether it is rooted
* \param[in] label the label for debugging
*/
void makeRootedHan(
EpochType const& epoch, bool is_root, std::string const& label = ""
);

public:
/**
* \internal \brief Make a dependency between two epochs
*
* \param[in] predecessor the predecessor epoch
* \param[in] successoor the successoor epoch
*/
void addDependency(EpochType predecessor, EpochType successoor);

public:
@@ -301,16 +641,70 @@ struct TerminationDetector :
std::unordered_set<EpochType> const& getEpochWaitSet() { return epoch_wait_status_; }

private:
/**
* \internal \brief Get an epoch's dependency information
*
* \param[in] epoch the epoch
*
* \return the dependency
*/
EpochDependency* getEpochDep(EpochType epoch);

/**
* \internal \brief Decrement the join counter on an epoch's dependency
*
* \param[in] ep the epoch
*/
void removeEpochStateDependency(EpochType ep);

/**
* \internal \brief Increment the join counter on an epoch's dependency
*
* \param[in] ep the epoch
*/
void addEpochStateDependency(EpochType ep);

private:
/**
* \internal \brief Make a rooted epoch handler
*
* \param[in] msg the message
*/
static void makeRootedHandler(TermMsg* msg);

/**
* \internal \brief Inquire if an epoch terminated handler
*
* \param[in] msg the message
*/
static void inquireEpochTerminated(TermTerminatedMsg* msg);

/**
* \internal \brief Reply if an epoch terminated handler
*
* \param[in] msg the message
*/
static void replyEpochTerminated(TermTerminatedReplyMsg* msg);

/**
* \internal \brief Propagate an epoch handler
*
* \param[in] msg the message
*/
static void propagateEpochHandler(TermCounterMsg* msg);

/**
* \internal \brief Notify an epoch terminated handler
*
* \param[in] msg the message
*/
static void epochTerminatedHandler(TermMsg* msg);

/**
* \internal \brief Continue doing waves for an epoch handler
*
* \param[in] msg the message
*/
static void epochContinueHandler(TermMsg* msg);

private:

0 comments on commit e91da82

Please sign in to comment.