Skip to content

Commit

Permalink
[PCP-21] Titan GC doesn’t affect online write (#121)
Browse files Browse the repository at this point in the history
* Green Blob GC by merge operator

Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie authored Feb 15, 2020
1 parent ffaa9d1 commit 4dc4ba8
Show file tree
Hide file tree
Showing 19 changed files with 741 additions and 195 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
blob_format_test
blob_gc_job_test
blob_gc_picker_test
blob_index_merge_operator_test
gc_stats_test
table_builder_test
thread_safety_test
Expand Down
14 changes: 13 additions & 1 deletion include/titan/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: 20
int max_sorted_runs{20};

// If set true, Titan will rewrite valid blob index from GC output as merge
// operands back to data store.
//
// With this feature enabled, Titan background GC won't block online write,
// trade-off being read performance slightly reduced compared to normal
// rewrite mode.
//
// Default: false
bool gc_merge_rewrite{false};

TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {}
Expand Down Expand Up @@ -209,9 +219,11 @@ struct MutableTitanCFOptions {
MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {}

explicit MutableTitanCFOptions(const TitanCFOptions& opts)
: blob_run_mode(opts.blob_run_mode) {}
: blob_run_mode(opts.blob_run_mode),
gc_merge_rewrite(opts.gc_merge_rewrite) {}

TitanBlobRunMode blob_run_mode;
bool gc_merge_rewrite;
};

struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
Expand Down
15 changes: 12 additions & 3 deletions src/blob_file_size_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,24 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */,
const Slice& value, EntryType type,
SequenceNumber /* seq */,
uint64_t /* file_size */) {
if (type != kEntryBlobIndex) {
if (type != kEntryBlobIndex && type != kEntryMerge) {
return Status::OK();
}

BlobIndex index;
auto s = index.DecodeFrom(const_cast<Slice*>(&value));
Status s;
MergeBlobIndex index;

if (type == kEntryMerge) {
s = index.DecodeFrom(const_cast<Slice*>(&value));
} else {
s = index.DecodeFromBase(const_cast<Slice*>(&value));
}
if (!s.ok()) {
return s;
}
if (BlobIndex::IsDeletionMarker(index)) {
return Status::OK();
}

auto iter = blob_files_size_.find(index.file_number);
if (iter == blob_files_size_.end()) {
Expand Down
10 changes: 6 additions & 4 deletions src/blob_file_size_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {
std::unique_ptr<TableBuilder> table_builder;
NewTableBuilder(wfile.get(), &table_builder);

constexpr uint64_t kFirstFileNumber = 1ULL;
constexpr uint64_t kSecondFileNumber = 2ULL;
const int kNumEntries = 100;
char buf[16];
for (int i = 0; i < kNumEntries; i++) {
Expand All @@ -97,9 +99,9 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {

BlobIndex index;
if (i % 2 == 0) {
index.file_number = 0ULL;
index.file_number = kFirstFileNumber;
} else {
index.file_number = 1ULL;
index.file_number = kSecondFileNumber;
}
index.blob_handle.size = 10;
std::string value;
Expand Down Expand Up @@ -130,8 +132,8 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {

ASSERT_EQ(2, result.size());

ASSERT_EQ(kNumEntries / 2 * 10, result[0]);
ASSERT_EQ(kNumEntries / 2 * 10, result[1]);
ASSERT_EQ(kNumEntries / 2 * 10, result[kFirstFileNumber]);
ASSERT_EQ(kNumEntries / 2 * 10, result[kSecondFileNumber]);
}

} // namespace titandb
Expand Down
48 changes: 45 additions & 3 deletions src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,51 @@ Status BlobIndex::DecodeFrom(Slice* src) {
return s;
}

bool operator==(const BlobIndex& lhs, const BlobIndex& rhs) {
return (lhs.file_number == rhs.file_number &&
lhs.blob_handle == rhs.blob_handle);
void BlobIndex::EncodeDeletionMarkerTo(std::string* dst) {
dst->push_back(kBlobRecord);
PutVarint64(dst, 0);
BlobHandle dummy;
dummy.EncodeTo(dst);
}

// Returns true iff `index` is a deletion marker, i.e. its file number
// is 0 (the value written by EncodeDeletionMarkerTo).
bool BlobIndex::IsDeletionMarker(const BlobIndex& index) {
  return 0 == index.file_number;
}

// Two blob indexes are equal iff they reference the same location:
// the same blob file and the same handle within it.
bool BlobIndex::operator==(const BlobIndex& rhs) const {
  if (file_number != rhs.file_number) {
    return false;
  }
  return blob_handle == rhs.blob_handle;
}

// Serializes the full merge operand: the base BlobIndex fields first,
// then the source location (file number, then offset) as varints.
// Field order must stay in sync with MergeBlobIndex::DecodeFrom.
void MergeBlobIndex::EncodeTo(std::string* dst) const {
  BlobIndex::EncodeTo(dst);
  PutVarint64(dst, source_file_number);
  PutVarint64(dst, source_file_offset);
}

// Serializes only the base BlobIndex fields, omitting the source
// location, so the output is readable by plain BlobIndex::DecodeFrom.
void MergeBlobIndex::EncodeToBase(std::string* dst) const {
  BlobIndex::EncodeTo(dst);
}

// Parses a full merge operand produced by EncodeTo(): the base blob
// index first, then the trailing source file number and offset
// varints. Returns the base decode error as-is, or Corruption when
// either trailing varint is missing or malformed.
Status MergeBlobIndex::DecodeFrom(Slice* src) {
  Status s = BlobIndex::DecodeFrom(src);
  if (s.ok()) {
    const bool got_source = GetVarint64(src, &source_file_number) &&
                            GetVarint64(src, &source_file_offset);
    if (!got_source) {
      s = Status::Corruption("MergeBlobIndex");
    }
  }
  return s;
}

// Parses only the base BlobIndex portion (the format written by
// EncodeToBase), leaving source_file_number/source_file_offset at
// their in-class defaults.
Status MergeBlobIndex::DecodeFromBase(Slice* src) {
  return BlobIndex::DecodeFrom(src);
}

// Equality requires both the source location fields and the base
// BlobIndex fields to match.
bool MergeBlobIndex::operator==(const MergeBlobIndex& rhs) const {
  if (source_file_number != rhs.source_file_number) {
    return false;
  }
  if (source_file_offset != rhs.source_file_offset) {
    return false;
  }
  return BlobIndex::operator==(rhs);
}

void BlobFileMeta::EncodeTo(std::string* dst) const {
Expand Down
19 changes: 18 additions & 1 deletion src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "table/format.h"
#include "util.h"

Expand Down Expand Up @@ -139,10 +140,26 @@ struct BlobIndex {
uint64_t file_number{0};
BlobHandle blob_handle;

virtual ~BlobIndex() {}

void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
static void EncodeDeletionMarkerTo(std::string* dst);
static bool IsDeletionMarker(const BlobIndex& index);

bool operator==(const BlobIndex& rhs) const;
};

// Blob index written back to the LSM as a merge operand by Titan GC.
// Extends BlobIndex with the record's pre-GC (source) location; the GC
// job fills these from the old blob index before rewriting the key.
struct MergeBlobIndex : public BlobIndex {
// Blob file the record occupied before GC relocated it.
uint64_t source_file_number{0};
// Offset of the record inside that source file.
uint64_t source_file_offset{0};

// Full encoding: base BlobIndex fields plus the source location.
void EncodeTo(std::string* dst) const;
// Base-only encoding, compatible with BlobIndex::DecodeFrom.
void EncodeToBase(std::string* dst) const;
// Decodes the full format written by EncodeTo.
Status DecodeFrom(Slice* src);
// Decodes only the base BlobIndex portion (EncodeToBase format).
Status DecodeFromBase(Slice* src);

friend bool operator==(const BlobIndex& lhs, const BlobIndex& rhs);
bool operator==(const MergeBlobIndex& rhs) const;
};

// Format of blob file meta (not fixed size):
Expand Down
115 changes: 78 additions & 37 deletions src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "blob_gc_job.h"

#include "blob_file_size_collector.h"

namespace rocksdb {
namespace titandb {

Expand Down Expand Up @@ -73,7 +75,8 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
};

BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options, Env* env,
const TitanDBOptions& titan_db_options,
bool gc_merge_rewrite, Env* env,
const EnvOptions& env_options,
BlobFileManager* blob_file_manager,
BlobFileSet* blob_file_set, LogBuffer* log_buffer,
Expand All @@ -83,6 +86,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
base_db_impl_(reinterpret_cast<DBImpl*>(base_db_)),
mutex_(mutex),
db_options_(titan_db_options),
gc_merge_rewrite_(gc_merge_rewrite),
env_(env),
env_options_(env_options),
blob_file_manager_(blob_file_manager),
Expand Down Expand Up @@ -227,21 +231,32 @@ Status BlobGCJob::DoRunGC() {
// blob index's size is counted in `RewriteValidKeyToLSM`
metrics_.bytes_written += blob_record.size();

BlobIndex new_blob_index;
MergeBlobIndex new_blob_index;
new_blob_index.file_number = blob_file_handle->GetNumber();
new_blob_index.source_file_number = blob_index.file_number;
new_blob_index.source_file_offset = blob_index.blob_handle.offset;
blob_file_builder->Add(blob_record, &new_blob_index.blob_handle);
std::string index_entry;
new_blob_index.EncodeTo(&index_entry);

// Store WriteBatch for rewriting new Key-Index pairs to LSM
GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),
std::move(blob_index));
callback.value = index_entry;
rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback)));
auto& wb = rewrite_batches_.back().first;
s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), blob_record.key,
index_entry);

if (!gc_merge_rewrite_) {
new_blob_index.EncodeToBase(&index_entry);
// Store WriteBatch for rewriting new Key-Index pairs to LSM
GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),
std::move(blob_index));
callback.value = index_entry;
rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback)));
auto& wb = rewrite_batches_.back().first;
s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), blob_record.key,
index_entry);
} else {
new_blob_index.EncodeTo(&index_entry);
rewrite_batches_without_callback_.emplace_back(
std::make_pair(WriteBatch(), blob_index.blob_handle.size));
auto& wb = rewrite_batches_without_callback_.back().first;
s = WriteBatchInternal::Merge(&wb, cfh->GetID(), blob_record.key,
index_entry);
}
if (!s.ok()) {
break;
}
Expand Down Expand Up @@ -433,32 +448,58 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
WriteOptions wo;
wo.low_pri = true;
wo.ignore_missing_column_families = true;
for (auto& write_batch : rewrite_batches_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = Status::Aborted("Column family drop");
break;
}
if (IsShutingDown()) {
s = Status::ShutdownInProgress();
break;
if (!gc_merge_rewrite_) {
for (auto& write_batch : rewrite_batches_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = Status::Aborted("Column family drop");
break;
}
if (IsShutingDown()) {
s = Status::ShutdownInProgress();
break;
}
s = db_impl->WriteWithCallback(wo, &write_batch.first,
&write_batch.second);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second.blob_record_size();
// Key is successfully written to LSM.
} else if (s.IsBusy()) {
metrics_.gc_num_keys_overwritten++;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record.
} else {
// We hit an error.
break;
}
// count read bytes in write callback
metrics_.bytes_read += write_batch.second.read_bytes();
}
s = db_impl->WriteWithCallback(wo, &write_batch.first, &write_batch.second);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second.blob_record_size();
// Key is successfully written to LSM.
} else if (s.IsBusy()) {
metrics_.gc_num_keys_overwritten++;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record.
} else {
// We hit an error.
break;
} else {
for (auto& write_batch : rewrite_batches_without_callback_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = Status::Aborted("Column family drop");
break;
}
if (IsShutingDown()) {
s = Status::ShutdownInProgress();
break;
}
s = db_impl->Write(wo, &write_batch.first);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second;
// Key is successfully written to LSM.
} else {
// We hit an error.
break;
}
// read bytes is 0
}
// count read bytes in write callback
metrics_.bytes_read += write_batch.second.read_bytes();
}
if (s.IsBusy()) {
s = Status::OK();
Expand Down
Loading

0 comments on commit 4dc4ba8

Please sign in to comment.