diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 15aad0a02a3..ba07fa8a5e1 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -73,22 +73,26 @@ Database::Database( { std::unique_lock lock(readLock_); - if (read_.empty()) + // We need to check whether we're stopping again to + // avoid a race condition that can stall this thread + // during shutdown. + if (read_.empty() && !isStopping()) { runningThreads_--; readCondVar_.wait(lock); runningThreads_++; } - if (isStopping()) - continue; - - // If configured, extract multiple object at a time to - // minimize the overhead of acquiring the mutex. - for (int cnt = 0; - !read_.empty() && cnt != requestBundle_; - ++cnt) - read.insert(read_.extract(read_.begin())); + // If we are not stopping then extract multiple object + // at a time to minimize the overhead of acquiring the + // mutex. + if (!isStopping()) + { + for (int cnt = 0; + !read_.empty() && cnt != requestBundle_; + ++cnt) + read.insert(read_.extract(read_.begin())); + } } for (auto it = read.begin(); it != read.end(); ++it) @@ -120,6 +124,7 @@ Database::Database( read.clear(); } + --runningThreads_; --readThreads_; }, i); @@ -160,15 +165,34 @@ Database::maxLedgers(std::uint32_t shardIndex) const noexcept void Database::stop() { - if (!readStopping_.exchange(true, std::memory_order_relaxed)) { std::lock_guard lock(readLock_); - read_.clear(); - readCondVar_.notify_all(); + + if (!readStopping_.exchange(true, std::memory_order_relaxed)) + { + JLOG(j_.debug()) << "Clearing read queue because of stop request"; + read_.clear(); + readCondVar_.notify_all(); + } } + JLOG(j_.debug()) << "Waiting for stop request to complete..."; + + using namespace std::chrono; + + auto const start = steady_clock::now(); + while (readThreads_.load() != 0) + { + assert(steady_clock::now() - start < 30s); std::this_thread::yield(); + } + + JLOG(j_.debug()) << "Stop request completed in " + << duration_cast( + steady_clock::now() - start) + .count() + << " millseconds"; } void @@ -177,10 +201,16 @@ Database::asyncFetch( std::uint32_t ledgerSeq, std::function const&)>&& cb) { - // Post a read - std::lock_guard lock(readLock_); - read_[hash].emplace_back(ledgerSeq, std::move(cb)); - readCondVar_.notify_one(); + if (!isStopping()) + { + std::lock_guard lock(readLock_); + + if (!isStopping()) + { + read_[hash].emplace_back(ledgerSeq, std::move(cb)); + readCondVar_.notify_one(); + } + } } void