Skip to content

Commit

Permalink
merge BackgroundGC with TEST_StartGC (#94)
Browse files Browse the repository at this point in the history
* merge TEST_StartGC with BackgroundGC

Signed-off-by: Connor1996 <zbk602423539@gmail.com>
  • Loading branch information
Connor1996 committed Oct 16, 2019
1 parent b9915d9 commit 8ac5003
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 88 deletions.
20 changes: 15 additions & 5 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -561,12 +561,17 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock();

{
if (storage) {
StopWatch read_sw(env_, stats_.get(), BLOB_DB_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer);
RecordTick(stats_.get(), BLOB_DB_NUM_KEYS_READ);
RecordTick(stats_.get(), BLOB_DB_BLOB_FILE_BYTES_READ,
index.blob_handle.size);
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found.", handle->GetID());
return Status::NotFound(
"Column family id: " + std::to_string(handle->GetID()) + " not Found.");
}
if (s.IsCorruption()) {
ROCKS_LOG_ERROR(db_options_.info_log,
Expand Down Expand Up @@ -633,15 +638,20 @@ Iterator* TitanDBImpl::NewIteratorImpl(
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();

mutex_.Lock();
auto storage = blob_file_set_->GetBlobStorage(handle->GetID());
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock();

if (!storage) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found.", handle->GetID());
return nullptr;
}

std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot,
std::move(iter), env_, stats_.get(),
db_options_.info_log.get());
return new TitanDBIterator(options, storage.get(), snapshot, std::move(iter),
env_, stats_.get(), db_options_.info_log.get());
}

Status TitanDBImpl::NewIterators(
Expand Down
2 changes: 1 addition & 1 deletion src/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class TitanDBImpl : public TitanDB {

static void BGWorkGC(void* db);
void BackgroundCallGC();
Status BackgroundGC(LogBuffer* log_buffer);
Status BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id);

void PurgeObsoleteFiles();
Status PurgeObsoleteFilesImpl();
Expand Down
117 changes: 35 additions & 82 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ void TitanDBImpl::BGWorkGC(void* db) {
}

void TitanDBImpl::BackgroundCallGC() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning");

{
MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0);
Expand All @@ -41,13 +39,17 @@ void TitanDBImpl::BackgroundCallGC() {
bg_gc_running_++;

TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC");
BackgroundGC(&log_buffer);

{
mutex_.Unlock();
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log.get());
mutex_.Lock();
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
db_options_.info_log.get());
BackgroundGC(&log_buffer, column_family_id);
{
mutex_.Unlock();
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log.get());
mutex_.Lock();
}
}

bg_gc_running_--;
Expand All @@ -67,36 +69,35 @@ void TitanDBImpl::BackgroundCallGC() {
}
}

Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
uint32_t column_family_id) {
mutex_.AssertHeld();
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);

std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
std::shared_ptr<BlobStorage> blob_storage;
// Skip CFs that have been dropped.
if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
} else {
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
cf_info_[column_family_id].name.c_str());
}
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
stats_.get());
blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());

if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}

std::shared_ptr<BlobStorage> blob_storage;
// Skip CFs that have been dropped.
if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
} else {
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
cf_info_[column_family_id].name.c_str());
}
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
stats_.get());
blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());

if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}
}

Expand Down Expand Up @@ -146,7 +147,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
return s;
}

// TODO(yiwu): merge with BackgroundGC().
Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
// BackgroundCallGC
Status s;
Expand All @@ -160,54 +160,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
bg_gc_running_++;
bg_gc_scheduled_++;

// BackgroudGC
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);

std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;

if (blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
return Status::ColumnFamilyDropped(
"Column Family has been dropped before GC.");
}
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options, nullptr);
blob_gc = blob_gc_picker->PickBlobGC(bs);

if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}

if (UNLIKELY(!blob_gc)) {
ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(),
blob_file_set_.get(), &log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();

if (s.ok()) {
mutex_.Unlock();
s = blob_gc_job.Run();
mutex_.Lock();
}

if (s.ok()) {
s = blob_gc_job.Finish();
}

blob_gc->ReleaseGcFiles();
}

if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
s = BackgroundGC(&log_buffer, column_family_id);

{
mutex_.Unlock();
Expand Down
33 changes: 33 additions & 0 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ class TitanDBTest : public testing::Test {
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
}

ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) {
return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release();
}

void VerifyDB(const std::map<std::string, std::string>& data,
ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles();
Expand Down Expand Up @@ -519,6 +523,35 @@ TEST_F(TitanDBTest, DropColumnFamily) {
Close();
}

TEST_F(TitanDBTest, DestroyColumnFamilyHandle) {
Open();
const uint64_t kNumCF = 3;
for (uint64_t i = 1; i <= kNumCF; i++) {
AddCF(std::to_string(i));
}
const uint64_t kNumEntries = 10;
std::map<std::string, std::string> data;
for (uint64_t i = 1; i <= kNumEntries; i++) {
Put(i, &data);
}
VerifyDB(data);
Flush();
VerifyDB(data);

// Destroy column families handle, check whether GC skips the column families.
for (auto& handle : cf_handles_) {
auto cf_id = handle->GetID();
db_->DestroyColumnFamilyHandle(handle);
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
}
cf_handles_.clear();
VerifyDB(data);

Reopen();
VerifyDB(data);
Close();
}

TEST_F(TitanDBTest, DeleteFilesInRange) {
Open();

Expand Down

0 comments on commit 8ac5003

Please sign in to comment.