Skip to content

Commit

Permalink
Revert "[PCP-21] Titan GC doesn’t affect online write (#121)" (#249)
Browse files Browse the repository at this point in the history
* Revert "[PCP-21] Titan GC doesn’t affect online write (#121)"

This reverts commit 4dc4ba8.

Signed-off-by: tabokie <xy.tao@outlook.com>

* format and build

Signed-off-by: tabokie <xy.tao@outlook.com>

* restore some modifications to tests

Signed-off-by: tabokie <xy.tao@outlook.com>

* Trigger rebuild

Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie authored Jun 6, 2022
1 parent 26c5669 commit a671462
Show file tree
Hide file tree
Showing 18 changed files with 91 additions and 585 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
blob_format_test
blob_gc_job_test
blob_gc_picker_test
blob_index_merge_operator_test
gc_stats_test
table_builder_test
thread_safety_test
Expand Down
14 changes: 1 addition & 13 deletions include/titan/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,6 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: 20
int max_sorted_runs{20};

// If set true, Titan will rewrite valid blob index from GC output as merge
// operands back to data store.
//
// With this feature enabled, Titan background GC won't block online write,
// trade-off being read performance slightly reduced compared to normal
// rewrite mode.
//
// Default: false
bool gc_merge_rewrite{false};

// If set true, Titan will pass empty value in user compaction filter,
// improves compaction performance by avoid fetching value from blob files.
//
Expand Down Expand Up @@ -233,11 +223,9 @@ struct MutableTitanCFOptions {
MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {}

explicit MutableTitanCFOptions(const TitanCFOptions& opts)
: blob_run_mode(opts.blob_run_mode),
gc_merge_rewrite(opts.gc_merge_rewrite) {}
: blob_run_mode(opts.blob_run_mode) {}

TitanBlobRunMode blob_run_mode;
bool gc_merge_rewrite;
};

struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
Expand Down
15 changes: 3 additions & 12 deletions src/blob_file_size_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,15 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */,
const Slice& value, EntryType type,
SequenceNumber /* seq */,
uint64_t /* file_size */) {
if (type != kEntryBlobIndex && type != kEntryMerge) {
if (type != kEntryBlobIndex) {
return Status::OK();
}

Status s;
MergeBlobIndex index;

if (type == kEntryMerge) {
s = index.DecodeFrom(const_cast<Slice*>(&value));
} else {
s = index.DecodeFromBase(const_cast<Slice*>(&value));
}
BlobIndex index;
auto s = index.DecodeFrom(const_cast<Slice*>(&value));
if (!s.ok()) {
return s;
}
if (BlobIndex::IsDeletionMarker(index)) {
return Status::OK();
}

auto iter = blob_files_size_.find(index.file_number);
if (iter == blob_files_size_.end()) {
Expand Down
48 changes: 3 additions & 45 deletions src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,51 +128,9 @@ Status BlobIndex::DecodeFrom(Slice* src) {
return s;
}

void BlobIndex::EncodeDeletionMarkerTo(std::string* dst) {
dst->push_back(kBlobRecord);
PutVarint64(dst, 0);
BlobHandle dummy;
dummy.EncodeTo(dst);
}

bool BlobIndex::IsDeletionMarker(const BlobIndex& index) {
return index.file_number == 0;
}

bool BlobIndex::operator==(const BlobIndex& rhs) const {
return (file_number == rhs.file_number && blob_handle == rhs.blob_handle);
}

void MergeBlobIndex::EncodeTo(std::string* dst) const {
BlobIndex::EncodeTo(dst);
PutVarint64(dst, source_file_number);
PutVarint64(dst, source_file_offset);
}

void MergeBlobIndex::EncodeToBase(std::string* dst) const {
BlobIndex::EncodeTo(dst);
}

Status MergeBlobIndex::DecodeFrom(Slice* src) {
Status s = BlobIndex::DecodeFrom(src);
if (!s.ok()) {
return s;
}
if (!GetVarint64(src, &source_file_number) ||
!GetVarint64(src, &source_file_offset)) {
return Status::Corruption("MergeBlobIndex");
}
return s;
}

Status MergeBlobIndex::DecodeFromBase(Slice* src) {
return BlobIndex::DecodeFrom(src);
}

bool MergeBlobIndex::operator==(const MergeBlobIndex& rhs) const {
return (source_file_number == rhs.source_file_number &&
source_file_offset == rhs.source_file_offset &&
BlobIndex::operator==(rhs));
bool operator==(const BlobIndex& lhs, const BlobIndex& rhs) {
return (lhs.file_number == rhs.file_number &&
lhs.blob_handle == rhs.blob_handle);
}

void BlobFileMeta::EncodeTo(std::string* dst) const {
Expand Down
19 changes: 1 addition & 18 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "table/format.h"
#include "util.h"

Expand Down Expand Up @@ -165,26 +164,10 @@ struct BlobIndex {
uint64_t file_number{0};
BlobHandle blob_handle;

virtual ~BlobIndex() {}

void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
static void EncodeDeletionMarkerTo(std::string* dst);
static bool IsDeletionMarker(const BlobIndex& index);

bool operator==(const BlobIndex& rhs) const;
};

struct MergeBlobIndex : public BlobIndex {
uint64_t source_file_number{0};
uint64_t source_file_offset{0};

void EncodeTo(std::string* dst) const;
void EncodeToBase(std::string* dst) const;
Status DecodeFrom(Slice* src);
Status DecodeFromBase(Slice* src);

bool operator==(const MergeBlobIndex& rhs) const;
friend bool operator==(const BlobIndex& lhs, const BlobIndex& rhs);
};

// Format of blob file meta (not fixed size):
Expand Down
131 changes: 45 additions & 86 deletions src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#include <memory>

#include "blob_file_size_collector.h"
#include "titan_logging.h"

namespace rocksdb {
Expand Down Expand Up @@ -76,8 +75,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
};

BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options,
bool gc_merge_rewrite, Env* env,
const TitanDBOptions& titan_db_options, Env* env,
const EnvOptions& env_options,
BlobFileManager* blob_file_manager,
BlobFileSet* blob_file_set, LogBuffer* log_buffer,
Expand All @@ -87,7 +85,6 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
base_db_impl_(reinterpret_cast<DBImpl*>(base_db_)),
mutex_(mutex),
db_options_(titan_db_options),
gc_merge_rewrite_(gc_merge_rewrite),
env_(env),
env_options_(env_options),
blob_file_manager_(blob_file_manager),
Expand Down Expand Up @@ -274,12 +271,9 @@ void BlobGCJob::BatchWriteNewIndices(BlobFileBuilder::OutContexts& contexts,
auto* cfh = blob_gc_->column_family_handle();
for (const std::unique_ptr<BlobFileBuilder::BlobRecordContext>& ctx :
contexts) {
MergeBlobIndex merge_blob_index;
merge_blob_index.file_number = ctx->new_blob_index.file_number;
merge_blob_index.source_file_number = ctx->original_blob_index.file_number;
merge_blob_index.source_file_offset =
ctx->original_blob_index.blob_handle.offset;
merge_blob_index.blob_handle = ctx->new_blob_index.blob_handle;
BlobIndex blob_index;
blob_index.file_number = ctx->new_blob_index.file_number;
blob_index.blob_handle = ctx->new_blob_index.blob_handle;

std::string index_entry;
BlobIndex original_index = ctx->original_blob_index;
Expand All @@ -288,25 +282,16 @@ void BlobGCJob::BatchWriteNewIndices(BlobFileBuilder::OutContexts& contexts,
*s = Status::Corruption(Slice());
return;
}
if (!gc_merge_rewrite_) {
merge_blob_index.EncodeToBase(&index_entry);
// Store WriteBatch for rewriting new Key-Index pairs to LSM
GarbageCollectionWriteCallback callback(cfh, ikey.user_key.ToString(),
std::move(original_index));
callback.value = index_entry;
rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback)));
auto& wb = rewrite_batches_.back().first;
*s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), ikey.user_key,
index_entry);
} else {
merge_blob_index.EncodeTo(&index_entry);
rewrite_batches_without_callback_.emplace_back(
std::make_pair(WriteBatch(), original_index.blob_handle.size));
auto& wb = rewrite_batches_without_callback_.back().first;
*s = WriteBatchInternal::Merge(&wb, cfh->GetID(), ikey.user_key,
index_entry);
}
blob_index.EncodeTo(&index_entry);
// Store WriteBatch for rewriting new Key-Index pairs to LSM
GarbageCollectionWriteCallback callback(cfh, ikey.user_key.ToString(),
std::move(original_index));
callback.value = index_entry;
rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback)));
auto& wb = rewrite_batches_.back().first;
*s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), ikey.user_key,
index_entry);
if (!s->ok()) break;
}
}
Expand Down Expand Up @@ -485,65 +470,39 @@ Status BlobGCJob::RewriteValidKeyToLSM() {

std::unordered_map<uint64_t, uint64_t>
dropped; // blob_file_number -> dropped_size
if (!gc_merge_rewrite_) {
for (auto& write_batch : rewrite_batches_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = Status::Aborted("Column family drop");
break;
}
if (IsShutingDown()) {
s = Status::ShutdownInProgress();
break;
}
s = db_impl->WriteWithCallback(wo, &write_batch.first,
&write_batch.second);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.gc_bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second.blob_record_size();
// Key is successfully written to LSM.
} else if (s.IsBusy()) {
metrics_.gc_num_keys_overwritten++;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record.
// Though record is dropped, the diff won't counted in discardable
// ratio,
// so we should update the live_data_size here.
BlobIndex blob_index;
Slice str(write_batch.second.value);
blob_index.DecodeFrom(&str);
dropped[blob_index.file_number] += blob_index.blob_handle.size;
} else {
// We hit an error.
break;
}
// count read bytes in write callback
metrics_.gc_bytes_read += write_batch.second.read_bytes();
for (auto& write_batch : rewrite_batches_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = Status::Aborted("Column family drop");
break;
}
} else {
for (auto& write_batch : rewrite_batches_without_callback_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = Status::Aborted("Column family drop");
break;
}
if (IsShutingDown()) {
s = Status::ShutdownInProgress();
break;
}
s = db_impl->Write(wo, &write_batch.first);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.gc_bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second;
// Key is successfully written to LSM.
} else {
// We hit an error.
break;
}
// read bytes is 0
if (IsShutingDown()) {
s = Status::ShutdownInProgress();
break;
}
s = db_impl->WriteWithCallback(wo, &write_batch.first, &write_batch.second);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.gc_bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second.blob_record_size();
// Key is successfully written to LSM.
} else if (s.IsBusy()) {
metrics_.gc_num_keys_overwritten++;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record.
// Though record is dropped, the diff won't counted in discardable
// ratio,
// so we should update the live_data_size here.
BlobIndex blob_index;
Slice str(write_batch.second.value);
blob_index.DecodeFrom(&str);
dropped[blob_index.file_number] += blob_index.blob_handle.size;
} else {
// We hit an error.
break;
}
// count read bytes in write callback
metrics_.gc_bytes_read += write_batch.second.read_bytes();
}
if (s.IsBusy()) {
s = Status::OK();
Expand Down
12 changes: 4 additions & 8 deletions src/blob_gc_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ namespace titandb {
class BlobGCJob {
public:
BlobGCJob(BlobGC *blob_gc, DB *db, port::Mutex *mutex,
const TitanDBOptions &titan_db_options, bool gc_merge_rewrite,
Env *env, const EnvOptions &env_options,
BlobFileManager *blob_file_manager, BlobFileSet *blob_file_set,
LogBuffer *log_buffer, std::atomic_bool *shuting_down,
TitanStats *stats);
const TitanDBOptions &titan_db_options, Env *env,
const EnvOptions &env_options, BlobFileManager *blob_file_manager,
BlobFileSet *blob_file_set, LogBuffer *log_buffer,
std::atomic_bool *shuting_down, TitanStats *stats);

// No copying allowed
BlobGCJob(const BlobGCJob &) = delete;
Expand All @@ -48,7 +47,6 @@ class BlobGCJob {
DBImpl *base_db_impl_;
port::Mutex *mutex_;
TitanDBOptions db_options_;
const bool gc_merge_rewrite_;
Env *env_;
EnvOptions env_options_;
BlobFileManager *blob_file_manager_;
Expand All @@ -60,8 +58,6 @@ class BlobGCJob {
blob_file_builders_;
std::vector<std::pair<WriteBatch, GarbageCollectionWriteCallback>>
rewrite_batches_;
std::vector<std::pair<WriteBatch, uint64_t /*blob_record_size*/>>
rewrite_batches_without_callback_;

std::atomic_bool *shuting_down_{nullptr};

Expand Down
Loading

0 comments on commit a671462

Please sign in to comment.