Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a hanging DB::Close scenario for atomic flush #11867

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3150,6 +3150,137 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Test fix for this bug:
// A chain of events in a closing scenario would make Close() call hang when
// atomic_flush = true.
// The factors to reproduce the hang:
// 1) An earlier flush job encounters a retryable error, as a result, fails to
// install flush result, and at the same time, spawns a recovery thread.
// 2) A concurrently running flush job only flushing newer memtables
// successfully finish flushing and waiting to install results.
// 3) Shutting down DB, the recovery thread hangs because of this logic:
// Close() -> EndAutoRecovery() -> WaitForBackgroundWork() waits for
// background flush thread in 2).
TEST_P(DBAtomicFlushTest, BgRecoveryThreadNoWaitDuringShutdown) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.atomic_flush = true;
options.env = fault_injection_env.get();
// Set a larger value than default so that RocksDB can schedule concurrent
// background flush threads.
options.max_background_jobs = 8;
options.max_write_buffer_number = 8;
CreateAndReopenWithCF({"pikachu"}, options);

ASSERT_EQ(2, handles_.size());

WriteOptions write_opts;
write_opts.disableWAL = true;

// Force events in this order for the test:
// Main thread, Bg Flush thread 1, Bg Flush thread 2,
// Flush(wait)
// Start
// Pick mem 0
// Flush(no_wait)
// Start
// Pick mem 1
// WaitToCommit_0
// Retryable IO
// Spawn recovery
// thread
// Finish/Notify
// WaitToCommit_1
//
// Close()
// WaitToCommit_2
ASSERT_OK(Put(0, "a", "v_0_a", write_opts));

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

SyncPoint::GetInstance()->LoadDependency({
{"DBAtomicFlushTest::BgThr2::Start",
"DBAtomicFlushTest::BgThr1::ProceedAfterPickMemtables"},
{"DBAtomicFlushTest::BgThr2::WaitToInstallResults",
"DBAtomicFlushTest::MainThr::BeforeClose"},
});

std::thread::id bg_flush_thr1, bg_flush_thr2;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void*) {
if (bg_flush_thr1 == std::thread::id()) {
bg_flush_thr1 = std::this_thread::get_id();
} else if (bg_flush_thr2 == std::thread::id()) {
TEST_SYNC_POINT("DBAtomicFlushTest::BgThr2::Start");
bg_flush_thr2 = std::this_thread::get_id();
}
});

SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AtomicFlushMemTablesToOutputFiles:AfterPickMemTables",
[&](void*) {
if (std::this_thread::get_id() != bg_flush_thr1) {
return;
}
ASSERT_OK(Put(0, "a", "v_1_a", write_opts));

// Kick off flush job2 within the background flush thread1 to
// make sure the newer flush job only picks mem1
FlushOptions flush_opts;
flush_opts.wait = false;
dbfull()->TEST_UnlockMutex();
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
TEST_SYNC_POINT("DBAtomicFlushTest::BgThr1::ProceedAfterPickMemtables");
dbfull()->TEST_LockMutex();
});

SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table::AfterFileSync", [&](void* arg) {
if (std::this_thread::get_id() == bg_flush_thr1) {
auto* ptr = static_cast<Status*>(arg);
assert(ptr);
IOStatus io_status = IOStatus::IOError("Injected retryable failure");
io_status.SetRetryable(true);
*ptr = io_status;
}
});

int called = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) {
if (std::this_thread::get_id() == bg_flush_thr2) {
const auto* ptr = static_cast<std::pair<Status, bool>*>(arg);
assert(ptr);
if (0 == called) {
ASSERT_OK(ptr->first);
ASSERT_TRUE(ptr->second);
} else if (1 == called) {
TEST_SYNC_POINT("DBAtomicFlushTest::BgThr2::WaitToInstallResults");
} else {
ASSERT_TRUE(ptr->first.IsShutdownInProgress());
ASSERT_FALSE(ptr->second);
}
++called;
}
});
SyncPoint::GetInstance()->EnableProcessing();

// Kick off flush job 1 to flush mem0. This flush job encounters the injected
// retryable IO error, finishes with IO error, and spawns a recovery thread.
ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());

TEST_SYNC_POINT("DBAtomicFlushTest::MainThr::BeforeClose");
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ Status DBImpl::CloseHelper() {
// continuing with the shutdown
mutex_.Lock();
shutdown_initiated_ = true;
atomic_flush_install_cv_.SignalAll();
error_handler_.CancelErrorRecovery();
while (error_handler_.IsRecoveryInProgress()) {
bg_cv_.Wait();
Expand Down
7 changes: 5 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
pick_status[i] = true;
}
}

TEST_SYNC_POINT_CALLBACK(
"DBImpl::AtomicFlushMemTablesToOutputFiles:AfterPickMemTables", nullptr);
if (s.ok()) {
assert(switched_to_mempurge.size() ==
static_cast<long unsigned int>(num_cfs));
Expand Down Expand Up @@ -663,7 +664,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// Something went wrong elsewhere, we cannot count on waiting for our
// turn to write/sync to MANIFEST or CURRENT. Just return.
return std::make_pair(versions_->io_status(), false);
} else if (shutting_down_.load(std::memory_order_acquire)) {
} else if (shutdown_initiated_.load(std::memory_order_acquire) ||
shutting_down_.load(std::memory_order_acquire)) {
return std::make_pair(Status::ShutdownInProgress(), false);
}
bool ready = true;
Expand Down Expand Up @@ -3209,6 +3211,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
bg_flush_scheduled_--;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();

atomic_flush_install_cv_.SignalAll();
bg_cv_.SignalAll();
// IMPORTANT: there should be no code after calling SignalAll. This call may
Expand Down
2 changes: 2 additions & 0 deletions db/error_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart");
InstrumentedMutexLock l(db_mutex_);
if (end_recovery_) {
recovery_in_prog_ = false;
EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_,
Status::ShutdownInProgress(),
db_mutex_);
Expand All @@ -684,6 +685,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
// Recover from the retryable error. Create a separate thread to do it.
while (resume_count > 0) {
if (end_recovery_) {
recovery_in_prog_ = false;
EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_,
Status::ShutdownInProgress(),
db_mutex_);
Expand Down
1 change: 1 addition & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ Status FlushJob::WriteLevel0Table() {
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table::AfterFileSync", &s);
db_mutex_->Lock();
}
base_->Unref();
Expand Down