diff --git a/src/db_impl.cc b/src/db_impl.cc index b663225c5..11903f2f1 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -1017,6 +1017,7 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted); } } + TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished"); } void TitanDBImpl::OnCompactionCompleted( diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index 0360d4aeb..b27d94ea5 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -107,6 +107,12 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, // Nothing to do ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do"); } else { + { + mutex_.Unlock(); + auto* cfd = reinterpret_cast(cfh.get())->cfd(); + db_impl_->WaitForFlushMemTable(cfd); + mutex_.Lock(); + } BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, env_options_, blob_manager_.get(), blob_file_set_.get(), log_buffer, &shuting_down_, diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index 7cc9b086b..2e498d549 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -2,7 +2,9 @@ #include #include +#include "db/db_impl/db_impl.h" #include "file/filename.h" +#include "port/port.h" #include "rocksdb/utilities/debug.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" @@ -1105,6 +1107,76 @@ TEST_F(TitanDBTest, GCAfterDropCF) { Close(); } +TEST_F(TitanDBTest, GCBeforeFlushCommit) { + std::atomic is_first_flush{true}; + DBImpl* db_impl = nullptr; + + SyncPoint::GetInstance()->LoadDependency( + {{"TitanDBTest::GCBeforeFlushCommit:PauseInstall", + "TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"}, + {"TitanDBImpl::OnFlushCompleted:Finished", + "TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"}}); + SyncPoint::GetInstance()->SetCallBack("FlushJob::InstallResults", [&](void*) { + if (is_first_flush) { + is_first_flush = false; + } else { + // skip waiting for the second flush. + return; + } + auto* db_mutex = db_impl->mutex(); + db_mutex->Unlock(); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:PauseInstall"); + Env::Default()->SleepForMicroseconds(1000 * 1000); // 1s + db_mutex->Lock(); + }); + + options_.create_if_missing = true; + // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options_.max_background_jobs = 8; + options_.max_write_buffer_number = 4; + options_.min_blob_size = 0; + options_.merge_small_file_threshold = 1024 * 1024; + options_.disable_background_gc = true; + Open(); + uint32_t cf_id = db_->DefaultColumnFamily()->GetID(); + + db_impl = reinterpret_cast(db_->GetRootDB()); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v")); + // t1 will wait for the second flush complete before install super version. + auto t1 = port::Thread([&]() { + // flush_opts.wait = true + ASSERT_OK(db_->Flush(FlushOptions())); + }); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"); + // In the second flush we check if memtable has been committed, and signal + // the first flush to proceed. + ASSERT_OK(db_->Put(WriteOptions(), "bar", "v")); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db_->Flush(flush_opts)); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"); + // Set GC mark to force GC select the file. + auto blob_storage = GetBlobStorage().lock(); + std::map> blob_files; + blob_storage->ExportBlobFiles(blob_files); + ASSERT_EQ(2, blob_files.size()); + auto second_file = blob_files.rbegin()->second.lock(); + second_file->set_gc_mark(true); + ASSERT_OK(db_impl_->TEST_StartGC(cf_id)); + ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles()); + t1.join(); + // Check value after memtable committed. + std::string value; + // Before fixing the issue, this call will return + // Corruption: Missing blob file error. + ASSERT_OK(db_->Get(ReadOptions(), "bar", &value)); + ASSERT_EQ("v", value); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace titandb } // namespace rocksdb