diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index fad97590e41..d80913ad7ef 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -92,11 +92,6 @@ class LedgerCleanerImp : public LedgerCleaner // //-------------------------------------------------------------------------- - void - onPrepare() override - { - } - void onStart() override { diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 29af2f8655b..89cfdf5b696 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -977,11 +977,6 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp // Stoppable // - void - onPrepare() override - { - } - void onStart() override { @@ -1634,7 +1629,6 @@ void ApplicationImp::doStart(bool withTimers) { startTimers_ = withTimers; - prepare(); start(); } diff --git a/src/ripple/app/main/LoadManager.cpp b/src/ripple/app/main/LoadManager.cpp index 5c481c5c3e8..b63972f34e1 100644 --- a/src/ripple/app/main/LoadManager.cpp +++ b/src/ripple/app/main/LoadManager.cpp @@ -77,11 +77,6 @@ LoadManager::resetDeadlockDetector() //------------------------------------------------------------------------------ -void -LoadManager::onPrepare() -{ -} - void LoadManager::onStart() { diff --git a/src/ripple/app/main/LoadManager.h b/src/ripple/app/main/LoadManager.h index 4ff3d8b8a8f..c5d344bdb4b 100644 --- a/src/ripple/app/main/LoadManager.h +++ b/src/ripple/app/main/LoadManager.h @@ -82,9 +82,6 @@ class LoadManager : public Stoppable //-------------------------------------------------------------------------- // Stoppable members - void - onPrepare() override; - void onStart() override; diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 67e9f29241e..acc8c31d595 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -607,7 +607,33 @@ class NetworkOPsImp final : public NetworkOPs // Stoppable. void - onStop() override; + onStop() override + { + { + boost::system::error_code ec; + heartbeatTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: heartbeatTimer cancel error: " + << ec.message(); + } + + ec.clear(); + clusterTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: clusterTimer cancel error: " + << ec.message(); + } + } + // Make sure that any waitHandlers pending in our timers are done + // before we declare ourselves stopped. + using namespace std::chrono_literals; + waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); + stopped(); + } private: void @@ -673,7 +699,6 @@ class NetworkOPsImp final : public NetworkOPs ConsensusPhase mLastConsensusPhase; LedgerMaster& m_ledgerMaster; - std::shared_ptr mAcquiringLedger; SubInfoMapType mSubAccount; SubInfoMapType mSubRTAccount; @@ -3516,34 +3541,6 @@ NetworkOPsImp::tryRemoveRpcSub(std::string const& strUrl) return true; } -void -NetworkOPsImp::onStop() -{ - mAcquiringLedger.reset(); - { - boost::system::error_code ec; - heartbeatTimer_.cancel(ec); - if (ec) - { - JLOG(m_journal.error()) - << "NetworkOPs: heartbeatTimer cancel error: " << ec.message(); - } - - ec.clear(); - clusterTimer_.cancel(ec); - if (ec) - { - JLOG(m_journal.error()) - << "NetworkOPs: clusterTimer cancel error: " << ec.message(); - } - } - // Make sure that any waitHandlers pending in our timers are done - // before we declare ourselves stopped. - using namespace std::chrono_literals; - waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); - stopped(); -} - #ifndef USE_NEW_BOOK_PAGE // NIKB FIXME this should be looked at. There's no reason why this shouldn't diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h index 8952453cba0..e45cb30b207 100644 --- a/src/ripple/app/misc/SHAMapStore.h +++ b/src/ripple/app/misc/SHAMapStore.h @@ -21,7 +21,6 @@ #define RIPPLE_APP_MISC_SHAMAPSTORE_H_INCLUDED #include -#include #include #include #include @@ -29,17 +28,16 @@ namespace ripple { class TransactionMaster; +class Stoppable; /** * class to create database, launch online delete thread, and * related SQLite database */ -class SHAMapStore : public Stoppable +class SHAMapStore { public: - SHAMapStore(Stoppable& parent) : Stoppable("SHAMapStore", parent) - { - } + virtual ~SHAMapStore() = default; /** Called by LedgerMaster every time a ledger validates. */ virtual void diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 86e96587bf7..8d4bca6b48d 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -150,7 +150,7 @@ SHAMapStoreImp::SHAMapStoreImp( Stoppable& parent, NodeStore::Scheduler& scheduler, beast::Journal journal) - : SHAMapStore(parent) + : Stoppable("SHAMapStore", parent) , app_(app) , scheduler_(scheduler) , journal_(journal) @@ -731,6 +731,7 @@ SHAMapStoreImp::health() void SHAMapStoreImp::onStop() { + // This is really a check for `if (thread_)`. if (deleteInterval_) { { @@ -738,26 +739,12 @@ SHAMapStoreImp::onStop() stop_ = true; } cond_.notify_one(); + // stopped() will be called by the thread_ running run(), + // when it reaches the check for stop_. } else { - stopped(); - } -} - -void -SHAMapStoreImp::onChildrenStopped() -{ - if (deleteInterval_) - { - { - std::lock_guard lock(mutex_); - stop_ = true; - } - cond_.notify_one(); - } - else - { + // There is no thread running run(), so we must call stopped(). stopped(); } } diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 541d74a38ee..062b90dde6d 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -33,7 +34,7 @@ namespace ripple { class NetworkOPs; -class SHAMapStoreImp : public SHAMapStore +class SHAMapStoreImp : public Stoppable, public SHAMapStore { private: struct SavedState @@ -117,7 +118,7 @@ class SHAMapStoreImp : public SHAMapStore boost::optional recoveryWaitTime_; // these do not exist upon SHAMapStore creation, but do exist - // as of onPrepare() or before + // as of run() or before NetworkOPs* netOPs_ = nullptr; LedgerMaster* ledgerMaster_ = nullptr; FullBelowCache* fullBelowCache_ = nullptr; @@ -246,14 +247,10 @@ class SHAMapStoreImp : public SHAMapStore // the main "run()". Health health(); + // // Stoppable // - void - onPrepare() override - { - } - void onStart() override { @@ -265,8 +262,6 @@ class SHAMapStoreImp : public SHAMapStore void onStop() override; // Called when all child Stoppable objects have stoped - void - onChildrenStopped() override; }; } // namespace ripple diff --git a/src/ripple/basics/impl/PerfLogImp.cpp b/src/ripple/basics/impl/PerfLogImp.cpp index 8143865784d..f3bb986d254 100644 --- a/src/ripple/basics/impl/PerfLogImp.cpp +++ b/src/ripple/basics/impl/PerfLogImp.cpp @@ -477,14 +477,12 @@ PerfLogImp::onStop() } thread_.join(); } - if (areChildrenStopped()) - stopped(); } void PerfLogImp::onChildrenStopped() { - onStop(); + stopped(); } //----------------------------------------------------------------------------- diff --git a/src/ripple/basics/impl/PerfLogImp.h b/src/ripple/basics/impl/PerfLogImp.h index 943829b6e96..8fa430ec164 100644 --- a/src/ripple/basics/impl/PerfLogImp.h +++ b/src/ripple/basics/impl/PerfLogImp.h @@ -210,12 +210,6 @@ class PerfLogImp : public PerfLog, Stoppable void rotate() override; - // Stoppable - void - onPrepare() override - { - } - // Called when application is ready to start threads. void onStart() override; diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index cf2395653a3..e192b83beaf 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -339,6 +339,7 @@ class Stoppable std::string m_name; RootStoppable& m_root; Child m_child; + // TODO [C++20]: Use std::atomic_flag instead. std::atomic m_stopped{false}; std::atomic m_childrenStopped{false}; Children m_children; @@ -360,17 +361,9 @@ class RootStoppable : public Stoppable bool isStopping() const; - /** Prepare all contained Stoppable objects. - This calls onPrepare for all Stoppable objects in the tree. - Calls made after the first have no effect. - Thread safety: - May be called from any thread. - */ - void - prepare(); - - /** Start all contained Stoppable objects. - The default implementation does nothing. + /** Prepare and start all contained Stoppable objects. + This calls onPrepare for all Stoppable objects in the tree, bottom-up, + then calls onStart for the same, top-down. Calls made after the first have no effect. Thread safety: May be called from any thread. @@ -381,8 +374,8 @@ class RootStoppable : public Stoppable /** Notify a root stoppable and children to stop, and block until stopped. Has no effect if the stoppable was already notified. This blocks until the stoppable and all of its children have stopped. - Undefined behavior results if stop() is called without a previous call - to start(). + Undefined behavior results if stop() is called without finishing + a previous call to start(). Thread safety: Safe to call from any thread not associated with a Stoppable. */ @@ -393,7 +386,7 @@ class RootStoppable : public Stoppable bool started() const { - return m_started; + return startExited_; } /* JobQueue uses this method for Job counting. */ @@ -411,20 +404,10 @@ class RootStoppable : public Stoppable alertable_sleep_until(std::chrono::system_clock::time_point const& t); private: - /* Notify a root stoppable and children to stop, without waiting. - Has no effect if the stoppable was already notified. - - Returns true on the first call to this method, false otherwise. - - Thread safety: - Safe to call from any thread at any time. - */ - bool - stopAsync(beast::Journal j); - - std::atomic m_prepared{false}; - std::atomic m_started{false}; - std::atomic m_calledStop{false}; + // TODO [C++20]: Use std::atomic_flag instead. + std::atomic startEntered_{false}; + std::atomic startExited_{false}; + std::atomic stopEntered_{false}; std::mutex m_; std::condition_variable c_; JobCounter jobCounter_; @@ -446,9 +429,11 @@ RootStoppable::alertable_sleep_until( std::chrono::system_clock::time_point const& t) { std::unique_lock lock(m_); - if (m_calledStop) + if (stopEntered_) return true; - return c_.wait_until(lock, t, [this] { return m_calledStop.load(); }); + // TODO [C++20]: When `stopEntered_` is changed to a `std::atomic_flag`, + // this call to `load` needs to change to a call to `test`. + return c_.wait_until(lock, t, [this] { return stopEntered_.load(); }); } inline bool diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp index f76317351c2..5d54c15660d 100644 --- a/src/ripple/core/impl/Stoppable.cpp +++ b/src/ripple/core/impl/Stoppable.cpp @@ -174,54 +174,39 @@ RootStoppable::~RootStoppable() bool RootStoppable::isStopping() const { - return m_calledStop; -} - -void -RootStoppable::prepare() -{ - if (m_prepared.exchange(true) == false) - prepareRecursive(); + return stopEntered_; } void RootStoppable::start() { - // Courtesy call to prepare. - if (m_prepared.exchange(true) == false) - prepareRecursive(); - - if (m_started.exchange(true) == false) - startRecursive(); + if (startEntered_.exchange(true)) + return; + prepareRecursive(); + startRecursive(); + startExited_ = true; } void RootStoppable::stop(beast::Journal j) { // Must have a prior call to start() - assert(m_started); + assert(startExited_); - if (stopAsync(j)) - stopRecursive(j); -} - -bool -RootStoppable::stopAsync(beast::Journal j) -{ bool alreadyCalled; { - // Even though m_calledStop is atomic, we change its value under a + // Even though stopEntered_ is atomic, we change its value under a // lock. This removes a small timing window that occurs if the - // waiting thread is handling a spurious wakeup while m_calledStop + // waiting thread is handling a spurious wakeup while stopEntered_ // changes state. std::unique_lock lock(m_); - alreadyCalled = m_calledStop.exchange(true); + alreadyCalled = stopEntered_.exchange(true); } if (alreadyCalled) { if (auto stream = j.warn()) - stream << "Stoppable::stop called again"; - return false; + stream << "RootStoppable::stop called again"; + return; } // Wait until all in-flight JobQueue Jobs are completed. @@ -230,7 +215,7 @@ RootStoppable::stopAsync(beast::Journal j) c_.notify_all(); stopAsyncRecursive(j); - return true; + stopRecursive(j); } } // namespace ripple diff --git a/src/ripple/core/impl/Workers.cpp b/src/ripple/core/impl/Workers.cpp index c08a449ce78..321ca368568 100644 --- a/src/ripple/core/impl/Workers.cpp +++ b/src/ripple/core/impl/Workers.cpp @@ -63,51 +63,51 @@ void Workers::setNumberOfThreads(int numberOfThreads) { static int instance{0}; - if (m_numberOfThreads != numberOfThreads) + if (m_numberOfThreads == numberOfThreads) + return; + + if (perfLog_) + perfLog_->resizeJobs(numberOfThreads); + + if (numberOfThreads > m_numberOfThreads) { - if (perfLog_) - perfLog_->resizeJobs(numberOfThreads); + // Increasing the number of working threads + int const amount = numberOfThreads - m_numberOfThreads; - if (numberOfThreads > m_numberOfThreads) + for (int i = 0; i < amount; ++i) { - // Increasing the number of working threads - int const amount = numberOfThreads - m_numberOfThreads; + // See if we can reuse a paused worker + Worker* worker = m_paused.pop_front(); - for (int i = 0; i < amount; ++i) + if (worker != nullptr) { - // See if we can reuse a paused worker - Worker* worker = m_paused.pop_front(); - - if (worker != nullptr) - { - // If we got here then the worker thread is at [1] - // This will unblock their call to wait() - // - worker->notify(); - } - else - { - worker = new Worker(*this, m_threadNames, instance++); - m_everyone.push_front(worker); - } + // If we got here then the worker thread is at [1] + // This will unblock their call to wait() + // + worker->notify(); } - } - else - { - // Decreasing the number of working threads - int const amount = m_numberOfThreads - numberOfThreads; - - for (int i = 0; i < amount; ++i) + else { - ++m_pauseCount; - - // Pausing a thread counts as one "internal task" - m_semaphore.notify(); + worker = new Worker(*this, m_threadNames, instance++); + m_everyone.push_front(worker); } } + } + else + { + // Decreasing the number of working threads + int const amount = m_numberOfThreads - numberOfThreads; - m_numberOfThreads = numberOfThreads; + for (int i = 0; i < amount; ++i) + { + ++m_pauseCount; + + // Pausing a thread counts as one "internal task" + m_semaphore.notify(); + } } + + m_numberOfThreads = numberOfThreads; } void diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 4dcaa4d658e..76ac42551f2 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -722,12 +722,7 @@ DatabaseShardImp::onChildrenStopped() } } - if (areChildrenStopped()) - stopped(); - else - { - JLOG(j_.warn()) << " Children failed to stop"; - } + stopped(); } void diff --git a/src/ripple/peerfinder/impl/PeerfinderManager.cpp b/src/ripple/peerfinder/impl/PeerfinderManager.cpp index f5310e8c7da..73383befd9e 100644 --- a/src/ripple/peerfinder/impl/PeerfinderManager.cpp +++ b/src/ripple/peerfinder/impl/PeerfinderManager.cpp @@ -227,11 +227,6 @@ class ManagerImp : public Manager m_logic.load(); } - void - onStart() override - { - } - void onStop() override { diff --git a/src/test/basics/PerfLog_test.cpp b/src/test/basics/PerfLog_test.cpp index e223047a79d..452057ef159 100644 --- a/src/test/basics/PerfLog_test.cpp +++ b/src/test/basics/PerfLog_test.cpp @@ -101,7 +101,6 @@ class PerfLog_test : public beast::unit_test::suite void doStart() { - prepare(); start(); } diff --git a/src/test/core/Stoppable_test.cpp b/src/test/core/Stoppable_test.cpp index b5d8327a736..c538af2a9ce 100644 --- a/src/test/core/Stoppable_test.cpp +++ b/src/test/core/Stoppable_test.cpp @@ -476,7 +476,6 @@ class Stoppable_test : public beast::unit_test::suite void run() { - prepare(); start(); stop(journal_); }