diff --git a/src/db_impl.cc b/src/db_impl.cc index da1dac1aa..b663225c5 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -561,12 +561,17 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock(); mutex_.Unlock(); - { + if (storage) { StopWatch read_sw(env_, stats_.get(), BLOB_DB_BLOB_FILE_READ_MICROS); s = storage->Get(options, index, &record, &buffer); RecordTick(stats_.get(), BLOB_DB_NUM_KEYS_READ); RecordTick(stats_.get(), BLOB_DB_BLOB_FILE_BYTES_READ, index.blob_handle.size); + } else { + ROCKS_LOG_ERROR(db_options_.info_log, + "Column family id:%" PRIu32 " not Found.", handle->GetID()); + return Status::NotFound( + "Column family id: " + std::to_string(handle->GetID()) + " not Found."); } if (s.IsCorruption()) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -633,15 +638,20 @@ Iterator* TitanDBImpl::NewIteratorImpl( auto cfd = reinterpret_cast(handle)->cfd(); mutex_.Lock(); - auto storage = blob_file_set_->GetBlobStorage(handle->GetID()); + auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock(); mutex_.Unlock(); + if (!storage) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Column family id:%" PRIu32 " not Found.", handle->GetID()); + return nullptr; + } + std::unique_ptr iter(db_impl_->NewIteratorImpl( options, cfd, options.snapshot->GetSequenceNumber(), nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/)); - return new TitanDBIterator(options, storage.lock().get(), snapshot, - std::move(iter), env_, stats_.get(), - db_options_.info_log.get()); + return new TitanDBIterator(options, storage.get(), snapshot, std::move(iter), + env_, stats_.get(), db_options_.info_log.get()); } Status TitanDBImpl::NewIterators( diff --git a/src/db_impl.h b/src/db_impl.h index 1fbb422f0..01ae43558 100644 --- a/src/db_impl.h +++ b/src/db_impl.h @@ -187,7 +187,7 @@ class TitanDBImpl : public TitanDB { static void BGWorkGC(void* db); void BackgroundCallGC(); - Status BackgroundGC(LogBuffer* log_buffer); + Status BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id); void PurgeObsoleteFiles(); Status PurgeObsoleteFilesImpl(); diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index 6a15032c5..0360d4aeb 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -29,9 +29,7 @@ void TitanDBImpl::BGWorkGC(void* db) { } void TitanDBImpl::BackgroundCallGC() { - LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning"); - { MutexLock l(&mutex_); assert(bg_gc_scheduled_ > 0); @@ -41,13 +39,17 @@ void TitanDBImpl::BackgroundCallGC() { bg_gc_running_++; TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC"); - BackgroundGC(&log_buffer); - - { - mutex_.Unlock(); - log_buffer.FlushBufferToLog(); - LogFlush(db_options_.info_log.get()); - mutex_.Lock(); + if (!gc_queue_.empty()) { + uint32_t column_family_id = PopFirstFromGCQueue(); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + db_options_.info_log.get()); + BackgroundGC(&log_buffer, column_family_id); + { + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + LogFlush(db_options_.info_log.get()); + mutex_.Lock(); + } } bg_gc_running_--; @@ -67,36 +69,35 @@ void TitanDBImpl::BackgroundCallGC() { } } -Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { +Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, + uint32_t column_family_id) { mutex_.AssertHeld(); StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS); std::unique_ptr blob_gc; std::unique_ptr cfh; Status s; - if (!gc_queue_.empty()) { - uint32_t column_family_id = PopFirstFromGCQueue(); - std::shared_ptr blob_storage; - // Skip CFs that have been dropped. - if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) { - blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock(); - } else { - TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr); - ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].", - cf_info_[column_family_id].name.c_str()); - } - if (blob_storage != nullptr) { - const auto& cf_options = blob_storage->cf_options(); - std::shared_ptr blob_gc_picker = - std::make_shared(db_options_, cf_options, - stats_.get()); - blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get()); - - if (blob_gc) { - cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - assert(column_family_id == cfh->GetID()); - blob_gc->SetColumnFamily(cfh.get()); - } + + std::shared_ptr blob_storage; + // Skip CFs that have been dropped. + if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) { + blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock(); + } else { + TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr); + ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].", + cf_info_[column_family_id].name.c_str()); + } + if (blob_storage != nullptr) { + const auto& cf_options = blob_storage->cf_options(); + std::shared_ptr blob_gc_picker = + std::make_shared(db_options_, cf_options, + stats_.get()); + blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get()); + + if (blob_gc) { + cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); + assert(column_family_id == cfh->GetID()); + blob_gc->SetColumnFamily(cfh.get()); } } @@ -146,7 +147,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { return s; } -// TODO(yiwu): merge with BackgroundGC(). Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { // BackgroundCallGC Status s; @@ -160,54 +160,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { bg_gc_running_++; bg_gc_scheduled_++; - // BackgroudGC - StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS); - - std::unique_ptr blob_gc; - std::unique_ptr cfh; - - if (blob_file_set_->IsColumnFamilyObsolete(column_family_id)) { - return Status::ColumnFamilyDropped( - "Column Family has been dropped before GC."); - } - auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get(); - const auto& cf_options = bs->cf_options(); - std::shared_ptr blob_gc_picker = - std::make_shared(db_options_, cf_options, nullptr); - blob_gc = blob_gc_picker->PickBlobGC(bs); - - if (blob_gc) { - cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - assert(column_family_id == cfh->GetID()); - blob_gc->SetColumnFamily(cfh.get()); - } - - if (UNLIKELY(!blob_gc)) { - ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do"); - } else { - BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, - env_options_, blob_manager_.get(), - blob_file_set_.get(), &log_buffer, &shuting_down_, - stats_.get()); - s = blob_gc_job.Prepare(); - - if (s.ok()) { - mutex_.Unlock(); - s = blob_gc_job.Run(); - mutex_.Lock(); - } - - if (s.ok()) { - s = blob_gc_job.Finish(); - } - - blob_gc->ReleaseGcFiles(); - } - - if (!s.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s", - s.ToString().c_str()); - } + s = BackgroundGC(&log_buffer, column_family_id); { mutex_.Unlock(); diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index c002843da..7cc9b086b 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -153,6 +153,10 @@ class TitanDBTest : public testing::Test { return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID()); } + ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) { + return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release(); + } + void VerifyDB(const std::map& data, ReadOptions ropts = ReadOptions()) { db_impl_->PurgeObsoleteFiles(); @@ -519,6 +523,35 @@ TEST_F(TitanDBTest, DropColumnFamily) { Close(); } +TEST_F(TitanDBTest, DestroyColumnFamilyHandle) { + Open(); + const uint64_t kNumCF = 3; + for (uint64_t i = 1; i <= kNumCF; i++) { + AddCF(std::to_string(i)); + } + const uint64_t kNumEntries = 10; + std::map data; + for (uint64_t i = 1; i <= kNumEntries; i++) { + Put(i, &data); + } + VerifyDB(data); + Flush(); + VerifyDB(data); + + // Destroy column families handle, check whether GC skips the column families. + for (auto& handle : cf_handles_) { + auto cf_id = handle->GetID(); + db_->DestroyColumnFamilyHandle(handle); + ASSERT_OK(db_impl_->TEST_StartGC(cf_id)); + } + cf_handles_.clear(); + VerifyDB(data); + + Reopen(); + VerifyDB(data); + Close(); +} + TEST_F(TitanDBTest, DeleteFilesInRange) { Open();