diff --git a/db/builder.cc b/db/builder.cc index afb8e44030b..4a0a9b36fb0 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -75,13 +75,16 @@ Status BuildTable( InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, int level, const uint64_t creation_time, - const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) { + const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint, + InternalIterator* cmp_iter, + std::unique_ptr cmp_range_del_iter) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; Status s; + uint64_t num_duplicated = 0, num_total = 0; meta->fd.file_size = 0; iter->SeekToFirst(); std::unique_ptr range_del_agg( @@ -91,6 +94,15 @@ Status BuildTable( // may be non-ok if a range tombstone key is unparsable return s; } + std::unique_ptr cmp_range_del_agg( + new RangeDelAggregator(internal_comparator, snapshots)); + if (cmp_range_del_iter != nullptr) { + s = cmp_range_del_agg->AddTombstones(std::move(cmp_range_del_iter)); + if (!s.ok()) { + // may be non-ok if a range tombstone key is unparsable + return s; + } + } std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); @@ -139,19 +151,64 @@ Status BuildTable( &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, true /* internal key corruption is not ok */, range_del_agg.get()); c_iter.SeekToFirst(); + + ParsedInternalKey c_ikey, cmp_ikey; + if (cmp_iter != nullptr) { + cmp_iter->SeekToFirst(); + } + const rocksdb::Comparator *comp = internal_comparator.user_comparator(); for (; c_iter.Valid(); c_iter.Next()) { const Slice& key = c_iter.key(); const Slice& value = c_iter.value(); - builder->Add(key, value); - meta->UpdateBoundaries(key, c_iter.ikey().sequence); - - // TODO(noetzli): Update stats after flush, too. - if (io_priority == Env::IO_HIGH && - IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { - ThreadStatusUtil::SetThreadOperationProperty( - ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); + bool skip = false; + num_total++; + + // Check whether the key is duplicated in later mems + if (ParseInternalKey(key, &c_ikey)) { + if (cmp_iter != nullptr ) { + while (cmp_iter->Valid()) { + if (!ParseInternalKey(cmp_iter->key(), &cmp_ikey)) { + cmp_iter->Next(); + continue; + } + int result = comp->Compare(c_ikey.user_key, cmp_ikey.user_key); + if (result == 0) { + // No delete for merge options. + if (c_ikey.type != kTypeMerge && cmp_ikey.type != kTypeMerge) { + skip = true; + } + break; + } + else if (result > 0) { + cmp_iter->Next(); + } + else { + break; + } + } + } + if (!skip && cmp_range_del_agg->ShouldDelete(c_ikey)) { + skip = true; + } + } + + if (skip) { + num_duplicated++; + continue; + } + else { + builder->Add(key, value); + meta->UpdateBoundaries(key, c_iter.ikey().sequence); + + // TODO(noetzli): Update stats after flush, too. + if (io_priority == Env::IO_HIGH && + IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); + } } } + // nullptr for table_{min,max} so all range tombstones will be flushed range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */, nullptr /* upper_bound */, meta); @@ -219,9 +276,11 @@ Status BuildTable( } // Output to event logger and fire events. + bool show_num = (cmp_iter != nullptr || + cmp_range_del_iter != nullptr)?true:false; EventHelpers::LogAndNotifyTableFileCreationFinished( event_logger, ioptions.listeners, dbname, column_family_name, fname, - job_id, meta->fd, tp, reason, s); + job_id, meta->fd, tp, reason, s, show_num, num_total, num_duplicated); return s; } diff --git a/db/builder.h b/db/builder.h index d83644499b8..c29c1ab8348 100644 --- a/db/builder.h +++ b/db/builder.h @@ -80,6 +80,7 @@ extern Status BuildTable( const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, int level = -1, const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, - Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET); - + Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET, + InternalIterator* cmp_iter = nullptr, + std::unique_ptr cmp_range_del_iter = nullptr); } // namespace rocksdb diff --git a/db/c.cc b/db/c.cc index 064103ed40a..9d88aa9ef31 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2331,6 +2331,10 @@ void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t* opt, int n) opt->rep.max_write_buffer_number = n; } +void rocksdb_options_set_flush_style(rocksdb_options_t* opt, int style) { + opt->rep.flush_style = static_cast(style); +} + void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t* opt, int n) { opt->rep.min_write_buffer_number_to_merge = n; } diff --git a/db/column_family.cc b/db/column_family.cc index c05373293ab..ad4166c76cc 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -195,6 +195,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, if (result.max_write_buffer_number < 2) { result.max_write_buffer_number = 2; } + if (result.max_write_buffer_number_to_maintain < 0) { result.max_write_buffer_number_to_maintain = result.max_write_buffer_number; } @@ -375,6 +376,7 @@ ColumnFamilyData::ColumnFamilyData( mem_(nullptr), imm_(ioptions_.min_write_buffer_number_to_merge, ioptions_.max_write_buffer_number_to_maintain), + stop(false), super_version_(nullptr), super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), diff --git a/db/column_family.h b/db/column_family.h index f89fc10d112..5ad003cccc3 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -240,6 +240,9 @@ class ColumnFamilyData { MemTable* mem() { return mem_; } Version* current() { return current_; } Version* dummy_versions() { return dummy_versions_; } + void set_stop() { stop = true; } + bool is_stop() { return stop; } + bool is_flush_recursive_dedup() { return (ioptions_.flush_style == kFlushStyleDedup); } void SetCurrent(Version* _current); uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held @@ -386,6 +389,7 @@ class ColumnFamilyData { MemTable* mem_; MemTableList imm_; + bool stop; SuperVersion* super_version_; // An ordinal representing the current SuperVersion. Updated by diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index 4752d34285a..ccedec46fa6 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -68,7 +68,7 @@ class CompactionPickerTest : public testing::Test { DeleteVersionStorage(); options_.num_levels = num_levels; vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels, - style, nullptr, false)); + kFlushStyleMerge, style, nullptr, false)); vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 7136a813e61..541c4ba07cb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -257,6 +257,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { cfd->Ref(); + cfd->set_stop(); mutex_.Unlock(); FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); mutex_.Lock(); diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 1b79acb0f2c..df91a800f0e 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -61,7 +61,8 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( const std::string& db_name, const std::string& cf_name, const std::string& file_path, int job_id, const FileDescriptor& fd, const TableProperties& table_properties, TableFileCreationReason reason, - const Status& s) { + const Status& s, bool show_num, + const uint64_t total_num, const uint64_t dup_num) { if (s.ok() && event_logger) { JSONWriter jwriter; AppendCurrentTime(&jwriter); @@ -89,6 +90,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( << "num_data_blocks" << table_properties.num_data_blocks << "num_entries" << table_properties.num_entries << "filter_policy_name" << table_properties.filter_policy_name; + if (show_num) { + jwriter << "total_paris" << total_num << "duplicated_pairs" << dup_num; + } // user collected properties for (const auto& prop : table_properties.readable_properties) { diff --git a/db/event_helpers.h b/db/event_helpers.h index 674e6c5f6fc..a058c2853ef 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -35,7 +35,8 @@ class EventHelpers { const std::string& db_name, const std::string& cf_name, const std::string& file_path, int job_id, const FileDescriptor& fd, const TableProperties& table_properties, TableFileCreationReason reason, - const Status& s); + const Status& s, const bool show_num = false, + const uint64_t total_num = 0, const uint64_t duplicated_num = 0); static void LogAndNotifyTableFileDeletion( EventLogger* event_logger, int job_id, uint64_t file_number, const std::string& file_path, diff --git a/db/flush_job.cc b/db/flush_job.cc index 2181bebb413..92a31d37b2e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -156,11 +156,16 @@ void FlushJob::PickMemTable() { assert(!pick_memtable_called); pick_memtable_called = true; // Save the contents of the earliest memtable as a new Table - cfd_->imm()->PickMemtablesToFlush(&mems_); + + if (cfd_->is_flush_recursive_dedup() && !cfd_->is_stop()) { + cfd_->imm()->PickMemtablesToFlush(&mems_, &compare_mems_); + } + else { + cfd_->imm()->PickMemtablesToFlush(&mems_); + } if (mems_.empty()) { return; } - ReportFlushInputSize(mems_); // entries mems are (implicitly) sorted in ascending order by their created @@ -287,6 +292,7 @@ Status FlushJob::WriteLevel0Table() { ReadOptions ro; ro.total_order_seek = true; Arena arena; + Arena arena1; uint64_t total_num_entries = 0, total_num_deletes = 0; size_t total_memory_usage = 0; for (MemTable* m : mems_) { @@ -304,13 +310,30 @@ Status FlushJob::WriteLevel0Table() { total_memory_usage += m->ApproximateMemoryUsage(); } + std::vector cmp_memtables; + std::vector cmp_range_del_iters; + uint64_t cmp_entries = 0; + uint64_t cmp_num_deletes = 0; + for (MemTable *m: compare_mems_) { + cmp_memtables.push_back(m->NewIterator(ro, &arena1)); + auto* range_del_iter = m->NewRangeTombstoneIterator(ro); + if (range_del_iter != nullptr) { + cmp_range_del_iters.push_back(range_del_iter); + } + cmp_entries += m->num_entries(); + cmp_num_deletes += m->num_deletes(); + } + event_logger_->Log() - << "job" << job_context_->job_id << "event" + << "job" << job_context_->job_id << "event" << "flush_started" - << "num_memtables" << mems_.size() << "num_entries" << total_num_entries - << "num_deletes" << total_num_deletes << "memory_usage" + << "num_memtables" << mems_.size() << "num_entries"<< total_num_entries + << "num_deletes" << total_num_deletes << "memory_usage" << total_memory_usage << "flush_reason" - << GetFlushReasonString(cfd_->GetFlushReason()); + << GetFlushReasonString(cfd_->GetFlushReason()) + << " compare mems count " << compare_mems_.size() + << " compare mems total " << cmp_entries + << " compare mems total delete " << cmp_num_deletes; { ScopedArenaIterator iter( @@ -320,6 +343,15 @@ Status FlushJob::WriteLevel0Table() { &cfd_->internal_comparator(), range_del_iters.empty() ? nullptr : &range_del_iters[0], static_cast(range_del_iters.size()))); + + ScopedArenaIterator cmp_iter( + NewMergingIterator( + &cfd_->internal_comparator(), &cmp_memtables[0], + static_cast(cmp_memtables.size()), &arena1)); + std::unique_ptr cmp_range_del_iter(NewMergingIterator( + &cfd_->internal_comparator(), + cmp_range_del_iters.empty() ? nullptr : &cmp_range_del_iters[0], + static_cast(cmp_range_del_iters.size()))); ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", cfd_->GetName().c_str(), job_context_->job_id, @@ -353,7 +385,8 @@ Status FlushJob::WriteLevel0Table() { mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, current_time, - oldest_key_time, write_hint); + oldest_key_time, write_hint, + cmp_iter.get(), std::move(cmp_range_del_iter)); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/flush_job.h b/db/flush_job.h index 81a8de3063a..ceed75a6065 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -104,6 +104,7 @@ class FlushJob { // Variables below are set by PickMemTable(): FileMetaData meta_; autovector mems_; + autovector compare_mems_; VersionEdit* edit_; Version* base_; bool pick_memtable_called; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index a09a118b90d..fd651893005 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -279,6 +279,32 @@ bool MemTableList::IsFlushPending() const { return false; } +// Returns the memtables that need to be flushed. +void MemTableList::PickMemtablesToFlush(autovector* ret, + autovector* compare_ret) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); + const auto& memlist = current_->memlist_; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + if (!m->flush_in_progress_) { + assert(!m->flush_completed_); + if (it == memlist.rbegin()) { + num_flush_not_started_--; + if (num_flush_not_started_ == 0) { + imm_flush_needed.store(false, std::memory_order_release); + } + m->flush_in_progress_ = true; // flushing will start very soon + ret->push_back(m); + } + else { + compare_ret->push_back(m); + } + } + } + flush_requested_ = false; // start-flush request is complete +} + // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(autovector* ret) { AutoThreadOperationStageUpdater stage_updater( diff --git a/db/memtable_list.h b/db/memtable_list.h index c2ac65a2fc5..b3fa98a6fd9 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -200,6 +200,7 @@ class MemTableList { // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. + void PickMemtablesToFlush(autovector* mems, autovector* compare_mems); void PickMemtablesToFlush(autovector* mems); // Reset status of the given memtable list back to pending state so that diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 304df2a0455..e4d07aa0db3 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -30,7 +30,8 @@ class VersionBuilderTest : public testing::Test { icmp_(ucmp_), ioptions_(options_), mutable_cf_options_(options_), - vstorage_(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, + vstorage_(&icmp_, ucmp_, options_.num_levels, + kFlushStyleMerge, kCompactionStyleLevel, nullptr, false), file_num_(1) { mutable_cf_options_.RefreshDerivedOptions(ioptions_); @@ -123,7 +124,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, + nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -158,7 +160,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, + nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -198,7 +201,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, + nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -229,7 +233,7 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -244,7 +248,7 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) { EnvOptions env_options; VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, nullptr, false); VersionEdit version_edit; version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"), diff --git a/db/version_set.cc b/db/version_set.cc index 4af3355908f..59f44d87881 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -878,7 +878,9 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, VersionStorageInfo::VersionStorageInfo( const InternalKeyComparator* internal_comparator, const Comparator* user_comparator, int levels, - CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage, + FlushStyle flush_style, + CompactionStyle compaction_style, + VersionStorageInfo* ref_vstorage, bool _force_consistency_checks) : internal_comparator_(internal_comparator), user_comparator_(user_comparator), @@ -886,6 +888,7 @@ VersionStorageInfo::VersionStorageInfo( num_levels_(levels), num_non_empty_levels_(0), file_indexer_(user_comparator), + flush_style_(flush_style), compaction_style_(compaction_style), files_(new std::vector[num_levels_]), base_level_(num_levels_ == 1 ? -1 : 1), @@ -934,6 +937,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(), (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(), cfd_ == nullptr ? 0 : cfd_->NumberLevels(), + cfd_ == nullptr ? kFlushStyleMerge + : cfd_->ioptions()->flush_style, cfd_ == nullptr ? kCompactionStyleLevel : cfd_->ioptions()->compaction_style, (cfd_ == nullptr || cfd_->current() == nullptr) diff --git a/db/version_set.h b/db/version_set.h index ea6e4e88a75..343e57440a1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -94,6 +94,7 @@ class VersionStorageInfo { public: VersionStorageInfo(const InternalKeyComparator* internal_comparator, const Comparator* user_comparator, int num_levels, + FlushStyle flush_style, CompactionStyle compaction_style, VersionStorageInfo* src_vstorage, bool _force_consistency_checks); @@ -411,6 +412,7 @@ class VersionStorageInfo { FileIndexer file_indexer_; Arena arena_; // Used to allocate space for file_levels_ + FlushStyle flush_style_; CompactionStyle compaction_style_; // List of files per level, files in each level are arranged diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 625d4592264..d266698860a 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -110,7 +110,8 @@ class VersionStorageInfoTest : public testing::Test { options_(GetOptionsWithNumLevels(6, logger_)), ioptions_(options_), mutable_cf_options_(options_), - vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {} + vstorage_(&icmp_, ucmp_, 6, kFlushStyleMerge, + kCompactionStyleLevel, nullptr, false) {} ~VersionStorageInfoTest() { for (int i = 0; i < vstorage_.num_levels(); i++) { diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index e7436c772d4..e49b738d6c3 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -56,6 +56,15 @@ enum CompactionPri : char { kMinOverlappingRatio = 0x3, }; +// Indicate the way when flushing memtables. +enum FlushStyle : unsigned char { + // Current style. + kFlushStyleMerge = 0x0, + // Recursively delete duplicated/invalid key/value pairs compared to later + // memtables when flushing. + kFlushStyleDedup = 0x1, +}; + struct CompactionOptionsFIFO { // once the total sum of table files reaches this, we will delete the oldest // table file @@ -472,6 +481,8 @@ struct AdvancedColumnFamilyOptions { // Default: 256GB uint64_t hard_pending_compaction_bytes_limit = 256 * 1073741824ull; + FlushStyle flush_style = kFlushStyleMerge; + // The compaction style. Default: kCompactionStyleLevel CompactionStyle compaction_style = kCompactionStyleLevel; diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index d5b739ccc1e..abe8bb1b4b4 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -798,6 +798,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_write_buffer_number( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_flush_style(rocksdb_options_t*, int); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_background_compactions( diff --git a/options/cf_options.cc b/options/cf_options.cc index 227c662e423..14687969d0b 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -25,7 +25,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, const ColumnFamilyOptions& cf_options) - : compaction_style(cf_options.compaction_style), + : flush_style(cf_options.flush_style), + compaction_style(cf_options.compaction_style), compaction_pri(cf_options.compaction_pri), prefix_extractor(cf_options.prefix_extractor.get()), user_comparator(cf_options.comparator), diff --git a/options/cf_options.h b/options/cf_options.h index 40db7c0eafc..6ceaa119dc5 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -26,6 +26,8 @@ struct ImmutableCFOptions { ImmutableCFOptions(const ImmutableDBOptions& db_options, const ColumnFamilyOptions& cf_options); + FlushStyle flush_style; + CompactionStyle compaction_style; CompactionPri compaction_pri; diff --git a/options/options.cc b/options/options.cc index 03591b61eb9..994f4953fff 100644 --- a/options/options.cc +++ b/options/options.cc @@ -72,6 +72,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) options.soft_pending_compaction_bytes_limit), hard_pending_compaction_bytes_limit( options.hard_pending_compaction_bytes_limit), + flush_style(options.flush_style), compaction_style(options.compaction_style), compaction_pri(options.compaction_pri), compaction_options_universal(options.compaction_options_universal), @@ -214,6 +215,19 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.disable_auto_compactions: %d", disable_auto_compactions); + const auto& it_flush_style = + flush_style_to_string.find(flush_style); + std::string str_flush_style; + if (it_flush_style == flush_style_to_string.end()) { + assert(false); + str_flush_style = "unknown_" + std::to_string(flush_style); + } else { + str_flush_style = it_flush_style->second; + } + ROCKS_LOG_HEADER(log, + " Options.flush_style: %s", + str_flush_style.c_str()); + const auto& it_compaction_style = compaction_style_to_string.find(compaction_style); std::string str_compaction_style; @@ -473,6 +487,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeLevelStyleCompaction( // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast max_bytes_for_level_base = memtable_memory_budget; + flush_style = kFlushStyleMerge; // level style compaction compaction_style = kCompactionStyleLevel; @@ -496,6 +511,8 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeUniversalStyleCompaction( // this means we'll use 50% extra memory in the worst case, but will reduce // write stalls. max_write_buffer_number = 6; + + flush_style = kFlushStyleMerge; // universal style compaction compaction_style = kCompactionStyleUniversal; compaction_options_universal.compression_size_percent = 80; diff --git a/options/options_helper.cc b/options/options_helper.cc index 7e4feff4a91..5be68482190 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -192,6 +192,11 @@ std::map {kCompactionStyleFIFO, "kCompactionStyleFIFO"}, {kCompactionStyleNone, "kCompactionStyleNone"}}; +std::map + OptionsHelper::flush_style_to_string = { + {kFlushStyleMerge, "kFlushStyleMerge"}, + {kFlushStyleDedup, "kFlushStyleDedup"}}; + std::map OptionsHelper::compaction_pri_to_string = { {kByCompensatedSize, "kByCompensatedSize"}, {kOldestLargestSeqFirst, "kOldestLargestSeqFirst"}, @@ -453,6 +458,10 @@ bool ParseOptionHelper(char* opt_address, const OptionType& opt_type, case OptionType::kDouble: *reinterpret_cast(opt_address) = ParseDouble(value); break; + case OptionType::kFlushStyle: + return ParseEnum( + flush_style_string_map, value, + reinterpret_cast(opt_address)); case OptionType::kCompactionStyle: return ParseEnum( compaction_style_string_map, value, @@ -566,6 +575,10 @@ bool SerializeSingleOptionHelper(const char* opt_address, *value = EscapeOptionString( *(reinterpret_cast(opt_address))); break; + case OptionType::kFlushStyle: + return SerializeEnum( + flush_style_string_map, + *(reinterpret_cast(opt_address)), value); case OptionType::kCompactionStyle: return SerializeEnum( compaction_style_string_map, @@ -1506,6 +1519,11 @@ std::unordered_map {"kCompactionStyleFIFO", kCompactionStyleFIFO}, {"kCompactionStyleNone", kCompactionStyleNone}}; +std::unordered_map + OptionsHelper::flush_style_string_map = { + {"kFlushStyleMerge", kFlushStyleMerge}, + {"kFlushStyleDedup", kFlushStyleDedup}}; + std::unordered_map OptionsHelper::compaction_pri_string_map = { {"kByCompensatedSize", kByCompensatedSize}, @@ -1795,6 +1813,10 @@ std::unordered_map {offset_of(&ColumnFamilyOptions::compaction_style), OptionType::kCompactionStyle, OptionVerificationType::kNormal, false, 0}}, + {"flush_style", + {offset_of(&ColumnFamilyOptions::flush_style), + OptionType::kFlushStyle, OptionVerificationType::kNormal, false, + 0}}, {"compaction_pri", {offset_of(&ColumnFamilyOptions::compaction_pri), OptionType::kCompactionPri, OptionVerificationType::kNormal, false, diff --git a/options/options_helper.h b/options/options_helper.h index 2e46877db3c..d712fc27a6c 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -55,6 +55,7 @@ enum class OptionType { kString, kDouble, kCompactionStyle, + kFlushStyle, kCompactionPri, kSliceTransform, kCompressionType, @@ -140,6 +141,7 @@ extern bool ParseOptionHelper(char* opt_address, const OptionType& opt_type, struct OptionsHelper { static std::map compaction_style_to_string; + static std::map flush_style_to_string; static std::map compaction_pri_to_string; static std::map compaction_stop_style_to_string; @@ -162,6 +164,8 @@ struct OptionsHelper { static std::unordered_map encoding_type_string_map; static std::unordered_map compaction_style_string_map; + static std::unordered_map + flush_style_string_map; static std::unordered_map compaction_pri_string_map; static std::unordered_map @@ -180,6 +184,8 @@ struct OptionsHelper { // Some aliasing static auto& compaction_style_to_string = OptionsHelper::compaction_style_to_string; +static auto& flush_style_to_string = + OptionsHelper::flush_style_to_string; static auto& compaction_pri_to_string = OptionsHelper::compaction_pri_to_string; static auto& compaction_stop_style_to_string = OptionsHelper::compaction_stop_style_to_string; @@ -202,6 +208,8 @@ static auto& block_base_table_index_type_string_map = static auto& encoding_type_string_map = OptionsHelper::encoding_type_string_map; static auto& compaction_style_string_map = OptionsHelper::compaction_style_string_map; +static auto& flush_style_string_map = + OptionsHelper::flush_style_string_map; static auto& compaction_pri_string_map = OptionsHelper::compaction_pri_string_map; static auto& wal_recovery_mode_string_map = diff --git a/options/options_parser.cc b/options/options_parser.cc index 040e51c9291..d96fd282cc7 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -557,6 +557,9 @@ bool AreEqualOptions( case OptionType::kCompactionStyle: return (*reinterpret_cast(offset1) == *reinterpret_cast(offset2)); + case OptionType::kFlushStyle: + return (*reinterpret_cast(offset1) == + *reinterpret_cast(offset2)); case OptionType::kCompactionPri: return (*reinterpret_cast(offset1) == *reinterpret_cast(offset2)); diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index d64473c3a38..029431261b3 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -431,6 +431,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "optimize_filters_for_hits=false;" "level_compaction_dynamic_level_bytes=false;" "inplace_update_support=false;" + "flush_style=kFlushStyleMerge;" "compaction_style=kCompactionStyleFIFO;" "compaction_pri=kMinOverlappingRatio;" "hard_pending_compaction_bytes_limit=0;" diff --git a/options/options_test.cc b/options/options_test.cc index edbfd3cfb71..8a7ac0d9e5a 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -80,6 +80,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"arena_block_size", "22"}, {"disable_auto_compactions", "true"}, {"compaction_style", "kCompactionStyleLevel"}, + {"flush_style", "kFlushStyleMerge"}, {"compaction_pri", "kOldestSmallestSeqFirst"}, {"verify_checksums_in_compaction", "false"}, {"compaction_options_fifo", "23"}, @@ -178,6 +179,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.arena_block_size, 22U); ASSERT_EQ(new_cf_opt.disable_auto_compactions, true); ASSERT_EQ(new_cf_opt.compaction_style, kCompactionStyleLevel); + ASSERT_EQ(new_cf_opt.flush_style, kFlushStyleMerge); ASSERT_EQ(new_cf_opt.compaction_pri, kOldestSmallestSeqFirst); ASSERT_EQ(new_cf_opt.compaction_options_fifo.max_table_files_size, static_cast(23));