From dd17ed8960da7a402568864b8f6a1b371ab969ff Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 8 Oct 2019 20:21:59 +0000 Subject: [PATCH 1/2] Reproduce data loss with concurrent memtable flush and GC Signed-off-by: Yi Wu wait flush before gc Signed-off-by: Yi Wu Fix hot fix Signed-off-by: Yi Wu change to flush_hotfix branch Signed-off-by: Yi Wu update sync point name Signed-off-by: Yi Wu also apply fix to TEST_StartGC Signed-off-by: Yi Wu change to flush_hotfix_6.4 branch Signed-off-by: Yi Wu update test Signed-off-by: Yi Wu --- CMakeLists.txt | 4 +-- src/db_impl.cc | 1 + src/db_impl_gc.cc | 12 ++++++++ src/titan_db_test.cc | 72 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2418486cc..d331f6d77 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,11 +5,11 @@ enable_language(C) find_package(Git) if (NOT ROCKSDB_GIT_REPO) - set(ROCKSDB_GIT_REPO "https://github.com/pingcap/rocksdb.git") + set(ROCKSDB_GIT_REPO "https://github.com/yiwu-arbug/rocksdb.git") endif() if (NOT ROCKSDB_GIT_BRANCH) - set(ROCKSDB_GIT_BRANCH "6.4.tikv") + set(ROCKSDB_GIT_BRANCH "flush_hotfix_6.4") endif() if (NOT DEFINED ROCKSDB_DIR) diff --git a/src/db_impl.cc b/src/db_impl.cc index da1dac1aa..e9fb46879 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -1007,6 +1007,7 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted); } } + TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished"); } void TitanDBImpl::OnCompactionCompleted( diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index 6a15032c5..0c0dfc2f2 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -106,6 +106,12 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { // Nothing to do ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do"); } else { + { + mutex_.Unlock(); + auto* cfd = reinterpret_cast(cfh.get())->cfd(); + 
db_impl_->WaitForFlushMemTable(cfd); + mutex_.Lock(); + } BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, env_options_, blob_manager_.get(), blob_file_set_.get(), log_buffer, &shuting_down_, @@ -185,6 +191,12 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { if (UNLIKELY(!blob_gc)) { ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do"); } else { + { + mutex_.Unlock(); + auto* cfd = reinterpret_cast(cfh.get())->cfd(); + db_impl_->WaitForFlushMemTable(cfd); + mutex_.Lock(); + } BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, env_options_, blob_manager_.get(), blob_file_set_.get(), &log_buffer, &shuting_down_, diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index c002843da..782519c0e 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -2,7 +2,9 @@ #include #include +#include "db/db_impl/db_impl.h" #include "file/filename.h" +#include "port/port.h" #include "rocksdb/utilities/debug.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" @@ -1072,6 +1074,76 @@ TEST_F(TitanDBTest, GCAfterDropCF) { Close(); } +TEST_F(TitanDBTest, GCBeforeFlushCommit) { + std::atomic is_first_flush{true}; + DBImpl* db_impl = nullptr; + + SyncPoint::GetInstance()->LoadDependency( + {{"TitanDBTest::GCBeforeFlushCommit:PauseInstall", + "TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"}, + {"TitanDBImpl::OnFlushCompleted:Finished", + "TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"}}); + SyncPoint::GetInstance()->SetCallBack("FlushJob::InstallResults", [&](void*) { + if (is_first_flush) { + is_first_flush = false; + } else { + // skip waiting for the second flush. + return; + } + auto* db_mutex = db_impl->mutex(); + db_mutex->Unlock(); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:PauseInstall"); + Env::Default()->SleepForMicroseconds(1000 * 1000); // 1s + db_mutex->Lock(); + }); + + options_.create_if_missing = true; + // Setting max_flush_jobs = max_background_jobs / 4 = 2. 
+ options_.max_background_jobs = 8; + options_.max_write_buffer_number = 4; + options_.min_blob_size = 0; + options_.merge_small_file_threshold = 1024 * 1024; + options_.disable_background_gc = true; + Open(); + uint32_t cf_id = db_->DefaultColumnFamily()->GetID(); + + db_impl = reinterpret_cast(db_->GetRootDB()); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v")); + // t1 waits for the second flush to complete before installing super version. + auto t1 = port::Thread([&]() { + // flush_opts.wait = true + ASSERT_OK(db_->Flush(FlushOptions())); + }); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"); + // In the second flush we check if memtable has been committed, and signal + // the first flush to proceed. + ASSERT_OK(db_->Put(WriteOptions(), "bar", "v")); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db_->Flush(flush_opts)); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"); + // Set GC mark to force GC to select the file. + auto blob_storage = GetBlobStorage().lock(); + std::map> blob_files; + blob_storage->ExportBlobFiles(blob_files); + ASSERT_EQ(2, blob_files.size()); + auto second_file = blob_files.rbegin()->second.lock(); + second_file->set_gc_mark(true); + ASSERT_OK(db_impl_->TEST_StartGC(cf_id)); + ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles()); + t1.join(); + // Check value after memtable is committed. + std::string value; + // Before fixing the issue, this call will return + // Corruption: Missing blob file error.
+ ASSERT_OK(db_->Get(ReadOptions(), "bar", &value)); + ASSERT_EQ("v", value); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace titandb } // namespace rocksdb From a52439ebd22bdaae14636ac074ec333a54d2d599 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 16 Oct 2019 03:05:57 +0000 Subject: [PATCH 2/2] revert CMakeLists.txt Signed-off-by: Yi Wu --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d331f6d77..2418486cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,11 +5,11 @@ enable_language(C) find_package(Git) if (NOT ROCKSDB_GIT_REPO) - set(ROCKSDB_GIT_REPO "https://github.com/yiwu-arbug/rocksdb.git") + set(ROCKSDB_GIT_REPO "https://github.com/pingcap/rocksdb.git") endif() if (NOT ROCKSDB_GIT_BRANCH) - set(ROCKSDB_GIT_BRANCH "flush_hotfix_6.4") + set(ROCKSDB_GIT_BRANCH "6.4.tikv") endif() if (NOT DEFINED ROCKSDB_DIR)