From c7a0522cd0f0548fd1001635b7c35033f038d294 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Wed, 26 Jul 2023 16:33:48 -0700 Subject: [PATCH 1/2] Fix background atomic flush hang --- db/db_flush_test.cc | 131 +++++++++++++++++++++++++ db/db_impl/db_impl.cc | 1 + db/db_impl/db_impl_compaction_flush.cc | 7 +- db/error_handler.cc | 2 + db/flush_job.cc | 1 + 5 files changed, 140 insertions(+), 2 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index d10bd3180cc..e6e3f9f7337 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -3150,6 +3150,137 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +// Test fix for this bug: +// A chain of events in a closing scenario would make Close() call hang when +// atomic_flush = true. +// The factors to reproduce the hang: +// 1) An earlier flush job encounters a retryable error, as a result, fails to +// install flush result, and at the same time, spawns a recovery thread. +// 2) A concurrently running flush job only flushing newer memtables +// successfully finish flushing and waiting to install results. +// 3) Shutting down DB, the recovery thread hangs because of this logic: +// Close() -> EndAutoRecovery() -> WaitForBackgroundWork() waits for +// background flush thread in 2). +TEST_P(DBAtomicFlushTest, BgRecoveryThreadNoWaitDuringShutdown) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + auto fault_injection_env = std::make_shared(env_); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.atomic_flush = true; + options.env = fault_injection_env.get(); + // Set a larger value than default so that RocksDB can schedule concurrent + // background flush threads. + options.max_background_jobs = 8; + options.max_write_buffer_number = 8; + CreateAndReopenWithCF({"pikachu"}, options); + + assert(2 == handles_.size()); + + WriteOptions write_opts; + write_opts.disableWAL = true; + + // Force events in this order for the test: + // Main thread, Bg Flush thread 1, Bg Flush thread 2, + // Flush(wait) + // Start + // Pick mem 0 + // Flush(no_wait) + // Start + // Pick mem 1 + // WaitToCommit_0 + // Retryable IO + // Spawn recovery + // thread + // Finish/Notify + // WaitToCommit_1 + // + // Close() + // WaitToCommit_2 + ASSERT_OK(Put(0, "a", "v_0_a", write_opts)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBAtomicFlushTest::BgThr2::Start", + "DBAtomicFlushTest::BgThr1::ProceedAfterPickMemtables"}, + {"DBAtomicFlushTest::BgThr2::WaitToInstallResults", + "DBAtomicFlushTest::MainThr::BeforeClose"}, + }); + + std::thread::id bg_flush_thr1, bg_flush_thr2; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void*) { + if (bg_flush_thr1 == std::thread::id()) { + bg_flush_thr1 = std::this_thread::get_id(); + } else if (bg_flush_thr2 == std::thread::id()) { + TEST_SYNC_POINT("DBAtomicFlushTest::BgThr2::Start"); + bg_flush_thr2 = std::this_thread::get_id(); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::AtomicFlushMemTablesToOutputFiles:AfterPickMemTables", + [&](void*) { + if (std::this_thread::get_id() != bg_flush_thr1) { + return; + } + ASSERT_OK(Put(0, "a", "v_1_a", write_opts)); + + // Kick off flush job2 within the background flush thread1 to + // make sure the newer flush job only picks mem1 + FlushOptions flush_opts; + flush_opts.wait = false; + dbfull()->TEST_UnlockMutex(); + ASSERT_OK(dbfull()->Flush(flush_opts, handles_)); + TEST_SYNC_POINT("DBAtomicFlushTest::BgThr1::ProceedAfterPickMemtables"); + dbfull()->TEST_LockMutex(); + }); + + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table::AfterFileSync", [&](void* arg) { + if (std::this_thread::get_id() == bg_flush_thr1) { + auto* ptr = reinterpret_cast(arg); + assert(ptr); + IOStatus io_status = IOStatus::IOError("Injected retryable failure"); + io_status.SetRetryable(true); + *ptr = io_status; + } + }); + + int called = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) { + if (std::this_thread::get_id() == bg_flush_thr2) { + const auto* ptr = reinterpret_cast*>(arg); + assert(ptr); + if (0 == called) { + ASSERT_OK(ptr->first); + ASSERT_TRUE(ptr->second); + } else if (1 == called) { + TEST_SYNC_POINT("DBAtomicFlushTest::BgThr2::WaitToInstallResults"); + } else { + ASSERT_TRUE(ptr->first.IsShutdownInProgress()); + ASSERT_FALSE(ptr->second); + } + ++called; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Kick off flush job 1 to flush mem0. This flush job encounters the injected + // retryable IO error, finishes with IO error, and spawns a recovery thread. + ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError()); + + TEST_SYNC_POINT("DBAtomicFlushTest::MainThr::BeforeClose"); + Close(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) { Options options = GetDefaultOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0b23c3db091..b22471d4154 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -517,6 +517,7 @@ Status DBImpl::CloseHelper() { // continuing with the shutdown mutex_.Lock(); shutdown_initiated_ = true; + atomic_flush_install_cv_.SignalAll(); error_handler_.CancelErrorRecovery(); while (error_handler_.IsRecoveryInProgress()) { bg_cv_.Wait(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8e4f7068214..5ca7a992f86 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -579,7 +579,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( pick_status[i] = true; } } - + TEST_SYNC_POINT_CALLBACK( + "DBImpl::AtomicFlushMemTablesToOutputFiles:AfterPickMemTables", nullptr); if (s.ok()) { assert(switched_to_mempurge.size() == static_cast(num_cfs)); @@ -663,7 +664,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // Something went wrong elsewhere, we cannot count on waiting for our // turn to write/sync to MANIFEST or CURRENT. Just return. return std::make_pair(versions_->io_status(), false); - } else if (shutting_down_.load(std::memory_order_acquire)) { + } else if (shutdown_initiated_.load(std::memory_order_acquire) || + shutting_down_.load(std::memory_order_acquire)) { return std::make_pair(Status::ShutdownInProgress(), false); } bool ready = true; @@ -3209,6 +3211,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { bg_flush_scheduled_--; // See if there's more work to be done MaybeScheduleFlushOrCompaction(); + atomic_flush_install_cv_.SignalAll(); bg_cv_.SignalAll(); // IMPORTANT: there should be no code after calling SignalAll. This call may diff --git a/db/error_handler.cc b/db/error_handler.cc index efadfbc802f..18553578911 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -672,6 +672,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart"); InstrumentedMutexLock l(db_mutex_); if (end_recovery_) { + recovery_in_prog_ = false; EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_, Status::ShutdownInProgress(), db_mutex_); @@ -684,6 +685,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() { // Recover from the retryable error. Create a separate thread to do it. while (resume_count > 0) { if (end_recovery_) { + recovery_in_prog_ = false; EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_, Status::ShutdownInProgress(), db_mutex_); diff --git a/db/flush_job.cc b/db/flush_job.cc index 046abcd105f..f699397e19a 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -1019,6 +1019,7 @@ Status FlushJob::WriteLevel0Table() { DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); } TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); + TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table::AfterFileSync", &s); db_mutex_->Lock(); } base_->Unref(); From 490b4d73896e28ba50b0034652c8a3caf542147c Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Fri, 22 Sep 2023 13:08:55 -0700 Subject: [PATCH 2/2] address review comments --- db/db_flush_test.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index e6e3f9f7337..b32605f6d59 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -3177,7 +3177,7 @@ TEST_P(DBAtomicFlushTest, BgRecoveryThreadNoWaitDuringShutdown) { options.max_write_buffer_number = 8; CreateAndReopenWithCF({"pikachu"}, options); - assert(2 == handles_.size()); + ASSERT_EQ(2, handles_.size()); WriteOptions write_opts; write_opts.disableWAL = true; @@ -3243,7 +3243,7 @@ TEST_P(DBAtomicFlushTest, BgRecoveryThreadNoWaitDuringShutdown) { SyncPoint::GetInstance()->SetCallBack( "FlushJob::WriteLevel0Table::AfterFileSync", [&](void* arg) { if (std::this_thread::get_id() == bg_flush_thr1) { - auto* ptr = reinterpret_cast(arg); + auto* ptr = static_cast(arg); assert(ptr); IOStatus io_status = IOStatus::IOError("Injected retryable failure"); io_status.SetRetryable(true); @@ -3255,7 +3255,7 @@ TEST_P(DBAtomicFlushTest, BgRecoveryThreadNoWaitDuringShutdown) { SyncPoint::GetInstance()->SetCallBack( "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) { if (std::this_thread::get_id() == bg_flush_thr2) { - const auto* ptr = reinterpret_cast*>(arg); + const auto* ptr = static_cast*>(arg); assert(ptr); if (0 == called) { ASSERT_OK(ptr->first);