Skip to content

Commit

Permalink
Temp fix for data loss caused by concurrent flush (tikv#96)
Browse files Browse the repository at this point in the history
Summary:
Temporary fix for tikv#93. The fix waits until no flush is ongoing before starting a GC job. The drawback is that GC can starve if flushes run so frequently that there is no gap between flush jobs.

Test Plan:
See the new test. Before the fix, the test fails with a "Corruption: Missing blob file" error.
  • Loading branch information
yiwu-arbug committed Oct 16, 2019
1 parent 6a8413c commit c8c3590
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted);
}
}
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished");
}

void TitanDBImpl::OnCompactionCompleted(
Expand Down
6 changes: 6 additions & 0 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
// Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
{
mutex_.Unlock();
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh.get())->cfd();
db_impl_->WaitForFlushMemTable(cfd);
mutex_.Lock();
}
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(),
blob_file_set_.get(), log_buffer, &shuting_down_,
Expand Down
72 changes: 72 additions & 0 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "blob_file_size_collector.h"
#include "db/db_impl.h"
#include "db_impl.h"
#include "db_iter.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h"
Expand Down Expand Up @@ -1104,6 +1106,76 @@ TEST_F(TitanDBTest, GCAfterDropCF) {
Close();
}

// Regression test for GC running while a flush has not yet been committed.
// Sequence driven below: pause the first flush inside the
// "FlushJob::InstallResults" callback, run a second (non-waiting) flush, mark
// the newest blob file for GC, run GC and purge obsolete files, then verify
// that the key written before the second flush is still readable.
TEST_F(TitanDBTest, GCBeforeFlushCommit) {
// Distinguishes the first invocation of the InstallResults callback from
// later ones.
std::atomic<bool> is_first_flush{true};
// Assigned after Open(); the callback below captures it by reference.
DBImpl* db_impl = nullptr;

// Two ordering constraints between sync points.
// NOTE(review): presumably each pair {A, B} makes sync point B block until A
// has been reached -- confirm against SyncPoint::LoadDependency.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBTest::GCBeforeFlushCommit:PauseInstall",
"TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"},
{"TitanDBImpl::OnFlushCompleted:Finished",
"TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"}});
// Callback attached to "FlushJob::InstallResults". Only its first invocation
// pauses; every later invocation returns immediately.
SyncPoint::GetInstance()->SetCallBack("FlushJob::InstallResults", [&](void*) {
if (is_first_flush) {
is_first_flush = false;
} else {
// skip waiting for the second flush.
return;
}
// Release the DB mutex for the duration of the pause, hit the PauseInstall
// sync point, sleep, then re-acquire the mutex before returning.
auto* db_mutex = db_impl->mutex();
db_mutex->Unlock();
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:PauseInstall");
Env::Default()->SleepForMicroseconds(1000 * 1000); // 1s
db_mutex->Lock();
});

options_.create_if_missing = true;
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options_.max_background_jobs = 8;
options_.max_write_buffer_number = 4;
// min_blob_size = 0: no lower bound on blob size for this test.
options_.min_blob_size = 0;
options_.merge_small_file_threshold = 1024 * 1024;
// Background GC is disabled; GC is started explicitly via TEST_StartGC below.
options_.disable_background_gc = true;
Open();
uint32_t cf_id = db_->DefaultColumnFamily()->GetID();

// Must be set before sync point processing is enabled, since the callback
// dereferences db_impl.
db_impl = reinterpret_cast<DBImpl*>(db_->GetRootDB());
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(db_->Put(WriteOptions(), "foo", "v"));
// t1 issues the first flush. The InstallResults callback above pauses it
// (DB mutex released, PauseInstall sync point hit, 1s sleep).
// NOTE(review): if a fatal assertion fires on the main thread before
// t1.join(), t1 would be left unjoined -- confirm how port::Thread handles
// destruction of a joinable thread.
auto t1 = port::Thread([&]() {
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
// Paired with PauseInstall in LoadDependency above.
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitFlushPause");
// Second write and second flush. The flush is issued with wait = false, and
// the callback's later invocations return without pausing.
ASSERT_OK(db_->Put(WriteOptions(), "bar", "v"));
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
// Paired with "TitanDBImpl::OnFlushCompleted:Finished" in LoadDependency above.
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush");
// Set GC mark to force GC select the file.
auto blob_storage = GetBlobStorage().lock();
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
// Two blob files are expected at this point.
ASSERT_EQ(2, blob_files.size());
// rbegin() is the entry with the largest uint64_t key in the map.
auto second_file = blob_files.rbegin()->second.lock();
second_file->set_gc_mark(true);
// Run GC on the default column family, then purge obsolete files.
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
// Wait for the paused first flush to return.
t1.join();
// Check value after memtable committed.
std::string value;
// Before fixing the issue, this call will return
// Corruption: Missing blob file error.
ASSERT_OK(db_->Get(ReadOptions(), "bar", &value));
ASSERT_EQ("v", value);
// Turn off sync point processing and remove the callback registered above.
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace titandb
} // namespace rocksdb

Expand Down

0 comments on commit c8c3590

Please sign in to comment.