Skip to content

Commit

Permalink
Fix a race condition during shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
nbougalis committed Jul 18, 2022
1 parent b95ca98 commit a0e70ce
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions src/ripple/nodestore/impl/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,26 @@ Database::Database(
{
std::unique_lock<std::mutex> lock(readLock_);

if (read_.empty())
// We need to check whether we're stopping again to
// avoid a race condition that can stall this thread
// during shutdown.
if (read_.empty() && !isStopping())
{
runningThreads_--;
readCondVar_.wait(lock);
runningThreads_++;
}

if (isStopping())
continue;

// If configured, extract multiple object at a time to
// minimize the overhead of acquiring the mutex.
for (int cnt = 0;
!read_.empty() && cnt != requestBundle_;
++cnt)
read.insert(read_.extract(read_.begin()));
// If we are not stopping then extract multiple object
// at a time to minimize the overhead of acquiring the
// mutex.
if (!isStopping())
{
for (int cnt = 0;
!read_.empty() && cnt != requestBundle_;
++cnt)
read.insert(read_.extract(read_.begin()));
}
}

for (auto it = read.begin(); it != read.end(); ++it)
Expand Down Expand Up @@ -120,6 +124,7 @@ Database::Database(
read.clear();
}

--runningThreads_;
--readThreads_;
},
i);
Expand Down Expand Up @@ -160,15 +165,34 @@ Database::maxLedgers(std::uint32_t shardIndex) const noexcept
void
Database::stop()
{
if (!readStopping_.exchange(true, std::memory_order_relaxed))
{
std::lock_guard lock(readLock_);
read_.clear();
readCondVar_.notify_all();

if (!readStopping_.exchange(true, std::memory_order_relaxed))
{
JLOG(j_.debug()) << "Clearing read queue because of stop request";
read_.clear();
readCondVar_.notify_all();
}
}

JLOG(j_.debug()) << "Waiting for stop request to complete...";

using namespace std::chrono;

auto const start = steady_clock::now();

while (readThreads_.load() != 0)
{
assert(steady_clock::now() - start < 30s);
std::this_thread::yield();
}

JLOG(j_.debug()) << "Stop request completed in "
<< duration_cast<std::chrono::milliseconds>(
steady_clock::now() - start)
.count()
<< " millseconds";
}

void
Expand All @@ -177,10 +201,16 @@ Database::asyncFetch(
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
{
// Post a read
std::lock_guard lock(readLock_);
read_[hash].emplace_back(ledgerSeq, std::move(cb));
readCondVar_.notify_one();
if (!isStopping())
{
std::lock_guard lock(readLock_);

if (!isStopping())
{
read_[hash].emplace_back(ledgerSeq, std::move(cb));
readCondVar_.notify_one();
}
}
}

void
Expand Down

0 comments on commit a0e70ce

Please sign in to comment.