diff --git a/db/builder.cc b/db/builder.cc index 0c0bbb236b6..713f54b55c7 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -76,13 +76,15 @@ Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level) { + TableProperties* table_properties, int level, + InternalIterator* compare_iter) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; Status s; + uint64_t num_duplicated = 0, num_total = 0; meta->fd.file_size = 0; iter->SeekToFirst(); std::unique_ptr range_del_agg( @@ -138,17 +140,56 @@ Status BuildTable( &snapshots, earliest_write_conflict_snapshot, env, true /* internal key corruption is not ok */, range_del_agg.get()); c_iter.SeekToFirst(); + + ParsedInternalKey c_ikey, comp_ikey; + if (compare_iter != nullptr) { + compare_iter->SeekToFirst(); //find the first one + } + const rocksdb::Comparator *comp = internal_comparator.user_comparator(); for (; c_iter.Valid(); c_iter.Next()) { const Slice& key = c_iter.key(); const Slice& value = c_iter.value(); - builder->Add(key, value); - meta->UpdateBoundaries(key, c_iter.ikey().sequence); - - // TODO(noetzli): Update stats after flush, too. - if (io_priority == Env::IO_HIGH && - IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { - ThreadStatusUtil::SetThreadOperationProperty( - ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); + bool skip = false; + num_total++; + + // Check whether the key is duplicated in later mems + c_ikey = c_iter.ikey(); + if (compare_iter != nullptr ) { + while (compare_iter->Valid()) { + if (!ParseInternalKey(compare_iter->key(), &comp_ikey)) { + compare_iter->Next(); + continue; + } + int result = comp->Compare(c_ikey.user_key, comp_ikey.user_key); + if (result == 0) { + // No delete for merge options. + if (c_ikey.type != kTypeMerge && comp_ikey.type != kTypeMerge) { + skip = true; + } + break; + } + else if (result > 0) { + compare_iter->Next(); + } + else { + break; + } + } + } + if (skip) { + num_duplicated++; + continue; + } + else { + builder->Add(key, value); + meta->UpdateBoundaries(key, c_iter.ikey().sequence); + + // TODO(noetzli): Update stats after flush, too. + if (io_priority == Env::IO_HIGH && + IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); + } } } // nullptr for table_{min,max} so all range tombstones will be flushed @@ -218,9 +259,10 @@ Status BuildTable( } // Output to event logger and fire events. + bool show_num = (compare_iter != nullptr)?true:false; EventHelpers::LogAndNotifyTableFileCreationFinished( event_logger, ioptions.listeners, dbname, column_family_name, fname, - job_id, meta->fd, tp, reason, s); + job_id, meta->fd, tp, reason, s, show_num, num_total, num_duplicated); return s; } diff --git a/db/builder.h b/db/builder.h index b438aad8f9e..02e6c1d40bb 100644 --- a/db/builder.h +++ b/db/builder.h @@ -79,6 +79,7 @@ extern Status BuildTable( InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr, int level = -1); + TableProperties* table_properties = nullptr, int level = -1, + InternalIterator* compare_iter = nullptr); } // namespace rocksdb diff --git a/db/c.cc b/db/c.cc index cc359b2c183..27435eb8477 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2263,6 +2263,14 @@ void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t* opt, int n) opt->rep.max_write_buffer_number = n; } +void rocksdb_options_set_flush_style(rocksdb_options_t* opt, int style) { + opt->rep.flush_style = static_cast(style); +} + +void rocksdb_options_set_write_buffer_number_to_flush(rocksdb_options_t* opt, int n) { + opt->rep.write_buffer_number_to_flush = n; +} + void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t* opt, int n) { opt->rep.min_write_buffer_number_to_merge = n; } diff --git a/db/column_family.cc b/db/column_family.cc index 72186b706a0..cfb3700f206 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -165,6 +165,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, result.min_write_buffer_number_to_merge = 1; } + if (result.flush_style == kFlushStyleDedup) { + if (result.write_buffer_number_to_flush < 1 || + (result.write_buffer_number_to_flush > + result.min_write_buffer_number_to_merge)) { + result.write_buffer_number_to_flush = 1; + } + } + if (result.num_levels < 1) { result.num_levels = 1; } @@ -181,6 +189,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, if (result.max_write_buffer_number < 2) { result.max_write_buffer_number = 2; } + if (result.max_write_buffer_number_to_maintain < 0) { result.max_write_buffer_number_to_maintain = result.max_write_buffer_number; } @@ -359,7 +368,9 @@ ColumnFamilyData::ColumnFamilyData( write_buffer_manager_(write_buffer_manager), mem_(nullptr), imm_(ioptions_.min_write_buffer_number_to_merge, - ioptions_.max_write_buffer_number_to_maintain), + ioptions_.max_write_buffer_number_to_maintain, + ioptions_.write_buffer_number_to_flush), + stop(false), super_version_(nullptr), super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), diff --git a/db/column_family.h b/db/column_family.h index a812777a933..c1d66f51a46 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -236,6 +236,9 @@ class ColumnFamilyData { MemTable* mem() { return mem_; } Version* current() { return current_; } Version* dummy_versions() { return dummy_versions_; } + void set_stop() { stop = true; } + bool is_stop() { return stop; } + bool is_flush_recursive_dedup() { return (ioptions_.flush_style == kFlushStyleDedup); } void SetCurrent(Version* _current); uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held @@ -371,6 +374,7 @@ class ColumnFamilyData { MemTable* mem_; MemTableList imm_; + bool stop; SuperVersion* super_version_; // An ordinal representing the current SuperVersion. Updated by diff --git a/db/column_family_test.cc b/db/column_family_test.cc index fbc05e3475f..c1d1d98d512 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -1027,6 +1027,7 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) { default_cf.arena_block_size = 4 * 4096; default_cf.max_write_buffer_number = 10; default_cf.min_write_buffer_number_to_merge = 1; + default_cf.write_buffer_number_to_flush = 0; default_cf.max_write_buffer_number_to_maintain = 0; one.write_buffer_size = 200000; one.arena_block_size = 4 * 4096; diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index 5fb3db0a9e8..e1b6ddae2d3 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -70,7 +70,7 @@ class CompactionPickerTest : public testing::Test { DeleteVersionStorage(); options_.num_levels = num_levels; vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels, - style, nullptr, false)); + kFlushStyleMerge, style, nullptr, false)); vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 0f412a754bc..c7b91e5f568 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -226,6 +226,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) { cfd->Ref(); + cfd->set_stop(); mutex_.Unlock(); FlushMemTable(cfd, FlushOptions()); mutex_.Lock(); diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 7ad97893430..76e67043ec1 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -45,7 +45,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( const std::string& db_name, const std::string& cf_name, const std::string& file_path, int job_id, const FileDescriptor& fd, const TableProperties& table_properties, TableFileCreationReason reason, - const Status& s) { + const Status& s, bool show_num, int total_num, int dup_num) { if (s.ok() && event_logger) { JSONWriter jwriter; AppendCurrentTime(&jwriter); @@ -73,6 +73,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( << "num_data_blocks" << table_properties.num_data_blocks << "num_entries" << table_properties.num_entries << "filter_policy_name" << table_properties.filter_policy_name; + if (show_num) { + jwriter << "total_paris" << total_num << "duplicated_pairs" << dup_num; + } // user collected properties for (const auto& prop : table_properties.readable_properties) { diff --git a/db/event_helpers.h b/db/event_helpers.h index 20515be7686..12f3000f7cc 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -33,7 +33,7 @@ class EventHelpers { const std::string& db_name, const std::string& cf_name, const std::string& file_path, int job_id, const FileDescriptor& fd, const TableProperties& table_properties, TableFileCreationReason reason, - const Status& s); + const Status& s, bool show_num = false, int total_num = 0, int duplicated_num = 0); static void LogAndNotifyTableFileDeletion( EventLogger* event_logger, int job_id, uint64_t file_number, const std::string& file_path, diff --git a/db/flush_job.cc b/db/flush_job.cc index 5c4645eeb42..bcba3140718 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -129,11 +129,16 @@ void FlushJob::PickMemTable() { assert(!pick_memtable_called); pick_memtable_called = true; // Save the contents of the earliest memtable as a new Table - cfd_->imm()->PickMemtablesToFlush(&mems_); + + if (cfd_->is_flush_recursive_dedup() && !cfd_->is_stop()) { + cfd_->imm()->PickMemtablesToFlush(&mems_, &compare_mems_); + } + else { + cfd_->imm()->PickMemtablesToFlush(&mems_); + } if (mems_.empty()) { return; } - ReportFlushInputSize(mems_); // entries mems are (implicitly) sorted in ascending order by their created @@ -257,6 +262,7 @@ Status FlushJob::WriteLevel0Table() { ReadOptions ro; ro.total_order_seek = true; Arena arena; + Arena arena1; uint64_t total_num_entries = 0, total_num_deletes = 0; size_t total_memory_usage = 0; for (MemTable* m : mems_) { @@ -274,17 +280,36 @@ Status FlushJob::WriteLevel0Table() { total_memory_usage += m->ApproximateMemoryUsage(); } + std::vector compare_memtables; + int comp_entries = 0; + for (MemTable *m: compare_mems_) { + compare_memtables.push_back(m->NewIterator(ro, &arena1)); + comp_entries += m->num_entries(); + } + event_logger_->Log() << "job" << job_context_->job_id << "event" << "flush_started" << "num_memtables" << mems_.size() << "num_entries" << total_num_entries << "num_deletes" << total_num_deletes << "memory_usage" - << total_memory_usage; + << total_memory_usage << "mems table count " + << mems_.size() << " compare mems count " + << compare_mems_.size() << " compare mems total " + << comp_entries; { ScopedArenaIterator iter( NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], static_cast(memtables.size()), &arena)); + InternalIterator *compare_iter = nullptr; + ScopedArenaIterator compare_scope_iter; + if (compare_mems_.size() > 0) { + compare_scope_iter.set( + NewMergingIterator(&cfd_->internal_comparator(), &compare_memtables[0], + static_cast(compare_memtables.size()), &arena1)); + compare_iter = compare_scope_iter.get(); + } + std::unique_ptr range_del_iter(NewMergingIterator( &cfd_->internal_comparator(), range_del_iters.empty() ? nullptr : &range_del_iters[0], @@ -308,7 +333,7 @@ Status FlushJob::WriteLevel0Table() { cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */); + Env::IO_HIGH, &table_properties_, 0, compare_iter); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/flush_job.h b/db/flush_job.h index b06654d7731..1a7c77f5b74 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -104,6 +104,7 @@ class FlushJob { // Variables below are set by PickMemTable(): FileMetaData meta_; autovector mems_; + autovector compare_mems_; VersionEdit* edit_; Version* base_; bool pick_memtable_called; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 90a619a218b..cba9437eac1 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -279,6 +279,33 @@ bool MemTableList::IsFlushPending() const { return false; } +// Returns the memtables that need to be flushed. +void MemTableList::PickMemtablesToFlush(autovector* ret, + autovector* compare_ret) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); + assert(write_buffer_number_to_flush_ > 0); + const auto& memlist = current_->memlist_; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + if (!m->flush_in_progress_) { + assert(!m->flush_completed_); + if (ret->size() < write_buffer_number_to_flush_) { + num_flush_not_started_--; + if (num_flush_not_started_ == 0) { + imm_flush_needed.store(false, std::memory_order_release); + } + m->flush_in_progress_ = true; // flushing will start very soon + ret->push_back(m); + } + else { + compare_ret->push_back(m); + } + } + } + flush_requested_ = false; // start-flush request is complete +} + // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(autovector* ret) { AutoThreadOperationStageUpdater stage_updater( diff --git a/db/memtable_list.h b/db/memtable_list.h index bf20ff1f6a9..74b2f291523 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -158,9 +158,11 @@ class MemTableList { public: // A list of memtables. explicit MemTableList(int min_write_buffer_number_to_merge, - int max_write_buffer_number_to_maintain) + int max_write_buffer_number_to_maintain, + int write_buffer_number_to_flush = 1) : imm_flush_needed(false), min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), + write_buffer_number_to_flush_(write_buffer_number_to_flush), current_(new MemTableListVersion(¤t_memory_usage_, max_write_buffer_number_to_maintain)), num_flush_not_started_(0), @@ -194,6 +196,7 @@ class MemTableList { // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. + void PickMemtablesToFlush(autovector* mems, autovector* compare_mems); void PickMemtablesToFlush(autovector* mems); // Reset status of the given memtable list back to pending state so that @@ -242,6 +245,8 @@ class MemTableList { const int min_write_buffer_number_to_merge_; + const unsigned int write_buffer_number_to_flush_; + MemTableListVersion* current_; // the number of elements that still need flushing diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 28ea3fd957d..9bf82594efd 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -32,7 +32,8 @@ class VersionBuilderTest : public testing::Test { icmp_(ucmp_), ioptions_(options_), mutable_cf_options_(options_), - vstorage_(&icmp_, ucmp_, options_.num_levels, kCompactionStyleLevel, + vstorage_(&icmp_, ucmp_, options_.num_levels, + kFlushStyleMerge, kCompactionStyleLevel, nullptr, false), file_num_(1) { mutable_cf_options_.RefreshDerivedOptions(ioptions_); @@ -125,7 +126,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, + nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -160,7 +162,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, + nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -200,7 +203,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, + nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -231,7 +235,7 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) { VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, nullptr, false); version_builder.Apply(&version_edit); version_builder.SaveTo(&new_vstorage); @@ -246,7 +250,7 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) { EnvOptions env_options; VersionBuilder version_builder(env_options, nullptr, &vstorage_); VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, nullptr, false); + kFlushStyleMerge, kCompactionStyleLevel, nullptr, false); VersionEdit version_edit; version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"), diff --git a/db/version_set.cc b/db/version_set.cc index ae284c0850f..74746993703 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -884,7 +884,9 @@ void Version::AddRangeDelIteratorsForLevel( VersionStorageInfo::VersionStorageInfo( const InternalKeyComparator* internal_comparator, const Comparator* user_comparator, int levels, - CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage, + FlushStyle flush_style, + CompactionStyle compaction_style, + VersionStorageInfo* ref_vstorage, bool _force_consistency_checks) : internal_comparator_(internal_comparator), user_comparator_(user_comparator), @@ -892,6 +894,7 @@ VersionStorageInfo::VersionStorageInfo( num_levels_(levels), num_non_empty_levels_(0), file_indexer_(user_comparator), + flush_style_(flush_style), compaction_style_(compaction_style), files_(new std::vector[num_levels_]), base_level_(num_levels_ == 1 ? -1 : 1), @@ -939,6 +942,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(), (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(), cfd_ == nullptr ? 0 : cfd_->NumberLevels(), + cfd_ == nullptr ? kFlushStyleMerge + : cfd_->ioptions()->flush_style, cfd_ == nullptr ? kCompactionStyleLevel : cfd_->ioptions()->compaction_style, (cfd_ == nullptr || cfd_->current() == nullptr) diff --git a/db/version_set.h b/db/version_set.h index 7d94d692c1c..cc48254ebed 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -95,6 +95,7 @@ class VersionStorageInfo { public: VersionStorageInfo(const InternalKeyComparator* internal_comparator, const Comparator* user_comparator, int num_levels, + FlushStyle flush_style, CompactionStyle compaction_style, VersionStorageInfo* src_vstorage, bool _force_consistency_checks); @@ -371,6 +372,7 @@ class VersionStorageInfo { FileIndexer file_indexer_; Arena arena_; // Used to allocate space for file_levels_ + FlushStyle flush_style_; CompactionStyle compaction_style_; // List of files per level, files in each level are arranged diff --git a/db/version_set_test.cc b/db/version_set_test.cc index c8c8541f702..fee14ec1ab1 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -112,7 +112,8 @@ class VersionStorageInfoTest : public testing::Test { options_(GetOptionsWithNumLevels(6, logger_)), ioptions_(options_), mutable_cf_options_(options_), - vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {} + vstorage_(&icmp_, ucmp_, 6, kFlushStyleMerge, + kCompactionStyleLevel, nullptr, false) {} ~VersionStorageInfoTest() { for (int i = 0; i < vstorage_.num_levels(); i++) { diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index c7ca9270b17..1b97ad5ca5d 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -56,6 +56,15 @@ enum CompactionPri : char { kMinOverlappingRatio = 0x3, }; +// Indicate the way when flushing memtables. +enum FlushStyle : unsigned char { + // Current style. + kFlushStyleMerge = 0x0, + // Recursively delete duplicated/invalid key/value pairs compared to later + // memtables when flushing. + kFlushStyleDedup = 0x1, +}; + struct CompactionOptionsFIFO { // once the total sum of table files reaches this, we will delete the oldest // table file @@ -131,6 +140,11 @@ struct AdvancedColumnFamilyOptions { // individual write buffers. Default: 1 int min_write_buffer_number_to_merge = 1; + // When flush style is dedup, every time when it begins to flush, it flushes + // write_buffer_number_to_flush memtables to l0 while deduping with + // (min_write_buffer_number_to_merge - write_buffer_number_to_flush) later + // memtables. + int write_buffer_number_to_flush = 1; // The total maximum number of write buffers to maintain in memory including // copies of buffers that have already been flushed. Unlike // max_write_buffer_number, this parameter does not affect flushing. @@ -442,6 +456,8 @@ struct AdvancedColumnFamilyOptions { // Default: 256GB uint64_t hard_pending_compaction_bytes_limit = 256 * 1073741824ull; + FlushStyle flush_style = kFlushStyleMerge; + // The compaction style. Default: kCompactionStyleLevel CompactionStyle compaction_style = kCompactionStyleLevel; diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index bee36c47d4e..2faa6aa08bd 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -766,8 +766,12 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_options_statistics_get_string( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_write_buffer_number( rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_write_buffer_number_to_flush(rocksdb_options_t*, int); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_flush_style(rocksdb_options_t*, int); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_background_compactions( diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 460828996f3..efe8fe759f2 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -173,8 +173,8 @@ std::string StatisticsImpl::ToString() const { getHistogramImplLocked(h.first)->Data(&hData); snprintf( buffer, kTmpStrBufferSize, - "%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n", - h.second.c_str(), hData.median, hData.percentile95, + "%s statistics Percentiles :=> average: %f 50 : %f 95 : %f 99 : %f 100 : %f\n", + h.second.c_str(), hData.average, hData.median, hData.percentile95, hData.percentile99, hData.max); res.append(buffer); } diff --git a/options/cf_options.cc b/options/cf_options.cc index 79e60abb51a..887f83c06a2 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -27,7 +27,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, const ColumnFamilyOptions& cf_options) - : compaction_style(cf_options.compaction_style), + : flush_style(cf_options.flush_style), + compaction_style(cf_options.compaction_style), compaction_pri(cf_options.compaction_pri), compaction_options_universal(cf_options.compaction_options_universal), compaction_options_fifo(cf_options.compaction_options_fifo), @@ -39,6 +40,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, compaction_filter_factory(cf_options.compaction_filter_factory.get()), min_write_buffer_number_to_merge( cf_options.min_write_buffer_number_to_merge), + write_buffer_number_to_flush( + cf_options.write_buffer_number_to_flush), max_write_buffer_number_to_maintain( cf_options.max_write_buffer_number_to_maintain), inplace_update_support(cf_options.inplace_update_support), diff --git a/options/cf_options.h b/options/cf_options.h index 397ee5d6f7e..010c7b039b2 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -26,6 +26,8 @@ struct ImmutableCFOptions { ImmutableCFOptions(const ImmutableDBOptions& db_options, const ColumnFamilyOptions& cf_options); + FlushStyle flush_style; + CompactionStyle compaction_style; CompactionPri compaction_pri; @@ -46,6 +48,7 @@ struct ImmutableCFOptions { int min_write_buffer_number_to_merge; + int write_buffer_number_to_flush; int max_write_buffer_number_to_maintain; bool inplace_update_support; diff --git a/options/options.cc b/options/options.cc index 4aaedefda73..7cf5c14a27e 100644 --- a/options/options.cc +++ b/options/options.cc @@ -46,6 +46,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) : max_write_buffer_number(options.max_write_buffer_number), min_write_buffer_number_to_merge( options.min_write_buffer_number_to_merge), + write_buffer_number_to_flush( + options.write_buffer_number_to_flush), max_write_buffer_number_to_maintain( options.max_write_buffer_number_to_maintain), inplace_update_support(options.inplace_update_support), @@ -74,6 +76,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) options.soft_pending_compaction_bytes_limit), hard_pending_compaction_bytes_limit( options.hard_pending_compaction_bytes_limit), + flush_style(options.flush_style), compaction_style(options.compaction_style), compaction_pri(options.compaction_pri), compaction_options_universal(options.compaction_options_universal), @@ -247,6 +250,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.num_levels: %d", num_levels); ROCKS_LOG_HEADER(log, " Options.min_write_buffer_number_to_merge: %d", min_write_buffer_number_to_merge); + ROCKS_LOG_HEADER(log, " Options.write_buffer_number_to_flush: %d", + write_buffer_number_to_flush); ROCKS_LOG_HEADER(log, " Options.max_write_buffer_number_to_maintain: %d", max_write_buffer_number_to_maintain); ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d", @@ -305,6 +310,19 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.disable_auto_compactions: %d", disable_auto_compactions); + const auto& it_flush_style = + flush_style_to_string.find(flush_style); + std::string str_flush_style; + if (it_flush_style == flush_style_to_string.end()) { + assert(false); + str_flush_style = "unknown_" + std::to_string(flush_style); + } else { + str_flush_style = it_flush_style->second; + } + ROCKS_LOG_HEADER(log, + " Options.flush_style: %s", + str_flush_style.c_str()); + const auto& it_compaction_style = compaction_style_to_string.find(compaction_style); std::string str_compaction_style; @@ -447,6 +465,7 @@ Options::PrepareForBulkLoad() // of flushes. max_write_buffer_number = 6; min_write_buffer_number_to_merge = 1; + write_buffer_number_to_flush = 0; // When compaction is disabled, more parallel flush threads can // help with write throughput. @@ -550,6 +569,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeLevelStyleCompaction( write_buffer_size = static_cast(memtable_memory_budget / 4); // merge two memtables when flushing to L0 min_write_buffer_number_to_merge = 2; + write_buffer_number_to_flush = 0; // this means we'll use 50% extra memory in the worst case, but will reduce // write stalls. max_write_buffer_number = 6; @@ -562,6 +582,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeLevelStyleCompaction( // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast max_bytes_for_level_base = memtable_memory_budget; + flush_style = kFlushStyleMerge; // level style compaction compaction_style = kCompactionStyleLevel; @@ -582,9 +603,12 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeUniversalStyleCompaction( write_buffer_size = static_cast(memtable_memory_budget / 4); // merge two memtables when flushing to L0 min_write_buffer_number_to_merge = 2; + write_buffer_number_to_flush = 1; // this means we'll use 50% extra memory in the worst case, but will reduce // write stalls. max_write_buffer_number = 6; + + flush_style = kFlushStyleMerge; // universal style compaction compaction_style = kCompactionStyleUniversal; compaction_options_universal.compression_size_percent = 80; diff --git a/options/options_helper.cc b/options/options_helper.cc index d799a2c6a69..6854188073d 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -336,6 +336,10 @@ bool ParseOptionHelper(char* opt_address, const OptionType& opt_type, case OptionType::kDouble: *reinterpret_cast(opt_address) = ParseDouble(value); break; + case OptionType::kFlushStyle: + return ParseEnum( + flush_style_string_map, value, + reinterpret_cast(opt_address)); case OptionType::kCompactionStyle: return ParseEnum( compaction_style_string_map, value, @@ -429,6 +433,10 @@ bool SerializeSingleOptionHelper(const char* opt_address, *value = EscapeOptionString( *(reinterpret_cast(opt_address))); break; + case OptionType::kFlushStyle: + return SerializeEnum( + flush_style_string_map, + *(reinterpret_cast(opt_address)), value); case OptionType::kCompactionStyle: return SerializeEnum( compaction_style_string_map, diff --git a/options/options_helper.h b/options/options_helper.h index 747efc30060..1580acbde8a 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -31,6 +31,10 @@ static std::map compaction_style_to_string = { {kCompactionStyleFIFO, "kCompactionStyleFIFO"}, {kCompactionStyleNone, "kCompactionStyleNone"}}; +static std::map flush_style_to_string = { + {kFlushStyleMerge, "kFlushStyleMerge"}, + {kFlushStyleDedup, "kFlushStyleDedup"}}; + static std::map compaction_pri_to_string = { {kByCompensatedSize, "kByCompensatedSize"}, {kOldestLargestSeqFirst, "kOldestLargestSeqFirst"}, @@ -72,6 +76,7 @@ enum class OptionType { kSizeT, kString, kDouble, + kFlushStyle, kCompactionStyle, kCompactionPri, kSliceTransform, @@ -453,6 +458,9 @@ static std::unordered_map cf_options_type_info = { {"min_write_buffer_number_to_merge", {offset_of(&ColumnFamilyOptions::min_write_buffer_number_to_merge), OptionType::kInt, OptionVerificationType::kNormal, false, 0}}, + {"write_buffer_number_to_flush", + {offset_of(&ColumnFamilyOptions::write_buffer_number_to_flush), + OptionType::kInt, OptionVerificationType::kNormal, false, 0}}, {"num_levels", {offset_of(&ColumnFamilyOptions::num_levels), OptionType::kInt, OptionVerificationType::kNormal, false, 0}}, @@ -565,6 +573,9 @@ static std::unordered_map cf_options_type_info = { {"compaction_style", {offset_of(&ColumnFamilyOptions::compaction_style), OptionType::kCompactionStyle, OptionVerificationType::kNormal, false, 0}}, + {"flush_style", + {offset_of(&ColumnFamilyOptions::flush_style), + OptionType::kFlushStyle, OptionVerificationType::kNormal, false, 0}}, {"compaction_pri", {offset_of(&ColumnFamilyOptions::compaction_pri), OptionType::kCompactionPri, OptionVerificationType::kNormal, false, 0}}}; @@ -705,6 +716,11 @@ static std::unordered_map {"kCompactionStyleFIFO", kCompactionStyleFIFO}, {"kCompactionStyleNone", kCompactionStyleNone}}; +static std::unordered_map + flush_style_string_map = { + {"kFlushStyleMerge", kFlushStyleMerge}, + {"kFlushStyleDedup", kFlushStyleDedup}}; + static std::unordered_map compaction_pri_string_map = { {"kByCompensatedSize", kByCompensatedSize}, diff --git a/options/options_parser.cc b/options/options_parser.cc index 09f3e1cbfa2..348faf09d46 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -547,6 +547,9 @@ bool AreEqualOptions( case OptionType::kDouble: return AreEqualDoubles(*reinterpret_cast(offset1), *reinterpret_cast(offset2)); + case OptionType::kFlushStyle: + return (*reinterpret_cast(offset1) == + *reinterpret_cast(offset2)); case OptionType::kCompactionStyle: return (*reinterpret_cast(offset1) == *reinterpret_cast(offset2));