From 269945dc9ab217b724dded91de5d7d10389ca408 Mon Sep 17 00:00:00 2001 From: yuval-io <105581454+Yuval-Ariel@users.noreply.github.com> Date: Wed, 24 May 2023 09:55:27 +0300 Subject: [PATCH] WC: fix for stop while shutting down (#499) Also switch to waiting a sec on the CV each time. This is required since a bg error doesn't signal the CV in the WriteController. --- HISTORY.md | 3 +++ db/column_family.h | 2 ++ db/db_impl/db_impl_write.cc | 29 +++++++++++++++-------------- db/db_test.cc | 6 +++++- db/db_test_util.cc | 7 +++++++ db/db_test_util.h | 2 ++ db/write_controller.cc | 14 ++++++++++---- db/write_controller.h | 2 +- 8 files changed, 45 insertions(+), 20 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 79394b3b85..bf9fb29507 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,9 @@ Based on RocksDB 8.1.1 * stress test: fix decoding error (#498) * db_bench and stress: fix WBM initiation (#510) * Sanitize max_num_parallel_flushes in WBM if 0 (#460) +* WriteController: fix for stop while shutting down (#499) +Also switch to waiting a sec on the CV each time. This is required since a bg error doesn't signal the CV in the WriteController. + ### Miscellaneous * disable failing unit tests and paired bloom filter stress testing * version: update Speedb patch version to 2.4.1 (#503) diff --git a/db/column_family.h b/db/column_family.h index 3da504e35e..6dca7b451e 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -493,6 +493,8 @@ class ColumnFamilyData { const MutableCFOptions& mutable_cf_options, WriteStallCause& write_stall_cause); + void TEST_ResetWriteControllerToken() { write_controller_token_.reset(); } + private: std::unique_ptr DynamicSetupDelay( WriteController* write_controller, uint64_t compaction_needed_bytes, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index faeaf88647..1a15d3e210 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1823,13 +1823,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, mutex_.AssertHeld(); uint64_t time_delayed = 0; bool delayed = false; + bool stopped = false; { StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL, Histograms::HISTOGRAM_ENUM_MAX, &time_delayed); // To avoid parallel timed delays (bad throttling), only support them // on the primary write queue. uint64_t delay; - // TODO: yuval - check whether db_mutex can be unlocked during GetDelay if (&write_thread == &write_thread_) { delay = write_controller_->GetDelay(immutable_db_options_.clock, num_bytes); @@ -1878,7 +1878,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, if (write_options.no_slowdown) { return Status::Incomplete("Write stall"); } - delayed = true; + stopped = true; // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown @@ -1889,16 +1889,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait"); } mutex_.Unlock(); - - write_controller_->WaitOnCV(error_handler_); + auto continue_wait = [this]() -> bool { + return (this->error_handler_.GetBGError().ok() && + !(this->shutting_down_.load(std::memory_order_relaxed))); + }; + write_controller_->WaitOnCV(continue_wait); mutex_.Lock(); TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_); write_thread.EndWriteStall(); } } - assert(!delayed || !write_options.no_slowdown); - if (delayed) { + assert((!delayed && !stopped) || !write_options.no_slowdown); + if (delayed || stopped) { default_cf_internal_stats_->AddDBStats( InternalStats::kIntStatsWriteStallMicros, time_delayed); RecordTick(stats_, STALL_MICROS, time_delayed); @@ -1908,14 +1911,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, // writes, we can ignore any background errors and allow the write to // proceed Status s; - if (write_controller_->IsStopped()) { - if (!shutting_down_.load(std::memory_order_relaxed)) { - // If writes are still stopped and db not shutdown, it means we bailed - // due to a background error - s = Status::Incomplete(error_handler_.GetBGError().ToString()); - } else { - s = Status::ShutdownInProgress("stalled writes"); - } + if (stopped && shutting_down_.load(std::memory_order_relaxed)) { + s = Status::ShutdownInProgress("stalled writes"); + } else if (write_controller_->IsStopped()) { + // If writes are still stopped and db not shutdown, it means we bailed + // due to a background error + s = Status::Incomplete(error_handler_.GetBGError().ToString()); } if (error_handler_.IsDBStopped()) { s = error_handler_.GetBGError(); diff --git a/db/db_test.cc b/db/db_test.cc index ac3dcae723..5c7e465777 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7493,7 +7493,7 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) { } } -TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) { +TEST_F(DBTest, ShuttingDownNotBlockStalledWrites) { Options options = CurrentOptions(); options.disable_auto_compactions = true; Reopen(options); @@ -7530,6 +7530,10 @@ TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) { TEST_SYNC_POINT("DBTest::ShuttingDownNotBlockStalledWrites"); CancelAllBackgroundWork(db_, true); + // In addition to raising the shutting_down_ flag, we need to reset the Write + // Controller tokens since only the detor of the StopWriteToken wakes up the + // condition variable which the stopped thread is waiting on. + ResetWriteControllerTokens(dbfull()); thd.join(); } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 1c91b2e547..d889a660d8 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1732,6 +1732,13 @@ void VerifySstUniqueIds(const TablePropertiesCollection& props) { } } +void DBTestBase::ResetWriteControllerTokens(DBImpl* db) { + auto versions = db->GetVersionSet(); + for (auto* cfd : versions->GetRefedColumnFamilySet()) { + cfd->TEST_ResetWriteControllerToken(); + } +} + template TargetCacheChargeTrackingCache::TargetCacheChargeTrackingCache( std::shared_ptr target) diff --git a/db/db_test_util.h b/db/db_test_util.h index 438c0a4d76..652e0871ec 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1377,6 +1377,8 @@ class DBTestBase : public testing::Test { &tp->index_key_is_user_key, &tp->index_value_is_delta_encoded, &tp->index_size, &tp->filter_size); } + + void ResetWriteControllerTokens(DBImpl* db); private: // Prone to error on direct use void MaybeInstallTimeElapseOnlySleep(const DBOptions& options); diff --git a/db/write_controller.cc b/db/write_controller.cc index 023eff3131..58ef1ecbf3 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -106,11 +107,13 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id, } } -void WriteController::WaitOnCV(const ErrorHandler& error_handler) { +void WriteController::WaitOnCV(std::function continue_wait) { std::unique_lock lock(stop_mu_); - while (error_handler.GetBGError().ok() && IsStopped()) { + while (continue_wait() && IsStopped()) { TEST_SYNC_POINT("WriteController::WaitOnCV"); - stop_cv_.wait(lock); + // need to time the wait since the stop_cv_ is not signalled if a bg error + // is raised. + stop_cv_.wait_for(lock, std::chrono::seconds(1)); } } @@ -125,7 +128,10 @@ bool WriteController::IsStopped() const { return total_stopped_.load(std::memory_order_relaxed) > 0; } -// This function is no longer under db mutex since credit_in_bytes_ is atomic +// This is inside the calling DB mutex, so we can't sleep and need to minimize +// frequency to get time. +// If it turns out to be a performance issue, we can redesign the thread +// synchronization model here. // The function trust caller will sleep micros returned. uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) { if (total_stopped_.load(std::memory_order_relaxed) > 0) { diff --git a/db/write_controller.h b/db/write_controller.h index 095ec6d64d..846ff8812a 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -109,7 +109,7 @@ class WriteController { uint64_t TEST_GetMapMinRate(); - void WaitOnCV(const ErrorHandler& error_handler); + void WaitOnCV(std::function continue_wait); void NotifyCV(); private: