From a6714625a94a98a2160af5688f98a2b761a343f4 Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Mon, 6 Jun 2022 18:47:59 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"[PCP-21]=20Titan=20GC=20doesn?= =?UTF-8?q?=E2=80=99t=20affect=20online=20write=20(#121)"=20(#249)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Revert "[PCP-21] Titan GC doesn’t affect online write (#121)" This reverts commit 4dc4ba89f8999c2981cf916badb088f86b4db227. Signed-off-by: tabokie * format and build Signed-off-by: tabokie * restore some modifications to tests Signed-off-by: tabokie * Trigger rebuild Signed-off-by: tabokie --- CMakeLists.txt | 1 - include/titan/options.h | 14 +- src/blob_file_size_collector.cc | 15 +-- src/blob_format.cc | 48 +------ src/blob_format.h | 19 +-- src/blob_gc_job.cc | 131 +++++++------------ src/blob_gc_job.h | 12 +- src/blob_gc_job_test.cc | 30 ++--- src/blob_index_merge_operator.h | 93 ------------- src/blob_index_merge_operator_test.cc | 180 -------------------------- src/compaction_filter.h | 4 - src/db_impl.cc | 32 +---- src/db_impl.h | 2 - src/db_impl_gc.cc | 11 +- src/db_iter.h | 37 +----- src/options.cc | 1 - src/table_builder_test.cc | 33 +---- src/titan_db_test.cc | 13 +- 18 files changed, 91 insertions(+), 585 deletions(-) delete mode 100644 src/blob_index_merge_operator.h delete mode 100644 src/blob_index_merge_operator_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 951f2a7dc..1c6a71c5b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -108,7 +108,6 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release")) blob_format_test blob_gc_job_test blob_gc_picker_test - blob_index_merge_operator_test gc_stats_test table_builder_test thread_safety_test diff --git a/include/titan/options.h b/include/titan/options.h index 05668425e..060ba2f35 100644 --- a/include/titan/options.h +++ b/include/titan/options.h @@ -160,16 +160,6 @@ struct TitanCFOptions : public ColumnFamilyOptions { // Default: 20 int max_sorted_runs{20}; - // If set true, Titan will rewrite valid blob index from GC output as merge - // operands back to data store. - // - // With this feature enabled, Titan background GC won't block online write, - // trade-off being read performance slightly reduced compared to normal - // rewrite mode. - // - // Default: false - bool gc_merge_rewrite{false}; - // If set true, Titan will pass empty value in user compaction filter, // improves compaction performance by avoid fetching value from blob files. // @@ -233,11 +223,9 @@ struct MutableTitanCFOptions { MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {} explicit MutableTitanCFOptions(const TitanCFOptions& opts) - : blob_run_mode(opts.blob_run_mode), - gc_merge_rewrite(opts.gc_merge_rewrite) {} + : blob_run_mode(opts.blob_run_mode) {} TitanBlobRunMode blob_run_mode; - bool gc_merge_rewrite; }; struct TitanOptions : public TitanDBOptions, public TitanCFOptions { diff --git a/src/blob_file_size_collector.cc b/src/blob_file_size_collector.cc index 9d06226f6..f59dbf3d3 100644 --- a/src/blob_file_size_collector.cc +++ b/src/blob_file_size_collector.cc @@ -46,24 +46,15 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */, const Slice& value, EntryType type, SequenceNumber /* seq */, uint64_t /* file_size */) { - if (type != kEntryBlobIndex && type != kEntryMerge) { + if (type != kEntryBlobIndex) { return Status::OK(); } - Status s; - MergeBlobIndex index; - - if (type == kEntryMerge) { - s = index.DecodeFrom(const_cast(&value)); - } else { - s = index.DecodeFromBase(const_cast(&value)); - } + BlobIndex index; + auto s = index.DecodeFrom(const_cast(&value)); if (!s.ok()) { return s; } - if (BlobIndex::IsDeletionMarker(index)) { - return Status::OK(); - } auto iter = blob_files_size_.find(index.file_number); if (iter == blob_files_size_.end()) { diff --git a/src/blob_format.cc b/src/blob_format.cc index 716d86651..d5a39f4dd 100644 --- a/src/blob_format.cc +++ b/src/blob_format.cc @@ -128,51 +128,9 @@ Status BlobIndex::DecodeFrom(Slice* src) { return s; } -void BlobIndex::EncodeDeletionMarkerTo(std::string* dst) { - dst->push_back(kBlobRecord); - PutVarint64(dst, 0); - BlobHandle dummy; - dummy.EncodeTo(dst); -} - -bool BlobIndex::IsDeletionMarker(const BlobIndex& index) { - return index.file_number == 0; -} - -bool BlobIndex::operator==(const BlobIndex& rhs) const { - return (file_number == rhs.file_number && blob_handle == rhs.blob_handle); -} - -void MergeBlobIndex::EncodeTo(std::string* dst) const { - BlobIndex::EncodeTo(dst); - PutVarint64(dst, source_file_number); - PutVarint64(dst, source_file_offset); -} - -void MergeBlobIndex::EncodeToBase(std::string* dst) const { - BlobIndex::EncodeTo(dst); -} - -Status MergeBlobIndex::DecodeFrom(Slice* src) { - Status s = BlobIndex::DecodeFrom(src); - if (!s.ok()) { - return s; - } - if (!GetVarint64(src, &source_file_number) || - !GetVarint64(src, &source_file_offset)) { - return Status::Corruption("MergeBlobIndex"); - } - return s; -} - -Status MergeBlobIndex::DecodeFromBase(Slice* src) { - return BlobIndex::DecodeFrom(src); -} - -bool MergeBlobIndex::operator==(const MergeBlobIndex& rhs) const { - return (source_file_number == rhs.source_file_number && - source_file_offset == rhs.source_file_offset && - BlobIndex::operator==(rhs)); +bool operator==(const BlobIndex& lhs, const BlobIndex& rhs) { + return (lhs.file_number == rhs.file_number && + lhs.blob_handle == rhs.blob_handle); } void BlobFileMeta::EncodeTo(std::string* dst) const { diff --git a/src/blob_format.h b/src/blob_format.h index 15d2d3035..c50d6c5df 100644 --- a/src/blob_format.h +++ b/src/blob_format.h @@ -3,7 +3,6 @@ #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "rocksdb/types.h" #include "table/format.h" #include "util.h" @@ -165,26 +164,10 @@ struct BlobIndex { uint64_t file_number{0}; BlobHandle blob_handle; - virtual ~BlobIndex() {} - - void EncodeTo(std::string* dst) const; - Status DecodeFrom(Slice* src); - static void EncodeDeletionMarkerTo(std::string* dst); - static bool IsDeletionMarker(const BlobIndex& index); - - bool operator==(const BlobIndex& rhs) const; -}; - -struct MergeBlobIndex : public BlobIndex { - uint64_t source_file_number{0}; - uint64_t source_file_offset{0}; - void EncodeTo(std::string* dst) const; - void EncodeToBase(std::string* dst) const; Status DecodeFrom(Slice* src); - Status DecodeFromBase(Slice* src); - bool operator==(const MergeBlobIndex& rhs) const; + friend bool operator==(const BlobIndex& lhs, const BlobIndex& rhs); }; // Format of blob file meta (not fixed size): diff --git a/src/blob_gc_job.cc b/src/blob_gc_job.cc index 5624f061e..0b842b0e4 100644 --- a/src/blob_gc_job.cc +++ b/src/blob_gc_job.cc @@ -7,7 +7,6 @@ #include -#include "blob_file_size_collector.h" #include "titan_logging.h" namespace rocksdb { @@ -76,8 +75,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { }; BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, - const TitanDBOptions& titan_db_options, - bool gc_merge_rewrite, Env* env, + const TitanDBOptions& titan_db_options, Env* env, const EnvOptions& env_options, BlobFileManager* blob_file_manager, BlobFileSet* blob_file_set, LogBuffer* log_buffer, @@ -87,7 +85,6 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, base_db_impl_(reinterpret_cast(base_db_)), mutex_(mutex), db_options_(titan_db_options), - gc_merge_rewrite_(gc_merge_rewrite), env_(env), env_options_(env_options), blob_file_manager_(blob_file_manager), @@ -274,12 +271,9 @@ void BlobGCJob::BatchWriteNewIndices(BlobFileBuilder::OutContexts& contexts, auto* cfh = blob_gc_->column_family_handle(); for (const std::unique_ptr& ctx : contexts) { - MergeBlobIndex merge_blob_index; - merge_blob_index.file_number = ctx->new_blob_index.file_number; - merge_blob_index.source_file_number = ctx->original_blob_index.file_number; - merge_blob_index.source_file_offset = - ctx->original_blob_index.blob_handle.offset; - merge_blob_index.blob_handle = ctx->new_blob_index.blob_handle; + BlobIndex blob_index; + blob_index.file_number = ctx->new_blob_index.file_number; + blob_index.blob_handle = ctx->new_blob_index.blob_handle; std::string index_entry; BlobIndex original_index = ctx->original_blob_index; @@ -288,25 +282,16 @@ void BlobGCJob::BatchWriteNewIndices(BlobFileBuilder::OutContexts& contexts, *s = Status::Corruption(Slice()); return; } - if (!gc_merge_rewrite_) { - merge_blob_index.EncodeToBase(&index_entry); - // Store WriteBatch for rewriting new Key-Index pairs to LSM - GarbageCollectionWriteCallback callback(cfh, ikey.user_key.ToString(), - std::move(original_index)); - callback.value = index_entry; - rewrite_batches_.emplace_back( - std::make_pair(WriteBatch(), std::move(callback))); - auto& wb = rewrite_batches_.back().first; - *s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), ikey.user_key, - index_entry); - } else { - merge_blob_index.EncodeTo(&index_entry); - rewrite_batches_without_callback_.emplace_back( - std::make_pair(WriteBatch(), original_index.blob_handle.size)); - auto& wb = rewrite_batches_without_callback_.back().first; - *s = WriteBatchInternal::Merge(&wb, cfh->GetID(), ikey.user_key, - index_entry); - } + blob_index.EncodeTo(&index_entry); + // Store WriteBatch for rewriting new Key-Index pairs to LSM + GarbageCollectionWriteCallback callback(cfh, ikey.user_key.ToString(), + std::move(original_index)); + callback.value = index_entry; + rewrite_batches_.emplace_back( + std::make_pair(WriteBatch(), std::move(callback))); + auto& wb = rewrite_batches_.back().first; + *s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), ikey.user_key, + index_entry); if (!s->ok()) break; } } @@ -485,65 +470,39 @@ Status BlobGCJob::RewriteValidKeyToLSM() { std::unordered_map dropped; // blob_file_number -> dropped_size - if (!gc_merge_rewrite_) { - for (auto& write_batch : rewrite_batches_) { - if (blob_gc_->GetColumnFamilyData()->IsDropped()) { - s = Status::Aborted("Column family drop"); - break; - } - if (IsShutingDown()) { - s = Status::ShutdownInProgress(); - break; - } - s = db_impl->WriteWithCallback(wo, &write_batch.first, - &write_batch.second); - if (s.ok()) { - // count written bytes for new blob index. - metrics_.gc_bytes_written += write_batch.first.GetDataSize(); - metrics_.gc_num_keys_relocated++; - metrics_.gc_bytes_relocated += write_batch.second.blob_record_size(); - // Key is successfully written to LSM. - } else if (s.IsBusy()) { - metrics_.gc_num_keys_overwritten++; - metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size(); - // The key is overwritten in the meanwhile. Drop the blob record. - // Though record is dropped, the diff won't counted in discardable - // ratio, - // so we should update the live_data_size here. - BlobIndex blob_index; - Slice str(write_batch.second.value); - blob_index.DecodeFrom(&str); - dropped[blob_index.file_number] += blob_index.blob_handle.size; - } else { - // We hit an error. - break; - } - // count read bytes in write callback - metrics_.gc_bytes_read += write_batch.second.read_bytes(); + for (auto& write_batch : rewrite_batches_) { + if (blob_gc_->GetColumnFamilyData()->IsDropped()) { + s = Status::Aborted("Column family drop"); + break; } - } else { - for (auto& write_batch : rewrite_batches_without_callback_) { - if (blob_gc_->GetColumnFamilyData()->IsDropped()) { - s = Status::Aborted("Column family drop"); - break; - } - if (IsShutingDown()) { - s = Status::ShutdownInProgress(); - break; - } - s = db_impl->Write(wo, &write_batch.first); - if (s.ok()) { - // count written bytes for new blob index. - metrics_.gc_bytes_written += write_batch.first.GetDataSize(); - metrics_.gc_num_keys_relocated++; - metrics_.gc_bytes_relocated += write_batch.second; - // Key is successfully written to LSM. - } else { - // We hit an error. - break; - } - // read bytes is 0 + if (IsShutingDown()) { + s = Status::ShutdownInProgress(); + break; + } + s = db_impl->WriteWithCallback(wo, &write_batch.first, &write_batch.second); + if (s.ok()) { + // count written bytes for new blob index. + metrics_.gc_bytes_written += write_batch.first.GetDataSize(); + metrics_.gc_num_keys_relocated++; + metrics_.gc_bytes_relocated += write_batch.second.blob_record_size(); + // Key is successfully written to LSM. + } else if (s.IsBusy()) { + metrics_.gc_num_keys_overwritten++; + metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size(); + // The key is overwritten in the meanwhile. Drop the blob record. + // Though record is dropped, the diff won't counted in discardable + // ratio, + // so we should update the live_data_size here. + BlobIndex blob_index; + Slice str(write_batch.second.value); + blob_index.DecodeFrom(&str); + dropped[blob_index.file_number] += blob_index.blob_handle.size; + } else { + // We hit an error. + break; } + // count read bytes in write callback + metrics_.gc_bytes_read += write_batch.second.read_bytes(); } if (s.IsBusy()) { s = Status::OK(); diff --git a/src/blob_gc_job.h b/src/blob_gc_job.h index 4e0bcc8ab..5a4928d05 100644 --- a/src/blob_gc_job.h +++ b/src/blob_gc_job.h @@ -18,11 +18,10 @@ namespace titandb { class BlobGCJob { public: BlobGCJob(BlobGC *blob_gc, DB *db, port::Mutex *mutex, - const TitanDBOptions &titan_db_options, bool gc_merge_rewrite, - Env *env, const EnvOptions &env_options, - BlobFileManager *blob_file_manager, BlobFileSet *blob_file_set, - LogBuffer *log_buffer, std::atomic_bool *shuting_down, - TitanStats *stats); + const TitanDBOptions &titan_db_options, Env *env, + const EnvOptions &env_options, BlobFileManager *blob_file_manager, + BlobFileSet *blob_file_set, LogBuffer *log_buffer, + std::atomic_bool *shuting_down, TitanStats *stats); // No copying allowed BlobGCJob(const BlobGCJob &) = delete; @@ -48,7 +47,6 @@ class BlobGCJob { DBImpl *base_db_impl_; port::Mutex *mutex_; TitanDBOptions db_options_; - const bool gc_merge_rewrite_; Env *env_; EnvOptions env_options_; BlobFileManager *blob_file_manager_; @@ -60,8 +58,6 @@ class BlobGCJob { blob_file_builders_; std::vector> rewrite_batches_; - std::vector> - rewrite_batches_without_callback_; std::atomic_bool *shuting_down_{nullptr}; diff --git a/src/blob_gc_job_test.cc b/src/blob_gc_job_test.cc index fd6781fba..80b1fbaf7 100644 --- a/src/blob_gc_job_test.cc +++ b/src/blob_gc_job_test.cc @@ -22,7 +22,7 @@ std::string GenValue(int i) { return buffer; } -class BlobGCJobTest : public testing::TestWithParam { +class BlobGCJobTest : public testing::Test { public: std::string dbname_; TitanDB* db_; @@ -41,7 +41,6 @@ class BlobGCJobTest : public testing::TestWithParam { options_.env->CreateDirIfMissing(dbname_); options_.env->CreateDirIfMissing(options_.dirname); } - ~BlobGCJobTest() { Close(); } void DisableMergeSmall() { options_.merge_small_file_threshold = 0; } @@ -158,7 +157,7 @@ class BlobGCJobTest : public testing::TestWithParam { blob_gc->SetColumnFamily(cfh); BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_, - GetParam(), tdb_->env_, EnvOptions(options_), + tdb_->env_, EnvOptions(options_), tdb_->blob_manager_.get(), blob_file_set_, &log_buffer, nullptr, nullptr); @@ -217,8 +216,8 @@ class BlobGCJobTest : public testing::TestWithParam { BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); blob_gc.SetColumnFamily(cfh); BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), - GetParam(), Env::Default(), EnvOptions(), nullptr, - blob_file_set_, nullptr, nullptr, nullptr); + Env::Default(), EnvOptions(), nullptr, blob_file_set_, + nullptr, nullptr, nullptr); bool discardable = false; ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable)); ASSERT_FALSE(discardable); @@ -281,11 +280,11 @@ class BlobGCJobTest : public testing::TestWithParam { } }; -TEST_P(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); } +TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); } -TEST_P(BlobGCJobTest, RunGC) { TestRunGC(); } +TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); } -TEST_P(BlobGCJobTest, GCLimiter) { +TEST_F(BlobGCJobTest, GCLimiter) { class TestLimiter : public RateLimiter { public: TestLimiter(RateLimiter::Mode mode) @@ -378,7 +377,7 @@ TEST_P(BlobGCJobTest, GCLimiter) { Close(); } -TEST_P(BlobGCJobTest, Reopen) { +TEST_F(BlobGCJobTest, Reopen) { DisableMergeSmall(); NewDB(); for (int i = 0; i < 10; i++) { @@ -405,7 +404,7 @@ TEST_P(BlobGCJobTest, Reopen) { // Tests blob file will be kept after GC, if it is still visible by active // snapshots. -TEST_P(BlobGCJobTest, PurgeBlobs) { +TEST_F(BlobGCJobTest, PurgeBlobs) { NewDB(); auto snap1 = db_->GetSnapshot(); @@ -460,7 +459,7 @@ TEST_P(BlobGCJobTest, PurgeBlobs) { CheckBlobNumber(1); } -TEST_P(BlobGCJobTest, DeleteFilesInRange) { +TEST_F(BlobGCJobTest, DeleteFilesInRange) { NewDB(); ASSERT_OK(db_->Put(WriteOptions(), GenKey(2), GenValue(21))); @@ -551,7 +550,7 @@ TEST_P(BlobGCJobTest, DeleteFilesInRange) { delete iter; } -TEST_P(BlobGCJobTest, LevelMergeGC) { +TEST_F(BlobGCJobTest, LevelMergeGC) { options_.level_merge = true; options_.level_compaction_dynamic_level_bytes = true; options_.blob_file_discardable_ratio = 0.5; @@ -602,7 +601,7 @@ TEST_P(BlobGCJobTest, LevelMergeGC) { BlobFileMeta::FileState::kNormal); } -TEST_P(BlobGCJobTest, RangeMergeScheduler) { +TEST_F(BlobGCJobTest, RangeMergeScheduler) { NewDB(); auto init_files = [&](std::vector>> @@ -773,7 +772,7 @@ TEST_P(BlobGCJobTest, RangeMergeScheduler) { } } -TEST_P(BlobGCJobTest, RangeMerge) { +TEST_F(BlobGCJobTest, RangeMerge) { options_.level_merge = true; options_.level_compaction_dynamic_level_bytes = true; options_.blob_file_discardable_ratio = 0.5; @@ -823,9 +822,6 @@ TEST_P(BlobGCJobTest, RangeMerge) { ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kObsolete); } } - -INSTANTIATE_TEST_CASE_P(BlobGCJobTestParameterized, BlobGCJobTest, - ::testing::Values(false, true)); } // namespace titandb } // namespace rocksdb diff --git a/src/blob_index_merge_operator.h b/src/blob_index_merge_operator.h deleted file mode 100644 index dfe85578a..000000000 --- a/src/blob_index_merge_operator.h +++ /dev/null @@ -1,93 +0,0 @@ -#pragma once - -#include "rocksdb/merge_operator.h" - -#include "blob_file_set.h" - -namespace rocksdb { -namespace titandb { - -class BlobIndexMergeOperator : public MergeOperator { - public: - BlobIndexMergeOperator() = default; - - // FullMergeV2 merges one base value with multiple merge operands and - // preserves latest value w.r.t. timestamp of original *put*. Each merge - // is the output of blob GC, and contains meta data including *src-file-no* - // and *src-file-offset*. - // Merge operation follows such rules: - // *. basic rule (keep base value): [Y][Z] ... [X](Y)(Z) => [X] - // a. same put (keep merge value): [Y] ... [X](Y)(X')(X") => [X"] - // we identify this case by checking *src-location* of merges against - // *blob-handle* of base. - // b. deletion (keep deletion marker): [delete](X)(Y) => [deletion marker] - // this is a workaround since vanilla rocksdb disallow empty result from - // merge. - bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override { - Status s; - if (merge_in.existing_value && merge_in.value_type == kValue) { - merge_out->new_type = kValue; - merge_out->existing_operand = *merge_in.existing_value; - return true; - } - - BlobIndex existing_index; - bool existing_index_valid = false; - if (merge_in.existing_value) { - assert(merge_in.value_type == kBlobIndex); - Slice copy = *merge_in.existing_value; - s = existing_index.DecodeFrom(©); - if (!s.ok()) { - return false; - } - existing_index_valid = !BlobIndex::IsDeletionMarker(existing_index); - } - if (!existing_index_valid) { - // this key must be deleted - merge_out->new_type = kBlobIndex; - merge_out->new_value.clear(); - BlobIndex::EncodeDeletionMarkerTo(&merge_out->new_value); - return true; - } - - MergeBlobIndex index; - BlobIndex merge_index; - for (auto operand : merge_in.operand_list) { - s = index.DecodeFrom(&operand); - if (!s.ok()) { - return false; - } - if (existing_index_valid) { - if (index.source_file_number == existing_index.file_number && - index.source_file_offset == existing_index.blob_handle.offset) { - existing_index_valid = false; - merge_index = index; - } - } else if (index.source_file_number == merge_index.file_number && - index.source_file_offset == merge_index.blob_handle.offset) { - merge_index = index; - } - } - merge_out->new_type = kBlobIndex; - if (existing_index_valid) { - merge_out->existing_operand = *merge_in.existing_value; - } else { - merge_out->new_value.clear(); - merge_index.EncodeTo(&merge_out->new_value); - } - return true; - } - - bool PartialMergeMulti(const Slice& key, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override { - return false; - } - - const char* Name() const override { return "BlobGCOperator"; } -}; - -} // namespace titandb -} // namespace rocksdb diff --git a/src/blob_index_merge_operator_test.cc b/src/blob_index_merge_operator_test.cc deleted file mode 100644 index e8f1e57bd..000000000 --- a/src/blob_index_merge_operator_test.cc +++ /dev/null @@ -1,180 +0,0 @@ -#include "test_util/testharness.h" - -#include "blob_index_merge_operator.h" - -namespace rocksdb { -namespace titandb { - -std::string GenKey(int i) { - char buffer[32]; - snprintf(buffer, sizeof(buffer), "k-%08d", i); - return buffer; -} - -std::string GenValue(int i) { - char buffer[32]; - snprintf(buffer, sizeof(buffer), "v-%08d", i); - return buffer; -} - -BlobIndex GenBlobIndex(uint32_t i, uint32_t j = 0) { - BlobIndex index; - index.file_number = i; - index.blob_handle.offset = j; - index.blob_handle.size = 10; - return index; -} - -MergeBlobIndex GenMergeBlobIndex(BlobIndex src, uint32_t i, uint32_t j = 0) { - MergeBlobIndex index; - index.file_number = i; - index.blob_handle.offset = j; - index.blob_handle.size = 10; - index.source_file_number = src.file_number; - index.source_file_offset = src.blob_handle.offset; - return index; -} - -ValueType ToValueType(MergeOperator::MergeValueType value_type) { - switch (value_type) { - case MergeOperator::kDeletion: - return kTypeDeletion; - case MergeOperator::kValue: - return kTypeValue; - case MergeOperator::kBlobIndex: - return kTypeBlobIndex; - default: - return kTypeValue; - } -} - -MergeOperator::MergeValueType ToMergeValueType(ValueType value_type) { - switch (value_type) { - case kTypeDeletion: - case kTypeSingleDeletion: - case kTypeRangeDeletion: - return MergeOperator::kDeletion; - case kTypeValue: - return MergeOperator::kValue; - case kTypeBlobIndex: - return MergeOperator::kBlobIndex; - default: - return MergeOperator::kValue; - } -} - -class BlobIndexMergeOperatorTest : public testing::Test { - public: - std::string key_; - ValueType value_type_{kTypeDeletion}; - std::string value_; - std::vector operands_; - std::shared_ptr merge_operator_; - - BlobIndexMergeOperatorTest() - : key_("k"), - merge_operator_(std::make_shared()) {} - - void Put(std::string value, ValueType type = kTypeValue) { - value_ = value; - value_type_ = type; - operands_.clear(); - } - - void Put(BlobIndex blob_index) { - value_.clear(); - blob_index.EncodeTo(&value_); - value_type_ = kTypeBlobIndex; - operands_.clear(); - } - - void Merge(MergeBlobIndex blob_index) { - std::string tmp; - blob_index.EncodeTo(&tmp); - operands_.emplace_back(tmp); - } - - void Read(ValueType expect_type, std::string expect_value) { - std::string tmp_result_string; - Slice tmp_result_operand(nullptr, 0); - MergeOperator::MergeValueType merge_type = ToMergeValueType(value_type_); - Slice value = value_; - std::vector operands; - for (auto& op : operands_) { - operands.emplace_back(op); - } - const MergeOperator::MergeOperationInput merge_in( - key_, merge_type, - merge_type == MergeOperator::kDeletion ? nullptr : &value, operands, - nullptr); - MergeOperator::MergeOperationOutput merge_out(tmp_result_string, - tmp_result_operand); - - ASSERT_EQ(true, merge_operator_->FullMergeV2(merge_in, &merge_out)); - ASSERT_EQ(true, merge_out.new_type != MergeOperator::kDeletion); - - if (merge_out.new_type == merge_type) { - ASSERT_EQ(expect_type, value_type_); - } else { - ASSERT_EQ(expect_type, ToValueType(merge_out.new_type)); - } - - if (tmp_result_operand.data()) { - ASSERT_EQ(expect_value, tmp_result_operand); - } else { - ASSERT_EQ(expect_value, tmp_result_string); - } - } - - void Clear() { - value_type_ = kTypeDeletion; - value_.clear(); - operands_.clear(); - } -}; - -TEST_F(BlobIndexMergeOperatorTest, KeepBaseValue) { - // [1] [2] (1->3) - Put(GenBlobIndex(2)); - Merge(GenMergeBlobIndex(GenBlobIndex(1), 3)); - std::string value; - GenBlobIndex(2).EncodeTo(&value); - Read(kTypeBlobIndex, value); - // [v] (1->2) - Clear(); - Put(GenValue(1)); - Merge(GenMergeBlobIndex(GenBlobIndex(1), 2)); - Read(kTypeValue, GenValue(1)); -} - -TEST_F(BlobIndexMergeOperatorTest, KeepLatestMerge) { - // [1] (1->2) (3->4) (2->5) - Put(GenBlobIndex(1)); - Merge(GenMergeBlobIndex(GenBlobIndex(1), 2)); - Merge(GenMergeBlobIndex(GenBlobIndex(3), 4)); - Merge(GenMergeBlobIndex(GenBlobIndex(2), 5)); - std::string value; - GenBlobIndex(5).EncodeTo(&value); - Read(kTypeBlobIndex, value); -} - -TEST_F(BlobIndexMergeOperatorTest, Delete) { - // [delete] (0->1) - Merge(GenMergeBlobIndex(GenBlobIndex(0), 1)); - std::string value; - BlobIndex::EncodeDeletionMarkerTo(&value); - Read(kTypeBlobIndex, value); - // [deletion marker] (0->1) - Clear(); - Put(value, kTypeBlobIndex); - Merge(GenMergeBlobIndex(GenBlobIndex(0), 1)); - Read(kTypeBlobIndex, value); -} - -} // namespace titandb -} // namespace rocksdb - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/src/compaction_filter.h b/src/compaction_filter.h index c5e41f885..bb85b4399 100644 --- a/src/compaction_filter.h +++ b/src/compaction_filter.h @@ -60,10 +60,6 @@ class TitanCompactionFilter final : public CompactionFilter { // Unable to decode blob index. Keeping the value. return Decision::kKeep; } - if (BlobIndex::IsDeletionMarker(blob_index)) { - // TODO(yiwu): handle deletion marker at bottom level. - return Decision::kKeep; - } BlobRecord record; PinnableSlice buffer; diff --git a/src/db_impl.cc b/src/db_impl.cc index b326061ab..742d4ae77 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -151,7 +151,6 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options, stats_.reset(new TitanStats(db_options_.statistics.get())); } blob_manager_.reset(new FileManager(this)); - shared_merge_operator_ = std::make_shared(); } TitanDBImpl::~TitanDBImpl() { Close(); } @@ -272,7 +271,6 @@ Status TitanDBImpl::OpenImpl(const std::vector& descs, db_options_, desc.options, this, blob_manager_, &mutex_, blob_file_set_.get(), stats_.get())); cf_opts.table_factory = titan_table_factories.back(); - cf_opts.merge_operator = shared_merge_operator_; if (cf_opts.compaction_filter != nullptr || cf_opts.compaction_filter_factory != nullptr) { std::shared_ptr titan_cf_factory = @@ -411,7 +409,6 @@ Status TitanDBImpl::CreateColumnFamilies( options.table_factory = titan_table_factory.back(); options.table_properties_collector_factories.emplace_back( std::make_shared()); - options.merge_operator = shared_merge_operator_; if (options.compaction_filter != nullptr || options.compaction_filter_factory != nullptr) { std::shared_ptr titan_cf_factory = @@ -626,9 +623,6 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, s = index.DecodeFrom(value); assert(s.ok()); if (!s.ok()) return s; - if (BlobIndex::IsDeletionMarker(index)) { - return Status::NotFound("encounter deletion marker"); - } BlobRecord record; PinnableSlice buffer; @@ -1049,8 +1043,6 @@ Status TitanDBImpl::SetOptions( auto opts = new_options; bool set_blob_run_mode = false; TitanBlobRunMode blob_run_mode = TitanBlobRunMode::kNormal; - bool set_gc_merge_rewrite = false; - bool gc_merge_rewrite = false; { auto p = opts.find("blob_run_mode"); set_blob_run_mode = (p != opts.end()); @@ -1069,19 +1061,6 @@ Status TitanDBImpl::SetOptions( opts.erase(p); } } - { - auto p = opts.find("gc_merge_rewrite"); - set_gc_merge_rewrite = (p != opts.end()); - if (set_gc_merge_rewrite) { - try { - gc_merge_rewrite = ParseBoolean("", p->second); - } catch (std::exception& e) { - return Status::InvalidArgument("Error parsing " + p->second + ":" + - std::string(e.what())); - } - opts.erase(p); - } - } if (opts.size() > 0) { s = db_->SetOptions(column_family, opts); if (!s.ok()) { @@ -1089,19 +1068,14 @@ Status TitanDBImpl::SetOptions( } } // Make sure base db's SetOptions success before setting blob_run_mode. - if (set_blob_run_mode || set_gc_merge_rewrite) { + if (set_blob_run_mode) { uint32_t cf_id = column_family->GetID(); { MutexLock l(&mutex_); assert(cf_info_.count(cf_id) > 0); TitanColumnFamilyInfo& cf_info = cf_info_[cf_id]; - if (set_blob_run_mode) { - cf_info.titan_table_factory->SetBlobRunMode(blob_run_mode); - cf_info.mutable_cf_options.blob_run_mode = blob_run_mode; - } - if (set_gc_merge_rewrite) { - cf_info.mutable_cf_options.gc_merge_rewrite = gc_merge_rewrite; - } + cf_info.titan_table_factory->SetBlobRunMode(blob_run_mode); + cf_info.mutable_cf_options.blob_run_mode = blob_run_mode; } } return Status::OK(); diff --git a/src/db_impl.h b/src/db_impl.h index f90383a1a..deec6103e 100644 --- a/src/db_impl.h +++ b/src/db_impl.h @@ -7,7 +7,6 @@ #include "blob_file_manager.h" #include "blob_file_set.h" -#include "blob_index_merge_operator.h" #include "table_factory.h" #include "titan/db.h" #include "titan_stats.h" @@ -289,7 +288,6 @@ class TitanDBImpl : public TitanDB { DBImpl* db_impl_; TitanDBOptions db_options_; std::unique_ptr directory_; - std::shared_ptr shared_merge_operator_; std::atomic initialized_{false}; diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index fc08f6089..acbb88e9c 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -164,7 +164,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, mutex_.AssertHeld(); std::unique_ptr blob_gc; - bool gc_merge_rewrite = false; std::unique_ptr cfh; Status s; @@ -188,8 +187,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); assert(column_family_id == cfh->GetID()); blob_gc->SetColumnFamily(cfh.get()); - gc_merge_rewrite = - cf_info_[column_family_id].mutable_cf_options.gc_merge_rewrite; } } @@ -201,10 +198,10 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, TITAN_LOG_BUFFER(log_buffer, "Titan GC nothing to do"); } else { StopWatch gc_sw(env_, statistics(stats_.get()), TITAN_GC_MICROS); - BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, - gc_merge_rewrite, env_, env_options_, - blob_manager_.get(), blob_file_set_.get(), log_buffer, - &shuting_down_, stats_.get()); + BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, + env_options_, blob_manager_.get(), + blob_file_set_.get(), log_buffer, &shuting_down_, + stats_.get()); s = blob_gc_job.Prepare(); if (s.ok()) { mutex_.Unlock(); diff --git a/src/db_iter.h b/src/db_iter.h index 4ece3292e..5046218b2 100644 --- a/src/db_iter.h +++ b/src/db_iter.h @@ -51,7 +51,7 @@ class TitanDBIterator : public Iterator { iter_->SeekToFirst(); if (ShouldGetBlobValue()) { StopWatch seek_sw(env_, statistics(stats_), TITAN_SEEK_MICROS); - GetBlobValue(true); + GetBlobValue(); RecordTick(statistics(stats_), TITAN_NUM_SEEK); } } @@ -60,7 +60,7 @@ class TitanDBIterator : public Iterator { iter_->SeekToLast(); if (ShouldGetBlobValue()) { StopWatch seek_sw(env_, statistics(stats_), TITAN_SEEK_MICROS); - GetBlobValue(false); + GetBlobValue(); RecordTick(statistics(stats_), TITAN_NUM_SEEK); } } @@ -69,7 +69,7 @@ class TitanDBIterator : public Iterator { iter_->Seek(target); if (ShouldGetBlobValue()) { StopWatch seek_sw(env_, statistics(stats_), TITAN_SEEK_MICROS); - GetBlobValue(true); + GetBlobValue(); RecordTick(statistics(stats_), TITAN_NUM_SEEK); } } @@ -78,7 +78,7 @@ class TitanDBIterator : public Iterator { iter_->SeekForPrev(target); if (ShouldGetBlobValue()) { StopWatch seek_sw(env_, statistics(stats_), TITAN_SEEK_MICROS); - GetBlobValue(false); + GetBlobValue(); RecordTick(statistics(stats_), TITAN_NUM_SEEK); } } @@ -88,7 +88,7 @@ class TitanDBIterator : public Iterator { iter_->Next(); if (ShouldGetBlobValue()) { StopWatch next_sw(env_, statistics(stats_), TITAN_NEXT_MICROS); - GetBlobValue(true); + GetBlobValue(); RecordTick(statistics(stats_), TITAN_NUM_NEXT); } } @@ -98,7 +98,7 @@ class TitanDBIterator : public Iterator { iter_->Prev(); if (ShouldGetBlobValue()) { StopWatch prev_sw(env_, statistics(stats_), TITAN_PREV_MICROS); - GetBlobValue(false); + GetBlobValue(); RecordTick(statistics(stats_), TITAN_NUM_PREV); } } @@ -128,7 +128,7 @@ class TitanDBIterator : public Iterator { return true; } - void GetBlobValue(bool forward) { + void GetBlobValue() { assert(iter_->status().ok()); BlobIndex index; @@ -140,30 +140,7 @@ class TitanDBIterator : public Iterator { status_.ToString().c_str()); return; } - while (BlobIndex::IsDeletionMarker(index)) { - // skip deletion marker - if (forward) { - iter_->Next(); - } else { - iter_->Prev(); - } - if (!ShouldGetBlobValue()) { - return; - } else { - status_ = DecodeInto(iter_->value(), &index); - if (!status_.ok()) { - TITAN_LOG_ERROR(info_log_, - "Titan iterator: failed to decode blob index %s: %s", - iter_->value().ToString(true /*hex*/).c_str(), - status_.ToString().c_str()); - return; - } - } - } - GetBlobValueImpl(index); - } - void GetBlobValueImpl(const BlobIndex& index) { auto it = files_.find(index.file_number); if (it == files_.end()) { std::unique_ptr prefetcher; diff --git a/src/options.cc b/src/options.cc index 19eafa238..8ae5097e6 100644 --- a/src/options.cc +++ b/src/options.cc @@ -43,7 +43,6 @@ TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts, sample_file_size_ratio(immutable_opts.sample_file_size_ratio), merge_small_file_threshold(immutable_opts.merge_small_file_threshold), blob_run_mode(mutable_opts.blob_run_mode), - gc_merge_rewrite(mutable_opts.gc_merge_rewrite), skip_value_in_compaction_filter( immutable_opts.skip_value_in_compaction_filter) {} diff --git a/src/table_builder_test.cc b/src/table_builder_test.cc index b85f9fdf0..2beb761ce 100644 --- a/src/table_builder_test.cc +++ b/src/table_builder_test.cc @@ -495,11 +495,11 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { std::string key(1, i); InternalKey ikey(key, 1, kTypeValue); std::string value; - if (i % 4 == 0) { + if (i % 3 == 0) { value = std::string(1, i); - } else if (i % 4 == 1) { + } else if (i % 3 == 1) { value = std::string(kMinBlobSize, i); - } else if (i % 4 == 2) { + } else if (i % 3 == 2) { ikey.Set(key, 1, kTypeBlobIndex); BlobIndex blobIndex; // set different values in different fields @@ -507,16 +507,6 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { blobIndex.blob_handle.size = i * 2 + 1; blobIndex.blob_handle.offset = i * 3 + 2; blobIndex.EncodeTo(&value); - } else { - ikey.Set(key, 1, kTypeMerge); - MergeBlobIndex mergeIndex; - // set different values in different fields - mergeIndex.file_number = i; - mergeIndex.blob_handle.size = i * 2 + 1; - mergeIndex.blob_handle.offset = i * 3 + 2; - mergeIndex.source_file_number = i * 4 + 3; - mergeIndex.source_file_offset = i * 5 + 4; - mergeIndex.EncodeTo(&value); } table_builder->Add(ikey.Encode(), value); } @@ -541,10 +531,10 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); // check order ASSERT_EQ(ikey.user_key, key); - if (i % 4 == 0) { + if (i % 3 == 0) { ASSERT_EQ(ikey.type, kTypeValue); ASSERT_EQ(iter->value(), std::string(1, i)); - } else if (i % 4 == 1) { + } else if (i % 3 == 1) { ASSERT_EQ(ikey.type, kTypeBlobIndex); BlobIndex index; ASSERT_OK(DecodeInto(iter->value(), &index)); @@ -554,7 +544,7 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { ASSERT_OK(blob_reader->Get(ro, index.blob_handle, &record, &buffer)); ASSERT_EQ(record.key, key); ASSERT_EQ(record.value, std::string(kMinBlobSize, i)); - } else if (i % 4 == 2) { + } else if (i % 3 == 2) { ASSERT_EQ(ikey.type, kTypeBlobIndex); BlobIndex index; // We do not have corresponding blob file in this test, so we only check @@ -563,17 +553,6 @@ TEST_F(TableBuilderTest, DictCompressDisorder) { ASSERT_EQ(index.file_number, i); ASSERT_EQ(index.blob_handle.size, i * 2 + 1); ASSERT_EQ(index.blob_handle.offset, i * 3 + 2); - } else { - ASSERT_EQ(ikey.type, kTypeMerge); - MergeBlobIndex mergeIndex; - // We do not have corresponding blob file in this test, so we only check - // MergeBlobIndex. - ASSERT_OK(DecodeInto(iter->value(), &mergeIndex)); - ASSERT_EQ(mergeIndex.file_number, i); - ASSERT_EQ(mergeIndex.blob_handle.size, i * 2 + 1); - ASSERT_EQ(mergeIndex.blob_handle.offset, i * 3 + 2); - ASSERT_EQ(mergeIndex.source_file_number, i * 4 + 3); - ASSERT_EQ(mergeIndex.source_file_offset, i * 5 + 4); } iter->Next(); } diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index 943bcf6d1..07325864d 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -999,24 +999,13 @@ TEST_F(TitanDBTest, SetOptions) { std::unordered_map opts; - // Set titan blob_run_mode. + // Set titan options. opts["blob_run_mode"] = "kReadOnly"; ASSERT_OK(db_->SetOptions(opts)); titan_options = db_->GetTitanOptions(); ASSERT_EQ(TitanBlobRunMode::kReadOnly, titan_options.blob_run_mode); opts.clear(); - // Set titan gc_merge_rewrite. - opts["gc_merge_rewrite"] = "true"; - ASSERT_OK(db_->SetOptions(opts)); - titan_options = db_->GetTitanOptions(); - ASSERT_EQ(true, titan_options.gc_merge_rewrite); - opts["gc_merge_rewrite"] = "0"; - ASSERT_OK(db_->SetOptions(opts)); - titan_options = db_->GetTitanOptions(); - ASSERT_EQ(false, titan_options.gc_merge_rewrite); - opts.clear(); - // Set column family options. opts["disable_auto_compactions"] = "true"; ASSERT_OK(db_->SetOptions(opts));