Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redesign stoppable objects #3741

Closed
wants to merge 66 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
0747c09
Remove Stoppable::onPrepare
thejohnfreeman Dec 29, 2020
73e6347
[fold] Move timer from RootStoppable to LoadManager
thejohnfreeman Dec 29, 2020
6b56845
[fold] Call stopped() in LedgerCleanerImp::onStop
thejohnfreeman Dec 30, 2020
2180d82
[fold] Remove Stoppable from base classes of OrderBookDB, LedgerMaster
thejohnfreeman Dec 30, 2020
e8f8d04
[fold] Call stopped() in SHAMapStoreImp::onStop
thejohnfreeman Dec 30, 2020
0282848
[fold] Call stopped() in NodeStore::Database::onStop
thejohnfreeman Dec 30, 2020
50a9967
[fold] NodeStoreScheduler is not a Stoppable
thejohnfreeman Dec 30, 2020
9af6acc
[fold] Call stopped() in ServerHandlerImp::onStop
thejohnfreeman Dec 30, 2020
1bb43f7
[fold] Stop NodeStore::TaskQueue from NodeStore::DatabaseShardImp::on…
thejohnfreeman Dec 30, 2020
94c830c
[fold] Move JobCounter from RootStoppable to JobQueue
thejohnfreeman Dec 31, 2020
ef7d968
[fold] Call stopped() in OverlayImpl::onStop
thejohnfreeman Dec 31, 2020
c27f440
[fold] Call stopped() in NodeStore::DatabaseShardImp::onStop
thejohnfreeman Dec 31, 2020
aad8a32
[fold] Call stopped() in PerfLogImp::onStop
thejohnfreeman Dec 31, 2020
59557fe
[fold] Remove Stoppable::{stopped,onChildrenStopped}
thejohnfreeman Dec 31, 2020
78050db
[fold] Remove Stoppable::{setParent,getRoot}
thejohnfreeman Dec 31, 2020
251bf3e
[fold] Remove RootStoppable::start and Stoppable::startRecursive
thejohnfreeman Jan 5, 2021
299e059
[fold] Inline Application::start
thejohnfreeman Jan 6, 2021
ad12f0e
[fold] LoadManager is not a Stoppable
thejohnfreeman Jan 6, 2021
ffa6c2c
[fold] PeerFinder::Manager is not a Stoppable
thejohnfreeman Jan 6, 2021
4d32cf6
[fold] Remove unused include
thejohnfreeman Jan 6, 2021
f331b3f
[fold] ServerHandlerImp is not a Stoppable
thejohnfreeman Jan 6, 2021
f8bd287
[fold] LedgerCleaner is not a Stoppable
thejohnfreeman Jan 6, 2021
15a9472
[fold] NetworkOPsImp is not a Stoppable
thejohnfreeman Jan 6, 2021
eb68656
[fold] Remove unused include
thejohnfreeman Jan 6, 2021
2d26155
[fold] GRPCServer is not a Stoppable
thejohnfreeman Jan 6, 2021
727b349
[fold] PerfLogImp is not a Stoppable
thejohnfreeman Jan 7, 2021
3e08270
[fold] Use a fixture for PerfLog tests
thejohnfreeman Jan 7, 2021
15b7da1
[fold] OverlayImpl is not a Stoppable
thejohnfreeman Jan 8, 2021
4a150d1
[fold] InboundTransactionsImp is not a Stoppable
thejohnfreeman Jan 8, 2021
2b35d46
[fold] InboundLedgersImp is not a Stoppable
thejohnfreeman Jan 8, 2021
833767e
[fold] ReportingETL is not a Stoppable
thejohnfreeman Jan 22, 2021
77a6ac7
[fold] PgPool is not a Stoppable
thejohnfreeman Jan 22, 2021
8c5deef
[fold] NodeStore::Database is not a Stoppable
thejohnfreeman Jan 18, 2021
023998b
[fold] SHAMapStoreImp is not a Stoppable
thejohnfreeman Jan 19, 2021
21db979
[fold] ShardArchiveHandler is not a Stoppable
thejohnfreeman Jan 19, 2021
0b14202
[fold] JobQueue is not a Stoppable
thejohnfreeman Jan 19, 2021
fe5d5db
[fold] Remove Stoppable
thejohnfreeman Jan 19, 2021
ddf177e
[fold] Document ClosureCounter and Workers
thejohnfreeman Jan 19, 2021
846b0c7
[fold] Remove mentions of Stoppable
thejohnfreeman Jan 19, 2021
6a45a85
[fold] Rename HTTPDownloader::{onStop -> stop}
thejohnfreeman Jan 19, 2021
5e892a7
[fold] review fixes
thejohnfreeman Jan 26, 2021
cd9cd6e
[fold] Merge 1.7.0-b12
thejohnfreeman Jan 27, 2021
e8a8d3e
[fold] LedgerReplayer is not a Stoppable
thejohnfreeman Jan 27, 2021
bd22635
[fold] Use CTAD
thejohnfreeman Jan 31, 2021
a40ddf2
[fold] Move start, stop to PerfLog interface and use it
thejohnfreeman Jan 31, 2021
6fe6f26
[fold] Prefer narrow capture list
thejohnfreeman Jan 31, 2021
905a274
[fold] Rename Timer::{start -> async_wait}
thejohnfreeman Jan 31, 2021
f556e07
[fold] Move start, stop to Overlay interface and use it
thejohnfreeman Jan 31, 2021
a4fbb6c
[fold] Stop at the beginning of every NodeStore::Database destructor
thejohnfreeman Feb 1, 2021
954c6fb
[fold] Rename Workers::{pauseAllThreadsAndWait -> stop}
thejohnfreeman Feb 1, 2021
3596ce5
[fold] Ignore overlay when it is missing
thejohnfreeman Feb 1, 2021
fcba4a6
[fold] Fix comment
thejohnfreeman Feb 2, 2021
92e9c6a
[fold] Stop ServerHandlerImp
thejohnfreeman Feb 3, 2021
01521e5
[fold] Rearrange stop sequence
thejohnfreeman Feb 7, 2021
5e05663
Merge 1.7.0-rc2
thejohnfreeman Feb 7, 2021
6a025ff
Fix declaration of CassandraBackend::counters_
thejohnfreeman Feb 5, 2021
51adc26
[fold] review fixes
thejohnfreeman Feb 8, 2021
e531354
[fold] Move LedgerCleaner from LedgerMaster to ApplicationImp
thejohnfreeman Feb 8, 2021
5bfc16f
[fold] Merge 1.8.0-b1
thejohnfreeman Mar 15, 2021
2a39b7c
[fold] Add missing include
thejohnfreeman Mar 23, 2021
d3c1c1f
Fix compilation of krb5 dependency with GCC 10
thejohnfreeman Mar 23, 2021
e74afc3
Merge 1.8.0-b2
thejohnfreeman Apr 2, 2021
f1beb4f
[fold] Put PgPool behind conditional compilation
thejohnfreeman Apr 2, 2021
145f5a1
[fold] More conditional compilation
thejohnfreeman Apr 2, 2021
220f440
[fold] Requested changes
thejohnfreeman Apr 30, 2021
7b459ca
[fold] Requested changes
thejohnfreeman May 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/LedgerMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
#include <ripple/app/misc/CanonicalTXSet.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/basics/UptimeClock.h>
ximinez marked this conversation as resolved.
Show resolved Hide resolved
#include <ripple/basics/chrono.h>
#include <ripple/beast/insight/Collector.h>
#include <ripple/beast/utility/PropertyStream.h>
#include <ripple/core/Stoppable.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/protocol/STValidation.h>
Expand Down Expand Up @@ -69,7 +69,7 @@ class ReportingShouldProxy : public std::runtime_error
// Tracks the current ledger and any ledgers in the process of closing
// Tracks ledger history
// Tracks held transactions
class LedgerMaster : public Stoppable, public AbstractFetchPackContainer
class LedgerMaster : public AbstractFetchPackContainer
{
public:
// Age for last validated ledger if the process has yet to validate.
Expand Down
9 changes: 3 additions & 6 deletions src/ripple/app/ledger/OrderBookDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@

namespace ripple {

OrderBookDB::OrderBookDB(Application& app, Stoppable& parent)
: Stoppable("OrderBookDB", parent)
, app_(app)
, mSeq(0)
, j_(app.journal("OrderBookDB"))
OrderBookDB::OrderBookDB(Application& app)
: app_(app), mSeq(0), j_(app.journal("OrderBookDB"))
{
}

Expand Down Expand Up @@ -101,7 +98,7 @@ OrderBookDB::update(std::shared_ptr<ReadView const> const& ledger)
{
for (auto& sle : ledger->sles)
{
if (isStopping())
if (app_.isStopping())
{
JLOG(j_.info())
<< "OrderBookDB::update exiting due to isStopping";
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/OrderBookDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@

namespace ripple {

class OrderBookDB : public Stoppable
class OrderBookDB
{
public:
OrderBookDB(Application& app, Stoppable& parent);
OrderBookDB(Application& app);

void
setup(std::shared_ptr<ReadView const> const& ledger);
Expand Down
34 changes: 10 additions & 24 deletions src/ripple/app/ledger/impl/LedgerCleaner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class LedgerCleanerImp : public LedgerCleaner

std::thread thread_;

enum class State : char { readyToClean = 0, startCleaning, cleaning };
State state_ = State::readyToClean;
enum class State : char { notCleaning = 0, cleaning };
State state_ = State::notCleaning;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this restructure this could probably reduced to:

bool cleaning_{false};

Once the codebase moves to C++20 using std::atomic_flag could be a better choice, since it adds wait and notify_one/notify_all functionality. A TODO comment might be nice to add.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::atomic_flag seems to be missing the predicated wait overload, and because the wakeup condition is complex (stopped or cleaning), I don't think we win anything by switching to std::atomic_flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the simpler bool cleaning_{false};

bool shouldExit_ = false;

// The lowest ledger in the range we're checking.
Expand Down Expand Up @@ -108,6 +108,7 @@ class LedgerCleanerImp : public LedgerCleaner
wakeup_.notify_one();
}
thread_.join();
stopped();
Copy link
Contributor

@miguelportilla miguelportilla Feb 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider

    if (thread_.joinable())
        thread_.join();

also renaming shouldExit_ to stop_

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're assuming that LedgerCleaner::stop is called exactly once. I want stop methods to be able to assume that. It will make the transition to RAII much easier.

}

//--------------------------------------------------------------------------
Expand Down Expand Up @@ -214,11 +215,8 @@ class LedgerCleanerImp : public LedgerCleaner
if (params.isMember(jss::stop) && params[jss::stop].asBool())
minRange_ = maxRange_ = 0;

if (state_ == State::readyToClean)
{
state_ = State::startCleaning;
wakeup_.notify_one();
}
state_ = State::cleaning;
thejohnfreeman marked this conversation as resolved.
Show resolved Hide resolved
wakeup_.notify_one();
}
}

Expand All @@ -228,36 +226,26 @@ class LedgerCleanerImp : public LedgerCleaner
//
//--------------------------------------------------------------------------
private:
void
init()
{
JLOG(j_.debug()) << "Initializing";
}

void
run()
{
beast::setCurrentThreadName("LedgerCleaner");
JLOG(j_.debug()) << "Started";

init();

while (true)
{
{
std::unique_lock<std::mutex> lock(mutex_);
state_ = State::notCleaning;
wakeup_.wait(lock, [this]() {
return (shouldExit_ || state_ == State::startCleaning);
return (shouldExit_ || state_ == State::cleaning);
});
if (shouldExit_)
break;

state_ = State::cleaning;
assert(state_ == State::cleaning);
}
doLedgerCleaner();
}

stopped();
}

// VFALCO TODO This should return boost::optional<uint256>
Expand Down Expand Up @@ -413,12 +401,11 @@ class LedgerCleanerImp : public LedgerCleaner
bool doNodes;
bool doTxns;

while (app_.getFeeTrack().isLoadedLocal())
if (app_.getFeeTrack().isLoadedLocal())
{
JLOG(j_.debug()) << "Waiting for load to subside";
std::this_thread::sleep_for(std::chrono::seconds(5));
if (shouldExit())
return;
continue;
}

{
Expand All @@ -427,7 +414,6 @@ class LedgerCleanerImp : public LedgerCleaner
(minRange_ == 0))
{
minRange_ = maxRange_ = 0;
state_ = State::readyToClean;
return;
}
ledgerIndex = maxRange_;
Expand Down
11 changes: 6 additions & 5 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,13 @@ LedgerMaster::LedgerMaster(
Stoppable& parent,
beast::insight::Collector::ptr const& collector,
beast::Journal journal)
: Stoppable("LedgerMaster", parent)
, app_(app)
: app_(app)
, m_journal(journal)
, mLedgerHistory(collector, app)
, mLedgerCleaner(
detail::make_LedgerCleaner(app, *this, app_.journal("LedgerCleaner")))
, mLedgerCleaner(detail::make_LedgerCleaner(
app,
parent,
app_.journal("LedgerCleaner")))
, standalone_(app_.config().standalone())
, fetch_depth_(
app_.getSHAMapStore().clampFetchDepth(app_.config().FETCH_DEPTH))
Expand Down Expand Up @@ -1557,7 +1558,7 @@ LedgerMaster::newPFWork(
}
// If we're stopping don't give callers the expectation that their
// request will be fulfilled, even if it may be serviced.
return mPathFindThread > 0 && !isStopping();
return mPathFindThread > 0 && !app_.isStopping();
}

std::recursive_mutex&
Expand Down
50 changes: 29 additions & 21 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
#ifdef RIPPLED_REPORTING
std::shared_ptr<PgPool> pgPool_;
#endif

std::unique_ptr<CollectorManager> m_collectorManager;
std::unique_ptr<JobQueue> m_jobQueue;
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr<SHAMapStore> m_shaMapStore;
PendingSaves pendingSaves_;
Expand All @@ -175,15 +178,13 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp

// These are not Stoppable-derived
NodeCache m_tempNodeCache;
std::unique_ptr<CollectorManager> m_collectorManager;
CachedSLEs cachedSLEs_;
std::pair<PublicKey, SecretKey> nodeIdentity_;
ValidatorKeys const validatorKeys_;

std::unique_ptr<Resource::Manager> m_resourceManager;

// These are Stoppable-related
std::unique_ptr<JobQueue> m_jobQueue;
std::unique_ptr<NodeStore::Database> m_nodeStore;
NodeFamily nodeFamily_;
std::unique_ptr<NodeStore::DatabaseShard> shardStore_;
Expand Down Expand Up @@ -288,7 +289,22 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
: nullptr)
#endif

, m_nodeStoreScheduler(*this)
, m_collectorManager(CollectorManager::New(
thejohnfreeman marked this conversation as resolved.
Show resolved Hide resolved
config_->section(SECTION_INSIGHT),
logs_->journal("Collector")))

// The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue.
//
, m_jobQueue(std::make_unique<JobQueue>(
m_collectorManager->group("jobq"),
*this,
logs_->journal("JobQueue"),
*logs_,
*perfLog_))

, m_nodeStoreScheduler(*m_jobQueue)

, m_shaMapStore(make_SHAMapStore(
*this,
*this,
Expand All @@ -304,26 +320,13 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
stopwatch(),
logs_->journal("TaggedCache"))

, m_collectorManager(CollectorManager::New(
config_->section(SECTION_INSIGHT),
logs_->journal("Collector")))
, cachedSLEs_(std::chrono::minutes(1), stopwatch())
, validatorKeys_(*config_, m_journal)

, m_resourceManager(Resource::make_Manager(
m_collectorManager->collector(),
logs_->journal("Resource")))

// The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue.
//
, m_jobQueue(std::make_unique<JobQueue>(
m_collectorManager->group("jobq"),
m_nodeStoreScheduler,
logs_->journal("JobQueue"),
*logs_,
*perfLog_))

, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))

, nodeFamily_(*this, *m_collectorManager)
Expand All @@ -336,7 +339,7 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
4,
logs_->journal("ShardStore")))

, m_orderBookDB(*this, *m_jobQueue)
, m_orderBookDB(*this)

, m_pathRequests(std::make_unique<PathRequests>(
*this,
Expand Down Expand Up @@ -457,7 +460,7 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
std::chrono::milliseconds(100),
get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this, *m_jobQueue))
, reportingETL_(std::make_unique<ReportingETL>(*this, *m_ledgerMaster))
, reportingETL_(std::make_unique<ReportingETL>(*this, *this))
{
add(m_resourceManager.get());

Expand All @@ -476,9 +479,6 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
// started in this constructor.
//

// VFALCO HACK
m_nodeStoreScheduler.setJobQueue(*m_jobQueue);

thejohnfreeman marked this conversation as resolved.
Show resolved Hide resolved
add(m_ledgerMaster->getPropertySource());
}

Expand All @@ -498,6 +498,8 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
checkSigs() const override;
void
checkSigs(bool) override;
bool
isStopping() const override;
int
fdRequired() const override;

Expand Down Expand Up @@ -1768,6 +1770,12 @@ ApplicationImp::checkSigs(bool check)
checkSigs_ = check;
}

bool
ApplicationImp::isStopping() const
{
return Stoppable::isStopping();
}

int
ApplicationImp::fdRequired() const
{
Expand Down
2 changes: 2 additions & 0 deletions src/ripple/app/main/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class Application : public beast::PropertyStream::Source
checkSigs() const = 0;
virtual void
checkSigs(bool) = 0;
virtual bool
isStopping() const = 0;

//
// ---
Expand Down
39 changes: 14 additions & 25 deletions src/ripple/app/main/LoadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ LoadManager::LoadManager(
, journal_(journal)
, deadLock_()
, armed_(false)
, stop_(false)
{
}

Expand Down Expand Up @@ -89,13 +88,15 @@ LoadManager::onStart()
void
LoadManager::onStop()
{
{
std::lock_guard<std::mutex> sl(mutex_);
thejohnfreeman marked this conversation as resolved.
Show resolved Hide resolved
stop_ = true;
// There is at most one thread waiting on this condition.
cv_.notify_all();
}
if (thread_.joinable())
{
JLOG(journal_.debug()) << "Stopping";
{
std::lock_guard sl(mutex_);
stop_ = true;
}
thread_.join();
}
stopped();
Expand All @@ -109,19 +110,22 @@ LoadManager::run()
beast::setCurrentThreadName("LoadManager");

using namespace std::chrono_literals;
using clock_type = std::chrono::system_clock;
using clock_type = std::chrono::steady_clock;
nbougalis marked this conversation as resolved.
Show resolved Hide resolved

auto t = clock_type::now();
bool stop = false;

while (!(stop || isStopping()))
while (true)
{
{
// Copy out shared data under a lock. Use copies outside lock.
t += 1s;
std::unique_lock<std::mutex> sl(mutex_);
if (cv_.wait_until(sl, t, [this] { return stop_; }))
{
break;
}
// Copy out shared data under a lock. Use copies outside lock.
auto const deadLock = deadLock_;
auto const armed = armed_;
stop = stop_;
sl.unlock();

// Measure the amount of time we have been deadlocked, in seconds.
Expand Down Expand Up @@ -192,22 +196,7 @@ LoadManager::run()
// subscribe in NetworkOPs or Application.
app_.getOPs().reportFeeChange();
}

t += 1s;
auto const duration = t - clock_type::now();

if ((duration < 0s) || (duration > 1s))
{
JLOG(journal_.warn()) << "time jump";
t = clock_type::now();
}
else
{
alertable_sleep_until(t);
}
}

stopped();
}

//------------------------------------------------------------------------------
Expand Down
Loading