From 86e523cd4059e97b6e23edb4acd5312e7124cc97 Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Sun, 14 May 2023 12:06:52 +0300 Subject: [PATCH 1/3] WriteController: fix for stop while shutting down Also switch to waiting a sec on the CV each time. This is required since a bg error doesn't signal the CV in the WriteController. --- db/column_family.h | 2 ++ db/db_impl/db_impl_write.cc | 29 +++++++++++++++-------------- db/db_test.cc | 6 +++++- db/db_test_util.cc | 7 +++++++ db/db_test_util.h | 2 ++ db/write_controller.cc | 14 ++++++++++---- db/write_controller.h | 2 +- 7 files changed, 42 insertions(+), 20 deletions(-) diff --git a/db/column_family.h b/db/column_family.h index 9e071d89da..e552c7d121 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -487,6 +487,8 @@ class ColumnFamilyData { const MutableCFOptions& mutable_cf_options, WriteStallCause& write_stall_cause); + void TEST_ResetWriteControllerToken() { write_controller_token_.reset(); } + private: std::unique_ptr DynamicSetupDelay( WriteController* write_controller, uint64_t compaction_needed_bytes, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 4d3f7adfd7..e6ce985ebf 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1810,13 +1810,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, mutex_.AssertHeld(); uint64_t time_delayed = 0; bool delayed = false; + bool stopped = false; { StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL, &time_delayed); // To avoid parallel timed delays (bad throttling), only support them // on the primary write queue. 
uint64_t delay; - // TODO: yuval - check whether db_mutex can be unlocked during GetDelay if (&write_thread == &write_thread_) { delay = write_controller_->GetDelay(immutable_db_options_.clock, num_bytes); @@ -1865,7 +1865,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, if (write_options.no_slowdown) { return Status::Incomplete("Write stall"); } - delayed = true; + stopped = true; // Notify write_thread about the stall so it can setup a barrier and // fail any pending writers with no_slowdown @@ -1876,16 +1876,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait"); } mutex_.Unlock(); - - write_controller_->WaitOnCV(error_handler_); + auto continue_wait = [this]() -> bool { + return (this->error_handler_.GetBGError().ok() && + !(this->shutting_down_.load(std::memory_order_relaxed))); + }; + write_controller_->WaitOnCV(continue_wait); mutex_.Lock(); TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_); write_thread.EndWriteStall(); } } - assert(!delayed || !write_options.no_slowdown); - if (delayed) { + assert((!delayed && !stopped) || !write_options.no_slowdown); + if (delayed || stopped) { default_cf_internal_stats_->AddDBStats( InternalStats::kIntStatsWriteStallMicros, time_delayed); RecordTick(stats_, STALL_MICROS, time_delayed); @@ -1895,14 +1898,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, // writes, we can ignore any background errors and allow the write to // proceed Status s; - if (write_controller_->IsStopped()) { - if (!shutting_down_.load(std::memory_order_relaxed)) { - // If writes are still stopped and db not shutdown, it means we bailed - // due to a background error - s = Status::Incomplete(error_handler_.GetBGError().ToString()); - } else { - s = Status::ShutdownInProgress("stalled writes"); - } + if (stopped && shutting_down_.load(std::memory_order_relaxed)) { + s = Status::ShutdownInProgress("stalled 
writes"); + } else if (write_controller_->IsStopped()) { + // If writes are still stopped and db not shutdown, it means we bailed + // due to a background error + s = Status::Incomplete(error_handler_.GetBGError().ToString()); } if (error_handler_.IsDBStopped()) { s = error_handler_.GetBGError(); diff --git a/db/db_test.cc b/db/db_test.cc index 10c5dfa3e7..8dc0f96ba1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7340,7 +7340,7 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) { } } -TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) { +TEST_F(DBTest, ShuttingDownNotBlockStalledWrites) { Options options = CurrentOptions(); options.disable_auto_compactions = true; Reopen(options); @@ -7377,6 +7377,10 @@ TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) { TEST_SYNC_POINT("DBTest::ShuttingDownNotBlockStalledWrites"); CancelAllBackgroundWork(db_, true); + // In addition to raising the shutting_down_ flag, we need to reset the Write + // Controller tokens since only the destructor of the StopWriteToken wakes up the + // condition variable which the stopped thread is waiting on. 
+ ResetWriteControllerTokens(dbfull()); thd.join(); } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 3a7cafb0cf..fd92bb7767 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1700,6 +1700,13 @@ void VerifySstUniqueIds(const TablePropertiesCollection& props) { } } +void DBTestBase::ResetWriteControllerTokens(DBImpl* db) { + auto versions = db->GetVersionSet(); + for (auto* cfd : versions->GetRefedColumnFamilySet()) { + cfd->TEST_ResetWriteControllerToken(); + } +} + template TargetCacheChargeTrackingCache::TargetCacheChargeTrackingCache( std::shared_ptr target) diff --git a/db/db_test_util.h b/db/db_test_util.h index 92996a5009..f7e1be9a2a 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1329,6 +1329,8 @@ class DBTestBase : public testing::Test { // supported void SetTimeElapseOnlySleepOnReopen(DBOptions* options); + void ResetWriteControllerTokens(DBImpl* db); + private: // Prone to error on direct use void MaybeInstallTimeElapseOnlySleep(const DBOptions& options); diff --git a/db/write_controller.cc b/db/write_controller.cc index 023eff3131..853cf0b21f 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -106,11 +107,13 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id, } } -void WriteController::WaitOnCV(const ErrorHandler& error_handler) { +void WriteController::WaitOnCV(std::function continue_wait) { std::unique_lock lock(stop_mu_); - while (error_handler.GetBGError().ok() && IsStopped()) { + while (continue_wait() && IsStopped()) { TEST_SYNC_POINT("WriteController::WaitOnCV"); - stop_cv_.wait(lock); + // need to time the wait since the stop_cv_ is not signalled if a bg error + // is raised. 
+ stop_cv_.wait_for(lock, std::chrono::seconds(1)); } } @@ -125,7 +128,10 @@ bool WriteController::IsStopped() const { return total_stopped_.load(std::memory_order_relaxed) > 0; } -// This function is no longer under db mutex since credit_in_bytes_ is atomic +// This is inside DB mutex, so we can't sleep and need to minimize +// frequency to get time. +// If it turns out to be a performance issue, we can redesign the thread +// synchronization model here. // The function trust caller will sleep micros returned. uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) { if (total_stopped_.load(std::memory_order_relaxed) > 0) { diff --git a/db/write_controller.h b/db/write_controller.h index 095ec6d64d..846ff8812a 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -109,7 +109,7 @@ class WriteController { uint64_t TEST_GetMapMinRate(); - void WaitOnCV(const ErrorHandler& error_handler); + void WaitOnCV(std::function continue_wait); void NotifyCV(); private: From 719b4ea09504b63e800adedba25e02e82b601327 Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Sun, 14 May 2023 16:45:52 +0300 Subject: [PATCH 2/3] update GetDelay comment --- db/write_controller.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/write_controller.cc b/db/write_controller.cc index 853cf0b21f..58ef1ecbf3 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -128,7 +128,7 @@ bool WriteController::IsStopped() const { return total_stopped_.load(std::memory_order_relaxed) > 0; } -// This is inside DB mutex, so we can't sleep and need to minimize +// This is inside the calling DB mutex, so we can't sleep and need to minimize // frequency to get time. // If it turns out to be a performance issue, we can redesign the thread // synchronization model here. 
From 37b91b6a45869e1269aea48a54fd518e934b7ec3 Mon Sep 17 00:00:00 2001 From: Yuval Ariel Date: Wed, 24 May 2023 09:49:12 +0300 Subject: [PATCH 3/3] update history --- HISTORY.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 3be41fdb17..76434b364d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,9 @@ Based on RocksDB 8.1.1 * stress test: fix decoding error (#498) * db_bench and stress: fix WBM initiation (#510) * Sanitize max_num_parallel_flushes in WBM if 0 (#460) +* WriteController: fix for stop while shutting down (#499) +Also switch to a timed one-second wait on the CV each time. This is required since a background error doesn't signal the CV in the WriteController. + ### Miscellaneous * disable failing unit tests and paired bloom filter stress testing * version: update Speedb patch version to 2.4.1 (#503)