Skip to content

Commit

Permalink
Clean-up the Stoppable architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
thejohnfreeman authored and ximinez committed Jan 8, 2021
1 parent 1fd1c34 commit 78245a0
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 195 deletions.
5 changes: 0 additions & 5 deletions src/ripple/app/ledger/impl/LedgerCleaner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ class LedgerCleanerImp : public LedgerCleaner
//
//--------------------------------------------------------------------------

void
onPrepare() override
{
}

void
onStart() override
{
Expand Down
6 changes: 0 additions & 6 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,11 +977,6 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
// Stoppable
//

void
onPrepare() override
{
}

void
onStart() override
{
Expand Down Expand Up @@ -1634,7 +1629,6 @@ void
ApplicationImp::doStart(bool withTimers)
{
startTimers_ = withTimers;
prepare();
start();
}

Expand Down
5 changes: 0 additions & 5 deletions src/ripple/app/main/LoadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ LoadManager::resetDeadlockDetector()

//------------------------------------------------------------------------------

void
LoadManager::onPrepare()
{
}

void
LoadManager::onStart()
{
Expand Down
3 changes: 0 additions & 3 deletions src/ripple/app/main/LoadManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class LoadManager : public Stoppable
//--------------------------------------------------------------------------

// Stoppable members
void
onPrepare() override;

void
onStart() override;

Expand Down
57 changes: 27 additions & 30 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,33 @@ class NetworkOPsImp final : public NetworkOPs
// Stoppable.

void
onStop() override;
onStop() override
{
{
boost::system::error_code ec;
heartbeatTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: "
<< ec.message();
}

ec.clear();
clusterTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
stopped();
}

private:
void
Expand Down Expand Up @@ -673,7 +699,6 @@ class NetworkOPsImp final : public NetworkOPs
ConsensusPhase mLastConsensusPhase;

LedgerMaster& m_ledgerMaster;
std::shared_ptr<InboundLedger> mAcquiringLedger;

SubInfoMapType mSubAccount;
SubInfoMapType mSubRTAccount;
Expand Down Expand Up @@ -3516,34 +3541,6 @@ NetworkOPsImp::tryRemoveRpcSub(std::string const& strUrl)
return true;
}

void
NetworkOPsImp::onStop()
{
mAcquiringLedger.reset();
{
boost::system::error_code ec;
heartbeatTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: " << ec.message();
}

ec.clear();
clusterTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: " << ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
stopped();
}

#ifndef USE_NEW_BOOK_PAGE

// NIKB FIXME this should be looked at. There's no reason why this shouldn't
Expand Down
8 changes: 3 additions & 5 deletions src/ripple/app/misc/SHAMapStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,23 @@
#define RIPPLE_APP_MISC_SHAMAPSTORE_H_INCLUDED

#include <ripple/app/ledger/Ledger.h>
#include <ripple/core/Stoppable.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/protocol/ErrorCodes.h>
#include <boost/optional.hpp>

namespace ripple {

class TransactionMaster;
class Stoppable;

/**
* class to create database, launch online delete thread, and
* related SQLite database
*/
class SHAMapStore : public Stoppable
class SHAMapStore
{
public:
SHAMapStore(Stoppable& parent) : Stoppable("SHAMapStore", parent)
{
}
virtual ~SHAMapStore() = default;

/** Called by LedgerMaster every time a ledger validates. */
virtual void
Expand Down
23 changes: 5 additions & 18 deletions src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ SHAMapStoreImp::SHAMapStoreImp(
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal)
: SHAMapStore(parent)
: Stoppable("SHAMapStore", parent)
, app_(app)
, scheduler_(scheduler)
, journal_(journal)
Expand Down Expand Up @@ -731,33 +731,20 @@ SHAMapStoreImp::health()
void
SHAMapStoreImp::onStop()
{
// This is really a check for `if (thread_)`.
if (deleteInterval_)
{
{
std::lock_guard lock(mutex_);
stop_ = true;
}
cond_.notify_one();
// stopped() will be called by the thread_ running run(),
// when it reaches the check for stop_.
}
else
{
stopped();
}
}

void
SHAMapStoreImp::onChildrenStopped()
{
if (deleteInterval_)
{
{
std::lock_guard lock(mutex_);
stop_ = true;
}
cond_.notify_one();
}
else
{
// There is no thread running run(), so we must call stopped().
stopped();
}
}
Expand Down
13 changes: 4 additions & 9 deletions src/ripple/app/misc/SHAMapStoreImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/misc/SHAMapStore.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/Stoppable.h>
#include <ripple/nodestore/DatabaseRotating.h>
#include <atomic>
#include <chrono>
Expand All @@ -33,7 +34,7 @@ namespace ripple {

class NetworkOPs;

class SHAMapStoreImp : public SHAMapStore
class SHAMapStoreImp : public Stoppable, public SHAMapStore
{
private:
struct SavedState
Expand Down Expand Up @@ -117,7 +118,7 @@ class SHAMapStoreImp : public SHAMapStore
boost::optional<std::chrono::seconds> recoveryWaitTime_;

// these do not exist upon SHAMapStore creation, but do exist
// as of onPrepare() or before
// as of run() or before
NetworkOPs* netOPs_ = nullptr;
LedgerMaster* ledgerMaster_ = nullptr;
FullBelowCache* fullBelowCache_ = nullptr;
Expand Down Expand Up @@ -246,14 +247,10 @@ class SHAMapStoreImp : public SHAMapStore
// the main "run()".
Health
health();

//
// Stoppable
//
void
onPrepare() override
{
}

void
onStart() override
{
Expand All @@ -265,8 +262,6 @@ class SHAMapStoreImp : public SHAMapStore
void
onStop() override;
// Called when all child Stoppable objects have stoped
void
onChildrenStopped() override;
};

} // namespace ripple
Expand Down
4 changes: 1 addition & 3 deletions src/ripple/basics/impl/PerfLogImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,12 @@ PerfLogImp::onStop()
}
thread_.join();
}
if (areChildrenStopped())
stopped();
}

void
PerfLogImp::onChildrenStopped()
{
onStop();
stopped();
}

//-----------------------------------------------------------------------------
Expand Down
6 changes: 0 additions & 6 deletions src/ripple/basics/impl/PerfLogImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,6 @@ class PerfLogImp : public PerfLog, Stoppable
void
rotate() override;

// Stoppable
void
onPrepare() override
{
}

// Called when application is ready to start threads.
void
onStart() override;
Expand Down
45 changes: 15 additions & 30 deletions src/ripple/core/Stoppable.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ class Stoppable
std::string m_name;
RootStoppable& m_root;
Child m_child;
// TODO [C++20]: Use std::atomic_flag instead.
std::atomic<bool> m_stopped{false};
std::atomic<bool> m_childrenStopped{false};
Children m_children;
Expand All @@ -360,17 +361,9 @@ class RootStoppable : public Stoppable
bool
isStopping() const;

/** Prepare all contained Stoppable objects.
This calls onPrepare for all Stoppable objects in the tree.
Calls made after the first have no effect.
Thread safety:
May be called from any thread.
*/
void
prepare();

/** Start all contained Stoppable objects.
The default implementation does nothing.
/** Prepare and start all contained Stoppable objects.
This calls onPrepare for all Stoppable objects in the tree, bottom-up,
then calls onStart for the same, top-down.
Calls made after the first have no effect.
Thread safety:
May be called from any thread.
Expand All @@ -381,8 +374,8 @@ class RootStoppable : public Stoppable
/** Notify a root stoppable and children to stop, and block until stopped.
Has no effect if the stoppable was already notified.
This blocks until the stoppable and all of its children have stopped.
Undefined behavior results if stop() is called without a previous call
to start().
Undefined behavior results if stop() is called without finishing
a previous call to start().
Thread safety:
Safe to call from any thread not associated with a Stoppable.
*/
Expand All @@ -393,7 +386,7 @@ class RootStoppable : public Stoppable
bool
started() const
{
return m_started;
return startExited_;
}

/* JobQueue uses this method for Job counting. */
Expand All @@ -411,20 +404,10 @@ class RootStoppable : public Stoppable
alertable_sleep_until(std::chrono::system_clock::time_point const& t);

private:
/* Notify a root stoppable and children to stop, without waiting.
Has no effect if the stoppable was already notified.
Returns true on the first call to this method, false otherwise.
Thread safety:
Safe to call from any thread at any time.
*/
bool
stopAsync(beast::Journal j);

std::atomic<bool> m_prepared{false};
std::atomic<bool> m_started{false};
std::atomic<bool> m_calledStop{false};
// TODO [C++20]: Use std::atomic_flag instead.
std::atomic<bool> startEntered_{false};
std::atomic<bool> startExited_{false};
std::atomic<bool> stopEntered_{false};
std::mutex m_;
std::condition_variable c_;
JobCounter jobCounter_;
Expand All @@ -446,9 +429,11 @@ RootStoppable::alertable_sleep_until(
std::chrono::system_clock::time_point const& t)
{
std::unique_lock<std::mutex> lock(m_);
if (m_calledStop)
if (stopEntered_)
return true;
return c_.wait_until(lock, t, [this] { return m_calledStop.load(); });
// TODO [C++20]: When `stopEntered_` is changed to a `std::atomic_flag`,
// this call to `load` needs to change to a call to `test`.
return c_.wait_until(lock, t, [this] { return stopEntered_.load(); });
}

inline bool
Expand Down
Loading

0 comments on commit 78245a0

Please sign in to comment.