Rollback other pending memtable flushes when a flush fails (#11865)
Summary:
When atomic_flush=false, there are certain cases where we try to install memtable results whose SST files have already been deleted. This can happen when the following sequence of events occurs:
```
Start Flush0 for memtable M0 to SST0
Start Flush1 for memtable M1 to SST1
Flush1 returns OK, but doesn't install to MANIFEST and lets whoever flushes M0 take care of it
Flush0 finishes with a retryable IOError; it rolls back M0, (incorrectly) does not roll back M1, and deletes SST0 and SST1
Start Flush2 for M0; it does not pick up M1 since it thinks M1 is already flushed
Flush2 writes SST2 and finishes OK, tries to install SST2 and SST1
Error opening SST1 since it's already deleted, with an error message like the following:

IO error: No such file or directory: While open a file for random read: /tmp/rocksdbtest-501/db_flush_test_3577_4230653031040984171/000011.sst: No such file or directory
```

This happens since:
1. When atomic_flush=false, we currently only roll back the memtables being flushed by the failing flush job itself.
2. Pending output SSTs from previous flushes are deleted, since a pending file number is released whenever a flush job finishes, regardless of the flush status: https://github.com/facebook/rocksdb/blob/f42e70bf561d4be9b6bbe7316d1c2c0c8a3818e6/db/db_impl/db_impl_compaction_flush.cc#L3161

This PR fixes the issue by rolling back these pending flushes as well; a toy sketch of the idea follows.
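
To make the fix concrete, here is a self-contained toy model of the new rollback behavior. It is a sketch only, with simplified stand-ins rather than RocksDB's real types; the actual change is in the `db/memtable_list.cc` hunk below.

```cpp
#include <cassert>
#include <iostream>
#include <list>
#include <vector>

// Toy memtable: just the flush-related state that matters here.
struct Mem {
  int id;
  bool flush_in_progress = false;
  bool flush_completed = false;  // flushed OK, installation delegated
};

// Sketch of the fixed rollback. `memlist` holds immutable memtables, newest
// first (like MemTableList); `mems` holds the failed flush's memtables,
// oldest first. With rollback_succeeding_memtables=true (the
// atomic_flush=false case), newer memtables whose flushes completed but were
// never installed are also re-queued, because their SSTs were deleted along
// with the failed flush's pending outputs.
void RollbackMemtableFlush(std::list<Mem*>& memlist,
                           const std::vector<Mem*>& mems,
                           bool rollback_succeeding_memtables) {
  if (rollback_succeeding_memtables && !mems.empty()) {
    // Find mems[0] starting from the oldest end, then step past it.
    auto it = memlist.rbegin();
    while (it != memlist.rend() && *it != mems[0]) {
      ++it;
    }
    assert(it != memlist.rend());  // mems[0] must be in the list
    ++it;
    // Re-queue completed-but-uninstalled flushes; stop at the first memtable
    // whose flush is still running, since that flush may yet succeed.
    for (; it != memlist.rend() && (*it)->flush_completed; ++it) {
      (*it)->flush_in_progress = false;
      (*it)->flush_completed = false;
    }
  }
  // Reset the failed flush's own memtables (the pre-existing behavior).
  for (Mem* m : mems) {
    m->flush_in_progress = false;
    m->flush_completed = false;
  }
}

int main() {
  Mem m0{0}, m1{1};
  m0.flush_in_progress = true;  // Flush0 failed while flushing M0
  m1.flush_completed = true;    // Flush1 done; install delegated to Flush0
  std::list<Mem*> memlist = {&m1, &m0};  // newest first
  RollbackMemtableFlush(memlist, {&m0},
                        /*rollback_succeeding_memtables=*/true);
  std::cout << "M1 re-queued: " << std::boolalpha << !m1.flush_completed
            << "\n";  // prints "M1 re-queued: true"
}
```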

There is another issue: if a flush for a new memtable starts and finishes after Flush0 fails, its output may also be deleted (see more in the unit test). It is fixed by checking the bg error status before installing a memtable result, and rolling back if there is an error; a minimal sketch of this guard follows.
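
A minimal control-flow sketch of that guard, with toy `Status`/`ErrorHandler` stand-ins (the actual change is in the `db/flush_job.cc` hunk below):

```cpp
// Toy stand-ins, just enough to show the control flow.
struct Status {
  bool ok_ = true;
  bool ok() const { return ok_; }
};
struct ErrorHandler {
  Status bg_error_;
  const Status& GetBGError() const { return bg_error_; }
};

// A non-atomic, non-error-recovery flush that finds a background error set
// must roll back instead of installing its result, since its pending outputs
// may already have been deleted; the recovery flush re-flushes the memtables.
Status MaybeInstall(bool atomic_flush, bool is_recovery_flush,
                    const ErrorHandler* error_handler) {
  if (!atomic_flush && !is_recovery_flush && error_handler != nullptr &&
      !error_handler->GetBGError().ok()) {
    // RollbackMemtableFlush(mems, /*rollback_succeeding_memtables=*/true);
    return error_handler->GetBGError();  // skip installation
  }
  // Otherwise install: TryInstallMemtableFlushResults(...) records the new
  // SSTs in the MANIFEST.
  return Status{};
}
```

In the real code, the same condition also sets `skip_set_bg_error` so that exiting a flush due to an existing bg error does not set the bg error again (see `db/db_impl/db_impl_compaction_flush.cc` below).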

There is a more efficient fix where we simply don't release the pending output file number for flushes that delegate installation. It is more efficient since it does not have to rewrite the flush output file. With the fix in this PR, we can end up with a giant file if many memtables are flushed together. However, the more efficient fix is a bit more complicated to implement (it requires associating such pending file numbers with flush jobs/memtables) and is riskier since it changes the normal flush code path.
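
A purely hypothetical sketch of that alternative, for comparison: the `DelegatedOutput` and `MemTableState` types below are invented for illustration and do not exist in RocksDB.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical sketch (NOT what this PR implements): tie the pending output
// file number to the memtable whose installation was delegated, instead of
// releasing it when the flush job exits, so the finished SST survives until
// it is either installed or rolled back.
struct DelegatedOutput {
  uint64_t file_number;  // SST protected from deletion while this exists
};

struct MemTableState {
  std::optional<DelegatedOutput> delegated;  // set when a flush finishes OK
                                             // but installation is delegated
};

// Called when the delegated installation (or a rollback) finally happens;
// only then would the pending file number be released for purging.
void ReleaseDelegatedOutput(MemTableState& m) { m.delegated.reset(); }
```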

Pull Request resolved: #11865

Test Plan: * Added repro unit tests.

Reviewed By: anand1976

Differential Revision: D49484922

Pulled By: cbi42

fbshipit-source-id: 25b536c08f4e02e7f1d0f86571663737d2b5d53d
cbi42 authored and facebook-github-bot committed Sep 21, 2023
1 parent 32fc1e6 commit b927ba5
Showing 10 changed files with 297 additions and 25 deletions.
186 changes: 186 additions & 0 deletions db/db_flush_test.cc
@@ -3193,6 +3193,192 @@ INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,

INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());

TEST_F(DBFlushTest, NonAtomicFlushRollbackPendingFlushes) {
// Tests the fix for a bug when atomic_flush=false.
// The bug can happen as follows:
// Start Flush0 for memtable M0 to SST0
// Start Flush1 for memtable M1 to SST1
// Flush1 returns OK, but doesn't install to MANIFEST and lets whoever
// flushes M0 take care of it
// Flush0 finishes with a retryable IOError
// - It rolls back M0, but (incorrectly) not M1
// - Deletes SST0 and SST1
//
// Auto-recovery will start Flush2 for M0; it does not pick up M1 since it
// thinks that M1 is already flushed
// Flush2 writes SST2 and finishes OK, tries to install SST2 and SST1
// Error opening SST1 since it's already deleted
//
// The fix is to let Flush0 also roll back M1.
Options opts = CurrentOptions();
opts.atomic_flush = false;
opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
opts.max_write_buffer_number = 64;
opts.max_background_flushes = 4;
env_->SetBackgroundThreads(4, Env::HIGH);
DestroyAndReopen(opts);
std::atomic_int flush_count = 0;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
int c = flush_count.fetch_add(1);
if (c == 0) {
Status* s = (Status*)(s_ptr);
IOStatus io_error = IOStatus::IOError("injected foobar");
io_error.SetRetryable(true);
*s = io_error;
TEST_SYNC_POINT("Let mem1 flush start");
TEST_SYNC_POINT("Wait for mem1 flush to finish");
}
});
SyncPoint::GetInstance()->LoadDependency(
{{"Let mem1 flush start", "Mem1 flush starts"},
{"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
{"RecoverFromRetryableBGIOError:RecoverSuccess",
"Wait for error recover"}});
// Need first flush to wait for the second flush to finish
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(Key(1), "val1"));
// trigger bg flush mem0
ASSERT_OK(Put(Key(2), "val2"));
TEST_SYNC_POINT("Mem1 flush starts");
// trigger bg flush mem1
ASSERT_OK(Put(Key(3), "val3"));

TEST_SYNC_POINT("Wait for error recover");
ASSERT_EQ(1, NumTableFilesAtLevel(0));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, AbortNonAtomicFlushWhenBGError) {
// Tests the fix for a bug when atomic_flush=false.
// The bug can happen as follows:
// Start Flush0 for memtable M0 to SST0
// Start Flush1 for memtable M1 to SST1
// Flush1 returns OK, but doesn't install its output to MANIFEST and lets
// whoever flushes M0 take care of it
// Start Flush2 for memtable M2 to SST2
// Flush0 finishes with a retryable IOError
// - It rolls back M0 AND M1
// - Deletes SST0 and SST1
// Flush2 finishes, does not roll back M2,
// - releases the pending file number that keeps SST2 alive
// - deletes SST2
//
// Then auto-recovery starts; error opening SST2 when trying to install the
// flush result
//
// The fix is to let Flush2 roll back M2 if it finds that
// there is a background error.
Options opts = CurrentOptions();
opts.atomic_flush = false;
opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
opts.max_write_buffer_number = 64;
opts.max_background_flushes = 4;
env_->SetBackgroundThreads(4, Env::HIGH);
DestroyAndReopen(opts);
std::atomic_int flush_count = 0;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
int c = flush_count.fetch_add(1);
if (c == 0) {
Status* s = (Status*)(s_ptr);
IOStatus io_error = IOStatus::IOError("injected foobar");
io_error.SetRetryable(true);
*s = io_error;
TEST_SYNC_POINT("Let mem1 flush start");
TEST_SYNC_POINT("Wait for mem1 flush to finish");

TEST_SYNC_POINT("Let mem2 flush start");
TEST_SYNC_POINT("Wait for mem2 to start writing table");
}
});

SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&](void* mems) {
autovector<MemTable*>* mems_ptr = (autovector<MemTable*>*)mems;
if ((*mems_ptr)[0]->GetID() == 3) {
TEST_SYNC_POINT("Mem2 flush starts writing table");
TEST_SYNC_POINT("Mem2 flush waits until rollback");
}
});
SyncPoint::GetInstance()->LoadDependency(
{{"Let mem1 flush start", "Mem1 flush starts"},
{"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
{"Let mem2 flush start", "Mem2 flush starts"},
{"Mem2 flush starts writing table",
"Wait for mem2 to start writing table"},
{"RollbackMemtableFlush", "Mem2 flush waits until rollback"},
{"RecoverFromRetryableBGIOError:RecoverSuccess",
"Wait for error recover"}});
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(Put(Key(1), "val1"));
// trigger bg flush mem0
ASSERT_OK(Put(Key(2), "val2"));
TEST_SYNC_POINT("Mem1 flush starts");
// trigger bg flush mem1
ASSERT_OK(Put(Key(3), "val3"));

TEST_SYNC_POINT("Mem2 flush starts");
ASSERT_OK(Put(Key(4), "val4"));

TEST_SYNC_POINT("Wait for error recover");
// Recovery flush writes 3 memtables together into 1 file.
ASSERT_EQ(1, NumTableFilesAtLevel(0));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) {
Options opts = CurrentOptions();
opts.atomic_flush = false;
opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
opts.max_write_buffer_number = 64;
opts.max_background_flushes = 1;
env_->SetBackgroundThreads(2, Env::HIGH);
DestroyAndReopen(opts);
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
std::atomic_int flush_write_table_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
int c = flush_write_table_count.fetch_add(1);
if (c == 0) {
Status* s = (Status*)(s_ptr);
IOStatus io_error = IOStatus::IOError("injected foobar");
io_error.SetRetryable(true);
*s = io_error;
}
});

SyncPoint::GetInstance()->EnableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RecoverFromRetryableBGIOError:RecoverSuccess",
"Wait for error recover"}});

ASSERT_OK(Put(Key(1), "val1"));
// trigger bg flush0 for mem0
ASSERT_OK(Put(Key(2), "val2"));
dbfull()->TEST_WaitForFlushMemTable().PermitUncheckedError();

// trigger bg flush1 for mem1, should see bg error and abort
// before picking a memtable to flush
ASSERT_OK(Put(Key(3), "val3"));

TEST_SYNC_POINT("Wait for error recover");
// Recovery flush writes 2 memtables together into 1 file.
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// 1 for flush 0 and 1 for recovery flush
ASSERT_EQ(2, flush_write_table_count);
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
24 changes: 20 additions & 4 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -284,6 +284,20 @@ Status DBImpl::FlushMemTableToOutputFile(
// If the log sync failed, we do not need to pick memtable. Otherwise,
// num_flush_not_started_ needs to be rolled back.
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
// Exiting a flush due to a bg error should not set the bg error again.
bool skip_set_bg_error = false;
if (s.ok() && flush_reason != FlushReason::kErrorRecovery &&
flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
!error_handler_.GetBGError().ok()) {
// Error recovery is in progress, so do not pick memtables here: doing so
// would exclude them from being picked up by the recovery flush.
// This ensures that when a bg error is set, no new flush can pick
// memtables.
skip_set_bg_error = true;
s = error_handler_.GetBGError();
assert(!s.ok());
}

if (s.ok()) {
flush_job.PickMemTable();
need_cancel = true;
@@ -304,7 +318,8 @@
// is unlocked by the current thread.
if (s.ok()) {
s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
- &switched_to_mempurge);
+ &switched_to_mempurge, &skip_set_bg_error,
+ &error_handler_);
need_cancel = false;
}

@@ -345,7 +360,8 @@
}
}

- if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
+ if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
+ !skip_set_bg_error) {
if (log_io_s.ok()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
@@ -634,8 +650,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
for (int i = 0; i != num_cfs; ++i) {
if (exec_status[i].second.ok() && exec_status[i].first) {
auto& mems = jobs[i]->GetMemTables();
- cfds[i]->imm()->RollbackMemtableFlush(mems,
- file_meta[i].fd.GetNumber());
+ cfds[i]->imm()->RollbackMemtableFlush(
+ mems, /*rollback_succeeding_memtables=*/false);
}
}
}
1 change: 1 addition & 0 deletions db/error_handler.cc
@@ -655,6 +655,7 @@ const Status& ErrorHandler::StartRecoverFromRetryableBGIOError(
}

recovery_in_prog_ = true;
TEST_SYNC_POINT("StartRecoverFromRetryableBGIOError::in_progress");
recovery_thread_.reset(
new port::Thread(&ErrorHandler::RecoverFromRetryableBGIOError, this));

2 changes: 2 additions & 0 deletions db/event_helpers.cc
@@ -240,6 +240,8 @@ void EventHelpers::NotifyOnErrorRecoveryEnd(
info.new_bg_error.PermitUncheckedError();
}
db_mutex->Lock();
} else {
old_bg_error.PermitUncheckedError();
}
}

34 changes: 25 additions & 9 deletions db/flush_job.cc
@@ -215,7 +215,8 @@ void FlushJob::PickMemTable() {
}

Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
- bool* switched_to_mempurge) {
+ bool* switched_to_mempurge, bool* skipped_since_bg_error,
+ ErrorHandler* error_handler) {
TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
assert(pick_memtable_called);
@@ -303,17 +304,31 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
}

if (!s.ok()) {
- cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
+ cfd_->imm()->RollbackMemtableFlush(
+ mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
} else if (write_manifest_) {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_, &committed_flush_jobs_info_,
!(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
assert(!db_options_.atomic_flush);
if (!db_options_.atomic_flush &&
flush_reason_ != FlushReason::kErrorRecovery &&
flush_reason_ != FlushReason::kErrorRecoveryRetryFlush &&
error_handler && !error_handler->GetBGError().ok()) {
cfd_->imm()->RollbackMemtableFlush(
mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
s = error_handler->GetBGError();
if (skipped_since_bg_error) {
*skipped_since_bg_error = true;
}
} else {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_, &committed_flush_jobs_info_,
!(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
but 'false' if mempurge successful: no new min log number
or new level 0 file path to write to manifest. */);
}
}

if (s.ok() && file_meta != nullptr) {
@@ -965,6 +980,7 @@ Status FlushJob::WriteLevel0Table() {
&table_properties_, write_hint, full_history_ts_low,
blob_callback_, base_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s);
// TODO: Cleanup io_status in BuildTable and table builders
assert(!s.ok() || io_s.ok());
io_s.PermitUncheckedError();
7 changes: 6 additions & 1 deletion db/flush_job.h
@@ -83,9 +83,14 @@ class FlushJob {
// Require db_mutex held.
// Once PickMemTable() is called, either Run() or Cancel() has to be called.
void PickMemTable();
// @param skipped_since_bg_error If not nullptr and if atomic_flush=false,
// then it is set to true if flush installation is skipped and the memtable
// is rolled back due to an existing background error.
Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
FileMetaData* file_meta = nullptr,
- bool* switched_to_mempurge = nullptr);
+ bool* switched_to_mempurge = nullptr,
+ bool* skipped_since_bg_error = nullptr,
+ ErrorHandler* error_handler = nullptr);
void Cancel();
const autovector<MemTable*>& GetMemTables() const { return mems_; }

49 changes: 41 additions & 8 deletions db/memtable_list.cc
@@ -434,21 +434,54 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
}

void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
- uint64_t /*file_number*/) {
+ bool rollback_succeeding_memtables) {
TEST_SYNC_POINT("RollbackMemtableFlush");
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
assert(!mems.empty());

- // If the flush was not successful, then just reset state.
- // Maybe a succeeding attempt to flush will be successful.
#ifndef NDEBUG
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
}
#endif

if (rollback_succeeding_memtables && !mems.empty()) {
std::list<MemTable*>& memlist = current_->memlist_;
auto it = memlist.rbegin();
for (; *it != mems[0] && it != memlist.rend(); ++it) {
}
// mems should be in memlist
assert(*it == mems[0]);
if (*it == mems[0]) {
++it;
}
while (it != memlist.rend()) {
MemTable* m = *it;
// Only roll back completed flushes, not in-progress ones:
// in-progress flushes may still be writing SSTs.
if (m->flush_completed_) {
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
m->file_number_ = 0;
num_flush_not_started_++;
++it;
} else {
break;
}
}
}

- m->flush_in_progress_ = false;
- m->flush_completed_ = false;
- m->edit_.Clear();
- num_flush_not_started_++;
for (MemTable* m : mems) {
if (m->flush_in_progress_) {
assert(m->file_number_ == 0);
m->file_number_ = 0;
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
}
}
imm_flush_needed.store(true, std::memory_order_release);
}
