From 5a53078b85ba7a0f0dff059b89d7a67bca3e8cca Mon Sep 17 00:00:00 2001
From: Udi
Date: Thu, 29 Sep 2022 13:28:30 +0300
Subject: [PATCH] Proactive Flushes (#155)

---
 db/column_family.h                      |  18 +-
 db/db_impl/db_impl.cc                   |   7 +
 db/db_impl/db_impl.h                    |   9 +
 db/db_impl/db_impl_compaction_flush.cc  | 184 +++++-
 db/db_impl/db_impl_open.cc              |  17 +
 db/db_impl/db_impl_write.cc             |   7 +
 db/db_test2.cc                          |  20 +-
 db/db_write_buffer_manager_test.cc      | 140 +++--
 db/flush_job.cc                         |   2 +
 db_stress_tool/db_stress_common.h       |   3 +
 db_stress_tool/db_stress_gflags.cc      |  13 +
 db_stress_tool/db_stress_test_base.cc   |  12 +-
 db_stress_tool/db_stress_tool.cc        |  16 +-
 include/rocksdb/listener.h              |   1 +
 include/rocksdb/write_buffer_manager.h  | 166 +++++-
 memtable/write_buffer_manager.cc        | 364 +++++++++++-
 memtable/write_buffer_manager_test.cc   | 787 ++++++++++++++++++++++++-
 tools/db_bench_tool.cc                  |  54 +-
 tools/db_crashtest.py                   |   6 +
 19 files changed, 1743 insertions(+), 83 deletions(-)

diff --git a/db/column_family.h b/db/column_family.h
index 5b4f6d630b..8ac4dca332 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -457,7 +457,14 @@ class ColumnFamilyData {
   void ResetThreadLocalSuperVersions();
 
   // Protected by DB mutex
-  void set_queued_for_flush(bool value) { queued_for_flush_ = value; }
+  void set_queued_for_flush(bool value) {
+    queued_for_flush_ = value;
+
+    if (value) {
+      ++num_queued_for_flush_;
+    }
+  }
+
   void set_queued_for_compaction(bool value) { queued_for_compaction_ = value; }
   bool queued_for_flush() { return queued_for_flush_; }
   bool queued_for_compaction() { return queued_for_compaction_; }
@@ -538,6 +545,11 @@ class ColumnFamilyData {
   // Keep track of whether the mempurge feature was ever used.
   void SetMempurgeUsed() { mempurge_used_ = true; }
   bool GetMempurgeUsed() { return mempurge_used_; }
+  uint64_t GetNumQueuedForFlush() const { return num_queued_for_flush_; }
+
+  // TODO - Make it a CF option
+  static constexpr uint64_t kLaggingFlushesThreshold = 10U;
+  void SetNumTimedQueuedForFlush(uint64_t num) { num_queued_for_flush_ = num; }
 
   // Allocate and return a new epoch number
   uint64_t NewEpochNumber() { return next_epoch_number_.fetch_add(1); }
@@ -657,6 +669,10 @@ class ColumnFamilyData {
   std::shared_ptr file_metadata_cache_res_mgr_;
   bool mempurge_used_;
 
+  // Used in the WBM's flush initiation heuristics.
+  // See DBImpl::InitiateMemoryManagerFlushRequest() for more details
+  uint64_t num_queued_for_flush_ = 0U;
+
   std::atomic next_epoch_number_;
 };
 
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 6bf4d350fd..d26027a025 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -544,6 +544,13 @@ Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
 }
 
 Status DBImpl::CloseHelper() {
+  if (is_registered_for_flush_initiation_rqsts_) {
+    assert(write_buffer_manager_);
+    assert(write_buffer_manager_->IsInitiatingFlushes());
+    write_buffer_manager_->DeregisterFlushInitiator(this);
+    is_registered_for_flush_initiation_rqsts_ = false;
+  }
+
   // Guarantee that there is no background error recovery in progress before
   // continuing with the shutdown
   mutex_.Lock();
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 226772bdcd..44f3a5dc79 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -438,6 +438,13 @@ class DBImpl : public DB {
   virtual Status LockWAL() override;
   virtual Status UnlockWAL() override;
 
+  // flush initiated by the write buffer manager to free some space
+  bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);
+  bool InitiateMemoryManagerFlushRequestAtomicFlush(
+      size_t min_size_to_flush, const FlushOptions& flush_options);
+  bool InitiateMemoryManagerFlushRequestNonAtomicFlush(
+      size_t min_size_to_flush, const FlushOptions& flush_options);
+
   virtual SequenceNumber GetLatestSequenceNumber() const override;
 
   // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
@@ -2696,6 +2703,8 @@ class DBImpl : public DB {
   // thread safe, both read and write need db mutex hold.
   SeqnoToTimeMapping seqno_time_mapping_;
 
+  bool is_registered_for_flush_initiation_rqsts_ = false;
+
   // Stop write token that is acquired when first LockWAL() is called.
   // Destroyed when last UnlockWAL() is called. Controlled by DB mutex.
   // See lock_wal_count_
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 9957819b69..95f13ee5cd 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -8,6 +8,7 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include #include +#include #include "db/builder.h" #include "db/db_impl/db_impl.h" @@ -2929,6 +2930,12 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, bg_job_limits.max_compactions, bg_flush_scheduled_, bg_compaction_scheduled_); } + *reason = bg_flush_args[0].flush_reason_; + if (write_buffer_manager_) { + write_buffer_manager_->FlushStarted( + *reason == FlushReason::kWriteBufferManagerInitiated); + } + status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer, thread_pri); TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush"); @@ -2939,7 +2946,6 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_); } #endif /* !NDEBUG */ - *reason = bg_flush_args[0].flush_reason_; for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; if (cfd->UnrefAndTryDelete()) { @@ -3024,6 +3030,10 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { assert(num_running_flushes_ > 0); num_running_flushes_--; bg_flush_scheduled_--; + if (write_buffer_manager_) { + write_buffer_manager_->FlushEnded( + reason == FlushReason::kWriteBufferManagerInitiated); + } // See if there's more work to be done MaybeScheduleFlushOrCompaction(); atomic_flush_install_cv_.SignalAll(); @@ -3937,4 +3947,176 @@ Status DBImpl::WaitForCompact(bool wait_unscheduled) { return error_handler_.GetBGError(); } +bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) { + if (shutdown_initiated_) { + return false; + } + + FlushOptions flush_options; + flush_options.allow_write_stall = true; + flush_options.wait = false; + + if (immutable_db_options_.atomic_flush) { + return InitiateMemoryManagerFlushRequestAtomicFlush(min_size_to_flush, + flush_options); + } else { + return InitiateMemoryManagerFlushRequestNonAtomicFlush(min_size_to_flush, + flush_options); + } +} + +bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush( + size_t min_size_to_flush, const FlushOptions& flush_options) { + assert(immutable_db_options_.atomic_flush); + + autovector cfds; + { + InstrumentedMutexLock lock(&mutex_); + + SelectColumnFamiliesForAtomicFlush(&cfds); + if (cfds.empty()) { + return false; + } + + // min_size_to_flush may be 0. + // Since proactive flushes are active only once recovery is complete => + // SelectColumnFamiliesForAtomicFlush() will keep cf-s in cfds collection + // only if they have a non-empty mutable memtable or any immutable memtable + // => skip the checks and just flush the selected cf-s. 
+ if (min_size_to_flush > 0) { + size_t total_size_to_flush = 0U; + for (const auto& cfd : cfds) { + // Once at least one CF has immutable memtables, we will flush + if (cfd->imm()->NumNotFlushed() > 0) { + // Guarantee a atomic flush will occur + total_size_to_flush = min_size_to_flush; + break; + } else if (cfd->mem()->IsEmpty() == false) { + total_size_to_flush += cfd->mem()->ApproximateMemoryUsage(); + } + } + if (total_size_to_flush < min_size_to_flush) { + return false; + } + } + } + + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "write buffer manager initiated Atomic flush started current " + "usage %lu out of %lu", + cfds.front()->write_buffer_mgr()->memory_usage(), + cfds.front()->write_buffer_mgr()->buffer_size()); + + TEST_SYNC_POINT( + "DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush::BeforeFlush"); + auto s = AtomicFlushMemTables( + flush_options, FlushReason::kWriteBufferManagerInitiated, cfds); + + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "write buffer manager initiated Atomic flush finished, status: %s", + s.ToString().c_str()); + return s.ok(); +} + +bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush( + size_t min_size_to_flush, const FlushOptions& flush_options) { + assert(immutable_db_options_.atomic_flush == false); + + // Pick the "oldest" CF that meets one of the following: + // 1. Has at least one IMMUTABLE memtable (=> already has a memtable that + // should be flushed); Or + // 2. Has a MUTABLE memtable > min size to flush + // + // However, care must be taken to avoid starving a CF which has data to flush + // (=> and associated WAL) but, to which there is not much writing. So, in + // case we find such a CF that is lagging enough in the number of flushes it + // has undergone, relative to the cf picked originally, we will pick it + // instead, regardless of its mutable memtable size. + + // The CF picked based on min min_size_to_flush + ColumnFamilyData* orig_cfd_to_flush = nullptr; + // The cf to actually flush (possibly == orig_cfd_to_flush) + ColumnFamilyData* cfd_to_flush = nullptr; + SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; + + { + InstrumentedMutexLock lock(&mutex_); + + // First pick the oldest CF with data to flush that meets + // the min_size_to_flush condition + for (auto* cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if ((cfd->imm()->NumNotFlushed() != 0) || + ((cfd->mem()->IsEmpty() == false) && + (cfd->mem()->ApproximateMemoryUsage() >= min_size_to_flush))) { + uint64_t seq = cfd->mem()->GetCreationSeq(); + if (cfd_to_flush == nullptr || seq < seq_num_for_cf_picked) { + cfd_to_flush = cfd; + seq_num_for_cf_picked = seq; + } + } + } + + if (cfd_to_flush == nullptr) { + return false; + } + + orig_cfd_to_flush = cfd_to_flush; + + // A CF was picked. Now see if it should be replaced with a lagging CF + for (auto* cfd : *versions_->GetColumnFamilySet()) { + if (cfd == orig_cfd_to_flush) { + continue; + } + + if ((cfd->imm()->NumNotFlushed() != 0) || + (cfd->mem()->IsEmpty() == false)) { + // The first lagging CF is picked. There may be another lagging CF that + // is older, however, that will be fixed the next time we evaluate. 
+ if (cfd->GetNumQueuedForFlush() + + ColumnFamilyData::kLaggingFlushesThreshold < + orig_cfd_to_flush->GetNumQueuedForFlush()) { + // Fix its counter so it is considered lagging again only when + // it is indeed lagging behind + cfd->SetNumTimedQueuedForFlush( + orig_cfd_to_flush->GetNumQueuedForFlush() - 1); + cfd_to_flush = cfd; + break; + } + } + } + + autovector cfds{cfd_to_flush}; + MaybeFlushStatsCF(&cfds); + } + + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] write buffer manager initiated flush " + "started current " + "usage %lu out of %lu, min-size:%lu, seq:%lu, " + "num-flushes:%lu, orig-cf:%s num-flushes:%lu", + cfd_to_flush->GetName().c_str(), + cfd_to_flush->write_buffer_mgr()->memory_usage(), + cfd_to_flush->write_buffer_mgr()->buffer_size(), + min_size_to_flush, seq_num_for_cf_picked, + cfd_to_flush->GetNumQueuedForFlush(), + orig_cfd_to_flush->GetName().c_str(), + orig_cfd_to_flush->GetNumQueuedForFlush()); + + TEST_SYNC_POINT( + "DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush::BeforeFlush"); + auto s = FlushMemTable(cfd_to_flush, flush_options, + FlushReason::kWriteBufferManagerInitiated); + + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "[%s] write buffer manager initialize flush finished, status: %s\n", + cfd_to_flush->GetName().c_str(), s.ToString().c_str()); + + return s.ok(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 94f36e8629..1967111e79 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -2089,6 +2089,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, delete impl; *dbptr = nullptr; } + + if (s.ok()) { + auto wbm = db_options.write_buffer_manager.get(); + auto db_impl = static_cast(*dbptr); + + if (wbm && wbm->IsInitiatingFlushes()) { + // Registering regardless of wbm->enabled() since the buffer size may be + // set later making the WBM enabled, but we will not re-register again + // However, notifications will only be received when the wbm is enabled + auto cb = [db_impl](size_t min_size_to_flush) { + return db_impl->InitiateMemoryManagerFlushRequest(min_size_to_flush); + }; + wbm->RegisterFlushInitiator(db_impl, cb); + db_impl->is_registered_for_flush_initiation_rqsts_ = true; + } + } + return s; } } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 89a054e4c0..781f69f26d 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1901,6 +1901,9 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue void DBImpl::WriteBufferManagerStallWrites() { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Write-Buffer-Manager Stalls Writes"); + mutex_.AssertHeld(); // First block future writer threads who want to add themselves to the queue // of WriteThread. @@ -1915,7 +1918,11 @@ void DBImpl::WriteBufferManagerStallWrites() { write_buffer_manager_->BeginWriteStall(wbm_stall_.get()); wbm_stall_->Block(); + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Write-Buffer-Manager Stall Writes END"); + mutex_.Lock(); + // Stall has ended. Signal writer threads so that they can add // themselves to the WriteThread queue for writes. 
write_thread_.EndWriteStall(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 0be8aaccfb..787fcaa9c2 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -303,6 +303,14 @@ class DBTestSharedWriteBufferAcrossCFs }; TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { + // When using the old interface (configuring options.db_write_buffer_size + // rather than creating a WBM and setting options.write_buffer_manager, the + // WBM is created automatically by rocksdb and initiate_flushes is set to true + // (the default)). This test fails in that case. + if (use_old_interface_) { + return; + } + Options options = CurrentOptions(); options.arena_block_size = 4096; auto flush_listener = std::make_shared(); @@ -333,9 +341,13 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { if (use_old_interface_) { options.db_write_buffer_size = 120000; // this is the real limit } else if (!cost_cache_) { - options.write_buffer_manager.reset(new WriteBufferManager(114285)); + options.write_buffer_manager.reset( + new WriteBufferManager(114285, {}, WriteBufferManager::kDfltAllowStall, + false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset(new WriteBufferManager(114285, cache)); + options.write_buffer_manager.reset(new WriteBufferManager( + 114285, cache, WriteBufferManager::kDfltAllowStall, + false /* initiate_flushes */)); } options.write_buffer_size = 500000; // this is never hit CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); @@ -514,7 +526,9 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { options.write_buffer_size = 500000; // this is never hit // Use a write buffer total size so that the soft limit is about // 105000. - options.write_buffer_manager.reset(new WriteBufferManager(120000)); + options.write_buffer_manager.reset(new WriteBufferManager( + 120000, {} /* cache */, WriteBufferManager::kDfltAllowStall, + false /* initiate_flushes */)); CreateAndReopenWithCF({"cf1", "cf2"}, options); ASSERT_OK(DestroyDB(dbname2, options)); diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc index 2942445471..cb4f9a2127 100644 --- a/db/db_write_buffer_manager_test.cc +++ b/db/db_write_buffer_manager_test.cc @@ -7,6 +7,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include +#include + #include "db/db_test_util.h" #include "db/write_thread.h" #include "port/stack_trace.h" @@ -14,10 +17,12 @@ namespace ROCKSDB_NAMESPACE { class DBWriteBufferManagerTest : public DBTestBase, - public testing::WithParamInterface { + public ::testing::WithParamInterface { public: DBWriteBufferManagerTest() : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } bool cost_cache_; }; @@ -27,14 +32,13 @@ TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, cache, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, cache, true, false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, nullptr, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, nullptr, true, false /* initiate_flushes */)); } WriteOptions wo; @@ -70,14 +74,13 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, cache, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, cache, true, false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, nullptr, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, nullptr, true, false /* initiate_flushes */)); } WriteOptions wo; wo.disableWAL = true; @@ -197,14 +200,13 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, cache, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, cache, true, false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, nullptr, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, nullptr, true, false /* initiate_flushes */)); } CreateAndReopenWithCF({"cf1", "cf2"}, options); @@ -314,14 +316,13 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, cache, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, cache, true, false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, nullptr, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, nullptr, true, false /* initiate_flushes */)); } CreateAndReopenWithCF({"cf1", "cf2"}, options); @@ -456,14 +457,13 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) { 
options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, cache, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, cache, true, false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, nullptr, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, nullptr, true, false /* initiate_flushes */)); } WriteOptions wo; wo.disableWAL = true; @@ -618,14 +618,13 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, cache, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, cache, true, false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(100000, nullptr, true)); + options.write_buffer_manager.reset(new WriteBufferManager( + 100000, nullptr, true, false /* initiate_flushes */)); } CreateAndReopenWithCF({"cf1", "cf2"}, options); @@ -801,11 +800,12 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) { cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset(new WriteBufferManager( - 512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */)); + 512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */, + false /* initiate_flushes */)); } else { - options.write_buffer_manager.reset( - new WriteBufferManager(512 << 10 /* buffer_size (512KB) */, - nullptr /* cache */, false /* allow_stall */)); + options.write_buffer_manager.reset(new WriteBufferManager( + 512 << 10 /* buffer_size (512KB) */, nullptr /* cache */, + false /* allow_stall */, false /* initiate_flushes */)); } Reopen(options); @@ -846,9 +846,79 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) { delete shared_wbm_db; } +class DBWriteBufferManagerTest1 : public DBTestBase, + public ::testing::WithParamInterface { + public: + DBWriteBufferManagerTest1() + : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } + bool cost_cache_; +}; +// =============================================================================================================== +class DBWriteBufferManagerFlushTests + : public DBTestBase, + public ::testing::WithParamInterface { + public: + DBWriteBufferManagerFlushTests() + : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } + bool cost_cache_; +}; + +TEST_P(DBWriteBufferManagerFlushTests, DISABLED_WbmFlushesSingleDBSingleCf) { + constexpr size_t kQuota = 100 * 1000; + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = kQuota; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + + auto allow_stall_ = false; + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(kQuota, cache, allow_stall_, true)); + } else { + options.write_buffer_manager.reset( + new 
WriteBufferManager(kQuota, nullptr, allow_stall_, true)); + } + auto* wbm = options.write_buffer_manager.get(); + size_t flush_step_size = + kQuota / wbm->GetFlushInitiationOptions().max_num_parallel_flushes; + + WriteOptions wo; + wo.disableWAL = true; + + DestroyAndReopen(options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush::BeforeFlush", + "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::" + "Flushing"}}); + + // Reach the flush step by writing to two cf-s, no flush + ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo)); + ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo)); + + TEST_SYNC_POINT( + "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::Flushing"); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest, testing::Bool()); +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1, + ::testing::Bool()); + +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerFlushTests, + DBWriteBufferManagerFlushTests, + ::testing::Values(false)); } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job.cc b/db/flush_job.cc index 9d70a9b847..fabbf31011 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -77,6 +77,8 @@ const char* GetFlushReasonString(FlushReason flush_reason) { return "Error Recovery"; case FlushReason::kWalFull: return "WAL Full"; + case FlushReason::kWriteBufferManagerInitiated: + return "Write Buffer Manager Initiated"; default: return "Invalid"; } diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 685393db9e..6eb38f2feb 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -58,6 +58,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" #include "test_util/testutil.h" #include "util/coding.h" #include "util/compression.h" @@ -102,6 +103,8 @@ DECLARE_bool(verbose); DECLARE_bool(progress_reports); DECLARE_uint64(db_write_buffer_size); DECLARE_bool(allow_wbm_stalls); +DECLARE_bool(initiate_wbm_flushes); +DECLARE_uint32(max_num_parallel_flushes); DECLARE_int32(write_buffer_size); DECLARE_int32(max_write_buffer_number); DECLARE_int32(min_write_buffer_number_to_merge); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 60a642f05d..44acfd309c 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -137,6 +137,19 @@ DEFINE_uint64(db_write_buffer_size, DEFINE_bool(allow_wbm_stalls, false, "Enable WBM write stalls and delays"); +DEFINE_bool(initiate_wbm_flushes, false, + "WBM will proactively initiate flushes (Speedb)." 
+ "If false, WBM-related flushes will be initiated using the " + "ShouldFlush() service " + "of the WBM."); + +DEFINE_uint32(max_num_parallel_flushes, + ROCKSDB_NAMESPACE::WriteBufferManager::FlushInitiationOptions:: + kDfltMaxNumParallelFlushes, + "In case FLAGGS_initiate_wbm_flushes is true, this flag will " + "overwrite the default " + "max number of parallel flushes."); + DEFINE_int32( write_buffer_size, static_cast(ROCKSDB_NAMESPACE::Options().write_buffer_size), diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 4abe6fa90c..50f86be466 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2392,6 +2392,10 @@ void StressTest::PrintEnv() const { fprintf(stdout, "Custom ops percentage : %d%%\n", FLAGS_customopspercent); fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n", FLAGS_db_write_buffer_size); + fprintf(stdout, "Allow WBM Stalls : %s\n", + FLAGS_allow_wbm_stalls ? "true" : "false"); + fprintf(stdout, "Initiate WBM Flushes : %s\n", + FLAGS_initiate_wbm_flushes ? "true" : "false"); fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size); fprintf(stdout, "Iterations : %lu\n", (unsigned long)FLAGS_num_iterations); @@ -3051,8 +3055,14 @@ void InitializeOptionsFromFlags( FLAGS_num_file_reads_for_auto_readahead; options.table_factory.reset(NewBlockBasedTableFactory(block_based_options)); if (FLAGS_db_write_buffer_size > 0) { + WriteBufferManager::FlushInitiationOptions flush_initiation_options; + if (FLAGS_max_num_parallel_flushes > 0U) { + flush_initiation_options.max_num_parallel_flushes = + FLAGS_max_num_parallel_flushes; + } options.write_buffer_manager.reset(new WriteBufferManager( - FLAGS_db_write_buffer_size, {} /* cache */, FLAGS_allow_wbm_stalls)); + FLAGS_db_write_buffer_size, {} /* cache */, FLAGS_allow_wbm_stalls, + FLAGS_initiate_wbm_flushes, flush_initiation_options)); } options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 9f1e1b083b..773fd84978 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -356,10 +356,18 @@ int db_stress_tool(int argc, char** argv) { } std::unique_ptr shared; - if ((FLAGS_db_write_buffer_size == 0) && FLAGS_allow_wbm_stalls) { - fprintf(stderr, - "-allow_wbm_stalls is useless if db_write_buffer_size == 0\n"); - exit(1); + if (FLAGS_db_write_buffer_size == 0) { + if (FLAGS_allow_wbm_stalls) { + fprintf(stderr, + "-allow_wbm_stalls is useless if db_write_buffer_size == 0\n"); + exit(1); + } + if (FLAGS_initiate_wbm_flushes) { + fprintf( + stderr, + "-initiate_wbm_flushes is useless if db_write_buffer_size == 0\n"); + exit(1); + } } std::unique_ptr stress; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 87bc678693..3c8793e069 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -178,6 +178,7 @@ enum class FlushReason : int { // will not be called to avoid many small immutable memtables. 
kErrorRecoveryRetryFlush = 0xc, kWalFull = 0xd, + kWriteBufferManagerInitiated = 0xe, }; // TODO: In the future, BackgroundErrorReason will only be used to indicate diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 89296d001b..4a2166b0d3 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -16,12 +16,20 @@ #include #include #include +#include #include +#include +#include +#include +#include #include "rocksdb/cache.h" namespace ROCKSDB_NAMESPACE { +struct Options; class CacheReservationManager; +class InstrumentedMutex; +class InstrumentedCondVar; // Interface to block and signal DB instances, intended for RocksDB // internal use only. Each DB instance contains ptr to StallInterface. @@ -35,6 +43,22 @@ class StallInterface { }; class WriteBufferManager final { + public: + // TODO: Need to find an alternative name as it is misleading + // we start flushes in kStartFlushPercentThreshold / number of parallel + // flushes + static constexpr uint64_t kStartFlushPercentThreshold = 80U; + + struct FlushInitiationOptions { + static constexpr size_t kDfltMaxNumParallelFlushes = 4U; + + FlushInitiationOptions() {} + size_t max_num_parallel_flushes = kDfltMaxNumParallelFlushes; + }; + + static constexpr bool kDfltAllowStall = false; + static constexpr bool kDfltInitiateFlushes = true; + public: // Parameters: // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped. @@ -47,9 +71,20 @@ class WriteBufferManager final { // allow_stall: if set true, it will enable stalling of writes when // memory_usage() exceeds buffer_size. It will wait for flush to complete and // memory usage to drop down. - explicit WriteBufferManager(size_t _buffer_size, - std::shared_ptr cache = {}, - bool allow_stall = false); + // + // initiate_flushes: if set true, the WBM will proactively request registered + // DB-s to flush. The mechanism is based on initiating an increasing number of + // flushes as the memory usage increases. If set false, WBM clients need to + // call ShouldFlush() and the WBM will indicate if current memory usage merits + // a flush. Currently the ShouldFlush() mechanism is used only in the + // write-path of a DB. + explicit WriteBufferManager( + size_t _buffer_size, std::shared_ptr cache = {}, + bool allow_stall = kDfltAllowStall, + bool initiate_flushes = kDfltInitiateFlushes, + const FlushInitiationOptions& flush_initiation_options = + FlushInitiationOptions()); + // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; WriteBufferManager& operator=(const WriteBufferManager&) = delete; @@ -108,13 +143,18 @@ class WriteBufferManager final { // Check if stall is active and can be ended. MaybeEndWriteStall(); + if (enabled()) { + if (initiate_flushes_) { + InitFlushInitiationVars(new_size); + } + } } // Below functions should be called by RocksDB internally. 
// Should only be called from write thread bool ShouldFlush() const { - if (enabled()) { + if ((initiate_flushes_ == false) && enabled()) { if (mutable_memtable_memory_usage() > mutable_limit_.load(std::memory_order_relaxed)) { return true; @@ -187,6 +227,32 @@ class WriteBufferManager final { std::string GetPrintableOptions() const; + public: + bool IsInitiatingFlushes() const { return initiate_flushes_; } + const FlushInitiationOptions& GetFlushInitiationOptions() const { + return flush_initiation_options_; + } + + public: + using InitiateFlushRequestCb = std::function; + + void RegisterFlushInitiator(void* initiator, InitiateFlushRequestCb request); + void DeregisterFlushInitiator(void* initiator); + + void FlushStarted(bool wbm_initiated); + void FlushEnded(bool wbm_initiated); + + public: + size_t TEST_GetNumFlushesToInitiate() const { + return num_flushes_to_initiate_; + } + size_t TEST_GetNumRunningFlushes() const { return num_running_flushes_; } + size_t TEST_GetNextCandidateInitiatorIdx() const { + return next_candidate_initiator_idx_; + } + + void TEST_WakeupFlushInitiationThread(); + private: std::atomic buffer_size_; std::atomic mutable_limit_; @@ -207,7 +273,95 @@ class WriteBufferManager final { // while holding mu_, but it can be read without a lock. std::atomic stall_active_; - void ReserveMemWithCache(size_t mem); - void FreeMemWithCache(size_t mem); + // Return the new memory usage + size_t ReserveMemWithCache(size_t mem); + size_t FreeMemWithCache(size_t mem); + + private: + struct InitiatorInfo { + void* initiator = nullptr; + InitiateFlushRequestCb cb; + }; + + static constexpr uint64_t kInvalidInitiatorIdx = + std::numeric_limits::max(); + + private: + void InitFlushInitiationVars(size_t quota); + void InitiateFlushesThread(); + bool InitiateAdditionalFlush(); + void WakeUpFlushesThread(); + void TerminateFlushesThread(); + void RecalcFlushInitiationSize(); + void ReevaluateNeedForMoreFlushesNoLockHeld(size_t curr_memory_used); + void ReevaluateNeedForMoreFlushesLockHeld(size_t curr_memory_used); + uint64_t FindInitiator(void* initiator) const; + + void WakeupFlushInitiationThreadNoLockHeld(); + void WakeupFlushInitiationThreadLockHeld(); + + // Heuristic to decide if another flush is needed taking into account + // only memory issues (ignoring number of flushes issues). + // May be called NOT under the flushes_mu_ lock + // + // NOTE: Memory is not necessarily freed at the end of a flush for various + // reasons. For now, the memory is considered dirty until it is actually + // freed. For that reason we do NOT initiate another flush immediatley once a + // flush ends, we wait until the total unflushed memory (curr_memory_used - + // memory_being_freed_) exceeds a threshold. 
+ bool ShouldInitiateAnotherFlushMemOnly(size_t curr_memory_used) const { + return (curr_memory_used - memory_being_freed_ >= + additional_flush_step_size_ / 2 && + curr_memory_used >= additional_flush_initiation_size_); + } + + // This should be called only under the flushes_mu_ lock + bool ShouldInitiateAnotherFlush(size_t curr_memory_used) const { + return (((num_running_flushes_ + num_flushes_to_initiate_) < + flush_initiation_options_.max_num_parallel_flushes) && + ShouldInitiateAnotherFlushMemOnly(curr_memory_used)); + } + + void UpdateNextCandidateInitiatorIdx(); + bool IsInitiatorIdxValid(uint64_t initiator_idx) const; + + private: + // Flush Initiation Mechanism Data Members + + const bool initiate_flushes_ = false; + const FlushInitiationOptions flush_initiation_options_ = + FlushInitiationOptions(); + + // Collection of registered initiators + std::vector flush_initiators_; + // Round-robin index of the next candidate flushes initiator + uint64_t next_candidate_initiator_idx_ = kInvalidInitiatorIdx; + + // Number of flushes actually running (regardless of who initiated them) + std::atomic num_running_flushes_ = 0U; + // Number of additional flushes to initiate the mechanism deems necessary + std::atomic num_flushes_to_initiate_ = 0U; + // Threshold (bytes) from which to start initiating flushes + size_t flush_initiation_start_size_ = 0U; + size_t additional_flush_step_size_ = 0U; + std::atomic additional_flush_initiation_size_ = 0U; + // Min estimated size (in bytes) of the mutable memtable(s) for an initiator + // to start a flush when requested + size_t min_mutable_flush_size_ = 0U; + + // Trying to include instumented_mutex.h results in a compilation error + // so only forward declaration + unique_ptr instead of having a member by + // value + std::unique_ptr flushes_mu_; + std::unique_ptr flushes_initiators_mu_; + // Used to wake up the flushes initiation thread when it has work to do + std::unique_ptr flushes_wakeup_cv_; + // Allows the flush initiation thread to wake up only when there is truly + // reason to wakeup. 
See the thread's code for more details + bool new_flushes_wakeup_ = false; + + std::thread flushes_thread_; + bool terminate_flushes_thread_ = false; }; + } // namespace ROCKSDB_NAMESPACE diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index b0ee832aa5..88cb6a04af 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -9,18 +9,22 @@ #include "rocksdb/write_buffer_manager.h" +#include #include #include "cache/cache_entry_roles.h" #include "cache/cache_reservation_manager.h" #include "db/db_impl/db_impl.h" +#include "monitoring/instrumented_mutex.h" #include "rocksdb/status.h" +#include "test_util/sync_point.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { -WriteBufferManager::WriteBufferManager(size_t _buffer_size, - std::shared_ptr cache, - bool allow_stall) +WriteBufferManager::WriteBufferManager( + size_t _buffer_size, std::shared_ptr cache, bool allow_stall, + bool initiate_flushes, + const FlushInitiationOptions& flush_initiation_options) : buffer_size_(_buffer_size), mutable_limit_(buffer_size_ * 7 / 8), memory_used_(0), @@ -28,7 +32,12 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size, memory_being_freed_(0U), cache_res_mgr_(nullptr), allow_stall_(allow_stall), - stall_active_(false) { + stall_active_(false), + initiate_flushes_(initiate_flushes), + flush_initiation_options_(flush_initiation_options), + flushes_mu_(new InstrumentedMutex), + flushes_initiators_mu_(new InstrumentedMutex), + flushes_wakeup_cv_(new InstrumentedCondVar(flushes_mu_.get())) { if (cache) { // Memtable's memory usage tends to fluctuate frequently // therefore we set delayed_decrease = true to save some dummy entry @@ -37,6 +46,9 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size, CacheReservationManagerImpl>( cache, true /* delayed_decrease */); } + if (initiate_flushes_) { + InitFlushInitiationVars(buffer_size()); + } } WriteBufferManager::~WriteBufferManager() { @@ -44,6 +56,7 @@ WriteBufferManager::~WriteBufferManager() { std::unique_lock lock(mu_); assert(queue_.empty()); #endif + TerminateFlushesThread(); } std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { @@ -55,15 +68,27 @@ std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { } void WriteBufferManager::ReserveMem(size_t mem) { + auto is_enabled = enabled(); + size_t new_memory_used = 0U; + if (cache_res_mgr_ != nullptr) { - ReserveMemWithCache(mem); - } else if (enabled()) { - memory_used_.fetch_add(mem, std::memory_order_relaxed); + new_memory_used = ReserveMemWithCache(mem); + } else if (is_enabled) { + auto old_memory_used = + memory_used_.fetch_add(mem, std::memory_order_relaxed); + new_memory_used = old_memory_used + mem; + } + if (is_enabled) { + // Checking outside the locks is not reliable, but avoids locking + // unnecessarily which is expensive + if (UNLIKELY(ShouldInitiateAnotherFlushMemOnly(new_memory_used))) { + ReevaluateNeedForMoreFlushesNoLockHeld(new_memory_used); + } } } // Should only be called from write thread -void WriteBufferManager::ReserveMemWithCache(size_t mem) { +size_t WriteBufferManager::ReserveMemWithCache(size_t mem) { assert(cache_res_mgr_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. 
@@ -79,6 +104,8 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) { // [TODO] We'll need to improve it in the future and figure out what to do on // error s.PermitUncheckedError(); + + return new_mem_used; } void WriteBufferManager::ScheduleFreeMem(size_t mem) { @@ -105,13 +132,15 @@ void WriteBufferManager::FreeMemAborted(size_t mem) { void WriteBufferManager::FreeMem(size_t mem) { const auto is_enabled = enabled(); + size_t new_memory_used = 0U; if (cache_res_mgr_ != nullptr) { - FreeMemWithCache(mem); + new_memory_used = FreeMemWithCache(mem); } else if (is_enabled) { - [[maybe_unused]] const auto curr_memory_used = + auto old_memory_used = memory_used_.fetch_sub(mem, std::memory_order_relaxed); - assert(curr_memory_used >= mem); + assert(old_memory_used >= mem); + new_memory_used = old_memory_used - mem; } if (is_enabled) { @@ -126,17 +155,25 @@ void WriteBufferManager::FreeMem(size_t mem) { // Check if stall is active and can be ended. MaybeEndWriteStall(); + + if (is_enabled) { + // Checking outside the locks is not reliable, but avoids locking + // unnecessarily which is expensive + if (UNLIKELY(ShouldInitiateAnotherFlushMemOnly(new_memory_used))) { + ReevaluateNeedForMoreFlushesNoLockHeld(new_memory_used); + } + } } -void WriteBufferManager::FreeMemWithCache(size_t mem) { +size_t WriteBufferManager::FreeMemWithCache(size_t mem) { assert(cache_res_mgr_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. std::lock_guard lock(cache_res_mgr_mu_); - const auto curr_memory_used = memory_used_.load(std::memory_order_relaxed); - assert(curr_memory_used >= mem); - size_t new_mem_used = curr_memory_used - mem; + const auto old_mem_used = memory_used_.load(std::memory_order_relaxed); + assert(old_mem_used >= mem); + size_t new_mem_used = old_mem_used - mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); @@ -145,6 +182,8 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) { // [TODO] We'll need to improve it in the future and figure out what to do on // error s.PermitUncheckedError(); + + return new_mem_used; } void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) { @@ -235,7 +274,302 @@ std::string WriteBufferManager::GetPrintableOptions() const { allow_stall_); ret.append(buffer); + snprintf(buffer, kBufferSize, "%*s: %d\n", field_width, + "wbm.initiate_flushes", IsInitiatingFlushes()); + ret.append(buffer); + return ret; } +// ================================================================================================ +void WriteBufferManager::RegisterFlushInitiator( + void* initiator, InitiateFlushRequestCb request) { + { + InstrumentedMutexLock lock(flushes_initiators_mu_.get()); + assert(FindInitiator(initiator) == kInvalidInitiatorIdx); + + flush_initiators_.push_back({initiator, request}); + if (flush_initiators_.size() == 1) { + assert(next_candidate_initiator_idx_ == kInvalidInitiatorIdx); + next_candidate_initiator_idx_ = 0U; + } + + assert(next_candidate_initiator_idx_ < flush_initiators_.size()); + } + + // flushes_initiators_mu_ is held but not flushes_mu_ + WakeupFlushInitiationThreadNoLockHeld(); +} + +void WriteBufferManager::DeregisterFlushInitiator(void* initiator) { + InstrumentedMutexLock lock(flushes_initiators_mu_.get()); + auto initiator_idx = FindInitiator(initiator); + assert(IsInitiatorIdxValid(initiator_idx)); + + 
flush_initiators_.erase(flush_initiators_.begin() + initiator_idx); + + // If the deregistered initiator was the next candidate and also the last + // one, update the next candidate (possibly none left) + assert(next_candidate_initiator_idx_ != kInvalidInitiatorIdx); + if (next_candidate_initiator_idx_ >= flush_initiators_.size()) { + UpdateNextCandidateInitiatorIdx(); + } + + // No need to wake up the flush initiation thread +} + +void WriteBufferManager::InitFlushInitiationVars(size_t quota) { + assert(initiate_flushes_); + + { + InstrumentedMutexLock lock(flushes_mu_.get()); + additional_flush_step_size_ = + quota * kStartFlushPercentThreshold / 100 / + flush_initiation_options_.max_num_parallel_flushes; + flush_initiation_start_size_ = additional_flush_step_size_; + min_mutable_flush_size_ = std::min( + quota / (2 * flush_initiation_options_.max_num_parallel_flushes), + 64 * (1 << 20)); + RecalcFlushInitiationSize(); + } + + if (flushes_thread_.joinable() == false) { + flushes_thread_ = + std::thread(&WriteBufferManager::InitiateFlushesThread, this); + } +} + +void WriteBufferManager::InitiateFlushesThread() { + while (true) { + // Should return true when the waiting should stop (no spurious wakeups + // guaranteed) + auto StopWaiting = [this]() { + return (new_flushes_wakeup_ && + (terminate_flushes_thread_ || (num_flushes_to_initiate_ > 0U))); + }; + + InstrumentedMutexLock lock(flushes_mu_.get()); + while (StopWaiting() == false) { + flushes_wakeup_cv_->Wait(); + } + + new_flushes_wakeup_ = false; + + if (terminate_flushes_thread_) { + break; + } + + // The code below tries to initiate num_flushes_to_initiate_ flushes by + // invoking its registered initiators, and requesting them to initiate a + // flush of a certain minimum size. The initiation is done in iterations. An + // iteration is an attempt to give evey initiator an opportunity to flush, + // in a round-robin ordering. An initiator may or may not be able to + // initiate a flush. Reasons for not initiating could be: + // - The flush is less than the specified minimum size. + // - The initiator is in the process of shutting down or being disposed of. + // + // The assumption is that in case flush initiation stopped when + // num_flushes_to_initiate_ == 0, there will be some future event that will + // wake up this thread and initiation attempts will be retried: + // - Initiator will be enabled + // - A flush in progress will end + // - The memory_used() will increase above additional_flush_initiation_size_ + + // Two iterations: + // 1. Flushes of a min size. + // 2. Flushes of any size + constexpr size_t kNumIters = 2U; + const std::array kMinFlushSizes{min_mutable_flush_size_, + 0U}; + + auto iter = 0U; + while ((iter < kMinFlushSizes.size()) && (num_flushes_to_initiate_ > 0U)) { + auto num_repeated_failures_to_initiate = 0U; + while (num_flushes_to_initiate_ > 0U) { + bool was_flush_initiated = false; + { + // Unlocking the flushed_mu_ since flushing (via the initiator cb) may + // call a WBM service (e.g., ReserveMem()), that, in turn, needs to + // flushes_mu_lock the same mutex => will get stuck + InstrumentedMutexUnlock flushes_mu_unlocker(flushes_mu_.get()); + + InstrumentedMutexLock initiators_lock(flushes_initiators_mu_.get()); + // Once we are under the flushes_initiators_mu_ lock, we may check: + // 1. Has the last initiator deregistered? + // 2. Have all existing initiators failed to initiate a flush? 
+ if (flush_initiators_.empty() || + (num_repeated_failures_to_initiate >= flush_initiators_.size())) { + break; + } + assert(IsInitiatorIdxValid(next_candidate_initiator_idx_)); + auto& initiator = flush_initiators_[next_candidate_initiator_idx_]; + UpdateNextCandidateInitiatorIdx(); + + // Assuming initiator would flush (flushes_mu_lock is unlocked and + // called initiator may call another method that relies on these + // counters) Not recalculating flush initiation size since the + // increment & decrement cancel each other with respect to the recalc + ++num_running_flushes_; + --num_flushes_to_initiate_; + + // TODO: Use a weak-pointer for the registered initiators. That would + // allow us to release the flushes_initiators_mu_ mutex before calling + // the callback (which may take a long time). + was_flush_initiated = initiator.cb(kMinFlushSizes[iter]); + } + + if (!was_flush_initiated) { + // No flush was initiated => undo the counters update + --num_running_flushes_; + ++num_flushes_to_initiate_; + ++num_repeated_failures_to_initiate; + } else { + num_repeated_failures_to_initiate = 0U; + } + } + ++iter; + } + TEST_SYNC_POINT_CALLBACK( + "WriteBufferManager::InitiateFlushesThread::DoneInitiationsAttempt", + &num_flushes_to_initiate_); + } +} + +void WriteBufferManager::TerminateFlushesThread() { + { + flushes_mu_->Lock(); + + terminate_flushes_thread_ = true; + WakeupFlushInitiationThreadLockHeld(); + } + + if (flushes_thread_.joinable()) { + flushes_thread_.join(); + } +} + +void WriteBufferManager::FlushStarted(bool wbm_initiated) { + // num_running_flushes_ is incremented in our thread when initiating flushes + // => Already accounted for + if (wbm_initiated || !enabled()) { + return; + } + + flushes_mu_->Lock(); + + ++num_running_flushes_; + size_t curr_memory_used = memory_usage(); + RecalcFlushInitiationSize(); + ReevaluateNeedForMoreFlushesLockHeld(curr_memory_used); +} + +void WriteBufferManager::FlushEnded(bool /* wbm_initiated */) { + if (!enabled()) { + return; + } + + flushes_mu_->Lock(); + + // The WBM may be enabled after a flush has started. In that case + // the WBM will not be aware of the number of running flushes at the time + // it is enabled. The counter will become valid once all of the flushes + // that were running when it was enabled will have completed. 
+ if (num_running_flushes_ > 0U) { + --num_running_flushes_; + } + size_t curr_memory_used = memory_usage(); + RecalcFlushInitiationSize(); + ReevaluateNeedForMoreFlushesLockHeld(curr_memory_used); +} + +void WriteBufferManager::RecalcFlushInitiationSize() { + flushes_mu_->AssertHeld(); + + if (num_running_flushes_ + num_flushes_to_initiate_ >= + flush_initiation_options_.max_num_parallel_flushes) { + additional_flush_initiation_size_ = buffer_size(); + } else { + additional_flush_initiation_size_ = + flush_initiation_start_size_ + + additional_flush_step_size_ * + (num_running_flushes_ + num_flushes_to_initiate_); + } +} + +void WriteBufferManager::ReevaluateNeedForMoreFlushesNoLockHeld( + size_t curr_memory_used) { + flushes_mu_->Lock(); + ReevaluateNeedForMoreFlushesLockHeld(curr_memory_used); +} + +void WriteBufferManager::ReevaluateNeedForMoreFlushesLockHeld( + size_t curr_memory_used) { + assert(enabled()); + flushes_mu_->AssertHeld(); + + if (ShouldInitiateAnotherFlush(curr_memory_used)) { + // need to schedule more + ++num_flushes_to_initiate_; + RecalcFlushInitiationSize(); + WakeupFlushInitiationThreadLockHeld(); + } else { + flushes_mu_->Unlock(); + } +} + +uint64_t WriteBufferManager::FindInitiator(void* initiator) const { + flushes_initiators_mu_->AssertHeld(); + + for (auto i = 0U; i < flush_initiators_.size(); ++i) { + if (flush_initiators_[i].initiator == initiator) { + return i; + } + } + + return kInvalidInitiatorIdx; +} + +void WriteBufferManager::WakeupFlushInitiationThreadNoLockHeld() { + flushes_mu_->Lock(); + WakeupFlushInitiationThreadLockHeld(); +} + +// Assumed the lock is held +// Releases the lock upon exit +void WriteBufferManager::WakeupFlushInitiationThreadLockHeld() { + flushes_mu_->AssertHeld(); + + new_flushes_wakeup_ = true; + + // Done modifying the shared data. 
Release the lock so that when the flush + // initiation thread it may acquire the mutex immediately + flushes_mu_->Unlock(); + flushes_wakeup_cv_->Signal(); +} + +void WriteBufferManager::UpdateNextCandidateInitiatorIdx() { + flushes_initiators_mu_->AssertHeld(); + + if (flush_initiators_.empty() == false) { + if (next_candidate_initiator_idx_ != kInvalidInitiatorIdx) { + next_candidate_initiator_idx_ = + ((next_candidate_initiator_idx_ + 1) % flush_initiators_.size()); + } else { + next_candidate_initiator_idx_ = 0U; + } + } else { + next_candidate_initiator_idx_ = kInvalidInitiatorIdx; + } +} + +bool WriteBufferManager::IsInitiatorIdxValid(uint64_t initiator_idx) const { + flushes_initiators_mu_->AssertHeld(); + + return (initiator_idx < flush_initiators_.size()); +} + +void WriteBufferManager::TEST_WakeupFlushInitiationThread() { + WakeupFlushInitiationThreadNoLockHeld(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc index 292d4c3f97..2d34da2a3f 100644 --- a/memtable/write_buffer_manager_test.cc +++ b/memtable/write_buffer_manager_test.cc @@ -9,7 +9,16 @@ #include "rocksdb/write_buffer_manager.h" +#include +#include +#include +#include +#include +#include + #include "rocksdb/advanced_cache.h" +#include "rocksdb/cache.h" +#include "test_util/sync_point.h" #include "test_util/testharness.h" namespace ROCKSDB_NAMESPACE { @@ -28,10 +37,12 @@ void ScheduleBeginAndFreeMem(WriteBufferManager& wbf, size_t size) { BeginAndFree(wbf, size); } } // namespace + TEST_F(WriteBufferManagerTest, ShouldFlush) { // A write buffer manager of size 10MB - std::unique_ptr wbf( - new WriteBufferManager(10 * 1024 * 1024)); + std::unique_ptr wbf(new WriteBufferManager( + 10 * 1024 * 1024, {} /* cache */, WriteBufferManager::kDfltAllowStall, + false /* initiate_flushes */)); wbf->ReserveMem(8 * 1024 * 1024); ASSERT_FALSE(wbf->ShouldFlush()); @@ -101,8 +112,9 @@ TEST_F(ChargeWriteBufferTest, Basic) { co.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr cache = NewLRUCache(co); // A write buffer manager of size 50MB - std::unique_ptr wbf( - new WriteBufferManager(50 * 1024 * 1024, cache)); + std::unique_ptr wbf(new WriteBufferManager( + 50 * 1024 * 1024, cache, WriteBufferManager::kDfltAllowStall, + false /* initiate_flushes */)); // Allocate 333KB will allocate 512KB, memory_used_ = 333KB wbf->ReserveMem(333 * 1024); @@ -114,8 +126,8 @@ TEST_F(ChargeWriteBufferTest, Basic) { // Allocate another 512KB, memory_used_ = 845KB wbf->ReserveMem(512 * 1024); // 2 more dummy entries are added for size 512 KB - // since ceil((memory_used_ - dummy_entries_in_cache_usage) % kSizeDummyEntry) - // = 2 + // since ceil((memory_used_ - dummy_entries_in_cache_usage) % + // kSizeDummyEntry) = 2 ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry); ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024); ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead); @@ -159,8 +171,8 @@ TEST_F(ChargeWriteBufferTest, Basic) { // Free 20MB, memory_used_ = 31565KB // It will releae 80 dummy entries from cache since // since memory_used_ < dummy_entries_in_cache_usage * (3/4) - // and floor((dummy_entries_in_cache_usage - memory_used_) % kSizeDummyEntry) - // = 80 + // and floor((dummy_entries_in_cache_usage - memory_used_) % + // kSizeDummyEntry) = 80 BeginAndFree(*wbf, 20 * 1024 * 1024); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 124 * kSizeDummyEntry); ASSERT_GE(cache->GetPinnedUsage(), 124 * 256 * 1024); @@ 
-181,8 +193,8 @@ TEST_F(ChargeWriteBufferTest, Basic) { // Free 20MB, memory_used_ = 11069KB // It will releae 80 dummy entries from cache // since memory_used_ < dummy_entries_in_cache_usage * (3/4) - // and floor((dummy_entries_in_cache_usage - memory_used_) % kSizeDummyEntry) - // = 80 + // and floor((dummy_entries_in_cache_usage - memory_used_) % + // kSizeDummyEntry) = 80 ScheduleBeginAndFreeMem(*wbf, 20 * 1024 * 1024); ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry); ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024); @@ -306,6 +318,761 @@ TEST_F(ChargeWriteBufferTest, BasicWithCacheFull) { 46 * kSizeDummyEntry + kMetaDataChargeOverhead); } +#define VALIDATE_USAGE_STATE(memory_change_size, expected_state, \ + expected_factor) \ + ValidateUsageState(__LINE__, memory_change_size, expected_state, \ + expected_factor) + +class WriteBufferManagerTestWithParams + : public WriteBufferManagerTest, + public ::testing::WithParamInterface> { + public: + void SetUp() override { + wbm_enabled_ = std::get<0>(GetParam()); + cost_cache_ = std::get<1>(GetParam()); + allow_stall_ = std::get<2>(GetParam()); + } + + bool wbm_enabled_; + bool cost_cache_; + bool allow_stall_; +}; + +// ========================================================================== +#define CALL_WRAPPER(func) \ + func; \ + ASSERT_FALSE(HasFailure()); + +// #1: Quota (size_t). 0 == WBM disabled +// #2: Cost to cache (Boolean) +class WriteBufferManagerFlushInitiationTest + : public WriteBufferManagerTest, + public ::testing::WithParamInterface> { + public: + void SetUp() override { + quota_ = std::get<0>(GetParam()); + cost_cache_ = std::get<1>(GetParam()); + allow_stall_ = std::get<2>(GetParam()); + + wbm_enabled_ = (quota_ > 0U); + cache_ = NewLRUCache(4 * 1024 * 1024, 2); + max_num_parallel_flushes_ = + WriteBufferManager::FlushInitiationOptions().max_num_parallel_flushes; + + CreateWbm(); + SetupAndEnableTestPoints(); + + actual_num_cbs_ = 0U; + expected_num_cbs_ = 0U; + validation_num_ = 0U; + expected_num_flushes_to_initiate_ = 0U; + expected_num_running_flushes_ = 0U; + } + + void TearDown() override { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_TRUE(expected_cb_initiators_.empty()); + ASSERT_TRUE(expected_cb_min_size_to_flush_.empty()); + ASSERT_TRUE(flush_cb_results_.empty()); + + initiators_.clear(); + } + + bool IsWbmDisabled() const { return (wbm_enabled_ == false); } + + void CreateWbm() { + auto wbm_quota = (wbm_enabled_ ? 
quota_ : 0U); + WriteBufferManager::FlushInitiationOptions initiation_options; + initiation_options.max_num_parallel_flushes = max_num_parallel_flushes_; + + ASSERT_GT(max_num_parallel_flushes_, 0U); + flush_step_size_ = quota_ / max_num_parallel_flushes_; + + if (cost_cache_) { + wbm_.reset(new WriteBufferManager(wbm_quota, cache_, allow_stall_, true, + initiation_options)); + } else { + wbm_.reset(new WriteBufferManager(wbm_quota, nullptr, allow_stall_, true, + initiation_options)); + } + ASSERT_EQ(wbm_->enabled(), wbm_enabled_); + ASSERT_TRUE(wbm_->IsInitiatingFlushes()); + } + + uint64_t CreateInitiator() { + auto initiator = std::make_unique(++next_initiator_id_); + auto initiator_id = *initiator; + initiators_.push_back(std::move(initiator)); + return initiator_id; + } + + void RegisterInitiator(uint64_t initiator_id) { + auto initiator = FindInitiator(initiator_id); + ASSERT_NE(initiator, nullptr); + if (initiator != nullptr) { + auto cb = + std::bind(&WriteBufferManagerFlushInitiationTest::FlushRequestCb, + this, std::placeholders::_1, initiator); + wbm_->RegisterFlushInitiator(initiator, cb); + } + } + + uint64_t CreateAndRegisterInitiator() { + auto initiator_id = CreateInitiator(); + RegisterInitiator(initiator_id); + return initiator_id; + } + + std::optional FindInitiatorIdx(uint64_t initiator_id) { + for (auto i = 0U; i < initiators_.size(); ++i) { + if (*initiators_[i] == initiator_id) { + return i; + } + } + + return {}; + } + + uint64_t* FindInitiator(uint64_t initiator_id) { + auto initiator_idx = FindInitiatorIdx(initiator_id); + if (initiator_idx.has_value()) { + return initiators_[initiator_idx.value()].get(); + } else { + ADD_FAILURE(); + return nullptr; + } + } + + void DeregisterInitiator(uint64_t initiator_id) { + auto initiator_idx = FindInitiatorIdx(initiator_id); + ASSERT_TRUE(initiator_idx.has_value()); + + if (initiator_idx.has_value()) { + wbm_->DeregisterFlushInitiator(initiators_[initiator_idx.value()].get()); + initiators_.erase(initiators_.begin() + initiator_idx.value()); + } + } + + struct ExpectedCbInfo { + uint64_t initiator_id; + size_t min_size_to_flush; + bool flush_cb_result; + }; + + void AddExpectedCbsInfos(const std::vector& cbs_infos) { + ASSERT_TRUE(expected_cb_initiators_.empty()); + ASSERT_TRUE(expected_cb_min_size_to_flush_.empty()); + ASSERT_TRUE(flush_cb_results_.empty()); + + if (IsWbmDisabled()) { + return; + } + + for (const auto& cb_info : cbs_infos) { + auto initiator = FindInitiator(cb_info.initiator_id); + ASSERT_NE(initiator, nullptr); + expected_cb_initiators_.push_back(initiator); + + expected_cb_min_size_to_flush_.push_back(cb_info.min_size_to_flush); + flush_cb_results_.push_back(cb_info.flush_cb_result); + } + actual_num_cbs_ = 0U; + expected_num_cbs_ = cbs_infos.size(); + + ++validation_num_; + std::string test_point_name_suffix = std::to_string(validation_num_); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DoneInitiationsAttemptTestPointCb::ExpectedNumAttempts:" + + test_point_name_suffix, + "ValidateState::WaitUntilValidtionPossible:" + + test_point_name_suffix}}); + } + + // Flush initiaion callback registered with the WBM + bool FlushRequestCb(size_t min_size_to_flush, void* initiator) { + EXPECT_TRUE(wbm_enabled_); + + ++actual_num_cbs_; + + if (expected_cb_min_size_to_flush_.empty() == false) { + EXPECT_EQ(expected_cb_min_size_to_flush_[0], min_size_to_flush); + expected_cb_min_size_to_flush_.erase( + expected_cb_min_size_to_flush_.begin()); + } else { + 
EXPECT_FALSE(expected_cb_min_size_to_flush_.empty()); + } + + if (expected_cb_initiators_.empty() == false) { + EXPECT_EQ(expected_cb_initiators_[0], initiator); + expected_cb_initiators_.erase(expected_cb_initiators_.begin()); + } else { + EXPECT_FALSE(expected_cb_initiators_.empty()); + } + + if (flush_cb_results_.empty() == false) { + bool result = flush_cb_results_[0]; + flush_cb_results_.erase(flush_cb_results_.begin()); + return result; + } else { + EXPECT_FALSE(flush_cb_results_.empty()); + // Arbitrarily return true as we must return a bool to compile + return true; + } + }; + + // Sync Test Point callback called when the flush initiation thread + // completes initating all flushes and resumes waiting for the condition + // variable to be signalled again + void DoneInitiationsAttemptTestPointCb(void* /* arg */) { + if (actual_num_cbs_ == expected_num_cbs_) { + auto sync_point_name = + "DoneInitiationsAttemptTestPointCb::ExpectedNumAttempts:" + + std::to_string(validation_num_); + TEST_SYNC_POINT(sync_point_name); + } + } + + void SetupAndEnableTestPoints() { + if (IsWbmDisabled()) { + return; + } + + SyncPoint::GetInstance()->SetCallBack( + "WriteBufferManager::InitiateFlushesThread::DoneInitiationsAttempt", + [&](void* arg) { DoneInitiationsAttemptTestPointCb(arg); }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + } + + void ValidateState(bool wait_on_sync_point) { + if (wbm_enabled_ && wait_on_sync_point) { + auto sync_point_name = "ValidateState::WaitUntilValidtionPossible:" + + std::to_string(validation_num_); + TEST_SYNC_POINT(sync_point_name); + } + + ASSERT_EQ(wbm_->TEST_GetNumFlushesToInitiate(), + expected_num_flushes_to_initiate_); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), expected_num_running_flushes_); + + ASSERT_TRUE(expected_cb_initiators_.empty()) + << "Num entries:" << expected_cb_initiators_.size(); + ASSERT_TRUE(expected_cb_min_size_to_flush_.empty()) + << "Num entries:" << expected_cb_min_size_to_flush_.size(); + ASSERT_TRUE(flush_cb_results_.empty()) + << "Num entries:" << flush_cb_results_.size(); + } + + void EndFlush(bool wbm_initiated, size_t released_size, + bool wait_on_sync_point = false) { + wbm_->FreeMem(released_size); + wbm_->FlushEnded(wbm_initiated /* wbm_initiated */); + DecNumRunningFlushes(); + ValidateState(wait_on_sync_point); + } + + void StartAndEndFlush(bool wbm_initiated, size_t released_size) { + wbm_->ScheduleFreeMem(released_size); + wbm_->FreeMemBegin(released_size); + + // "Run" the flush to completion & release the memory + wbm_->FlushStarted(wbm_initiated /* wbm_initiated */); + if ((wbm_initiated == false) && wbm_enabled_) { + ++expected_num_running_flushes_; + } + EndFlush(wbm_initiated, released_size); + } + + void IncNumRunningFlushes() { + if (wbm_enabled_) { + ++expected_num_running_flushes_; + } + } + + void DecNumRunningFlushes() { + if (wbm_enabled_) { + --expected_num_running_flushes_; + } + } + + void IncNumFlushesToInitiate() { + if (wbm_enabled_) { + ++expected_num_flushes_to_initiate_; + } + } + + void DecNumFlushesToInitiate() { + if (wbm_enabled_) { + --expected_num_flushes_to_initiate_; + } + } + + protected: + size_t CalcExpectedMinSizeToFlush() { + return std::min(quota_ / (2 * max_num_parallel_flushes_), + 64 * (1 << 20)); + } + + protected: + std::unique_ptr wbm_; + + size_t quota_ = 0U; + bool wbm_enabled_; + bool cost_cache_; + std::shared_ptr cache_; + bool allow_stall_ = false; + size_t max_num_parallel_flushes_; + size_t flush_step_size_ = 0U; + + std::vector> initiators_; + 
uint64_t next_initiator_id_ = 0U;
+  std::vector<void*> expected_cb_initiators_;
+  std::vector<size_t> expected_cb_min_size_to_flush_;
+  std::vector<bool> flush_cb_results_;
+  size_t actual_num_cbs_ = 0;
+  size_t expected_num_cbs_ = 0U;
+  size_t expected_num_flushes_to_initiate_ = 0U;
+  size_t expected_num_running_flushes_ = 0U;
+  size_t validation_num_ = 0U;
+};
+
+TEST_P(WriteBufferManagerFlushInitiationTest, Basic) {
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  CALL_WRAPPER(AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(),
+                                     true /* flush_cb_result */}}));
+
+  // Reach the 1st step => expecting a single flush to be initiated
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, NonWbmInitiatedFlush) {
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  wbm_->FlushStarted(false /* wbm_initiated */);
+  IncNumRunningFlushes();
+
+  // Reach the 1st step => No need to initiate a flush (one is already
+  // running)
+  wbm_->ReserveMem(flush_step_size_);
+  CALL_WRAPPER(ValidateState(false));
+
+  CALL_WRAPPER(AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(),
+                                     true /* flush_cb_result */}}));
+
+  // End the non-wbm flush without releasing memory, just for testing purposes
+  // Expecting a wbm-initiated flush request since we are still over the step
+  wbm_->FlushEnded(false /* wbm_initiated */);
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the wbm-initiated flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, MaxNumParallelFlushes) {
+  // Replace the WBM with a new WBM that is configured with our max num of
+  // parallel flushes
+  max_num_parallel_flushes_ = 3U;
+  ASSERT_NE(max_num_parallel_flushes_,
+            wbm_->GetFlushInitiationOptions().max_num_parallel_flushes);
+  CreateWbm();
+  ASSERT_EQ(wbm_->GetFlushInitiationOptions().max_num_parallel_flushes,
+            max_num_parallel_flushes_);
+
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  // Start 3 (max) number of non-wbm flushes
+  for (auto i = 0U; i < max_num_parallel_flushes_; ++i) {
+    wbm_->FlushStarted(false /* wbm_initiated */);
+    IncNumRunningFlushes();
+  }
+
+  // Reserve memory to allow for up to 3 (max) wbm-initiated flushes
+  // However, 3 (max) are already running => no wbm-initiated flush expected
+  wbm_->ReserveMem(max_num_parallel_flushes_ * flush_step_size_);
+  CALL_WRAPPER(ValidateState(false));
+
+  // Start another (total of 4 > max) non-wbm flush
+  wbm_->ReserveMem(2 * flush_step_size_);
+
+  wbm_->ScheduleFreeMem(flush_step_size_);
+  wbm_->FreeMemBegin(flush_step_size_);
+  wbm_->FlushStarted(false /* wbm_initiated */);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(false));
+
+  // End one of the non-wbm flushes => 3 (max) still running, and usage
+  // requires max flushes
+  CALL_WRAPPER(EndFlush(false /* wbm_initiated */, flush_step_size_));
+
+  // End another one of the non-wbm flushes => 2 (< max) running =>
+  // Expecting one wbm-initiated
+  CALL_WRAPPER(AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(),
+                                     true /* flush_cb_result */}}));
+  // Increasing since expecting wbm to initiate it
+
IncNumRunningFlushes(); + wbm_->ScheduleFreeMem(flush_step_size_); + wbm_->FreeMemBegin(flush_step_size_); + CALL_WRAPPER(EndFlush(false /* wbm_initiated */, flush_step_size_, + true /* wait_on_sync_point */)); + + wbm_->ReserveMem(2 * flush_step_size_); + CALL_WRAPPER(ValidateState(false)); + + // End a wbm-initiated flushes => 2 (< max) running => Expecting one + // wbm-initiated + CALL_WRAPPER(AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + // Increasing since expecteing wbm to initiate it + IncNumRunningFlushes(); + wbm_->ScheduleFreeMem(flush_step_size_); + wbm_->FreeMemBegin(flush_step_size_); + CALL_WRAPPER(EndFlush(true /* wbm_initiated */, flush_step_size_, + true /* wait_on_sync_point */)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, JumpToQuota) { + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + CALL_WRAPPER(AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + + // Reach the 1st step => expecting a single flush to be initiated + wbm_->ReserveMem(quota_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, quota_)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, + FailureToStartFlushWhenRequested) { + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + // Setup two cb-s to fail to start the flush (flush_cb_result == false) + // First with CalcExpectedMinSizeToFlush() size, Second with 0 + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + false /* flush_cb_result */}, + {initiator_id, 0U, false /* flush_cb_result */}})); + + // Reach the 1st step => expecting the 2 requests set up above + wbm_->ReserveMem(flush_step_size_); + IncNumFlushesToInitiate(); + CALL_WRAPPER(ValidateState(true)); + + // Setup another two identical cb-s + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + false /* flush_cb_result */}, + {initiator_id, 0U, false /* flush_cb_result */}})); + + // Reserve a bit more, but still within the same step. This will initiate + // the next 2 request set up just above + wbm_->TEST_WakeupFlushInitiationThread(); + CALL_WRAPPER(ValidateState(true)); + + // Now, allow the second request to succeed + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + false /* flush_cb_result */}, + {initiator_id, 0U, true /* flush_cb_result */}})); + + // Reserve a bit more, but still within the same step. 
This will initiate + // the next 2 request set up just above + wbm_->TEST_WakeupFlushInitiationThread(); + DecNumFlushesToInitiate(); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + DeregisterInitiator(initiator_id); +} + +// TODO - Update the test - Currently fails +TEST_P(WriteBufferManagerFlushInitiationTest, DISABLED_FlushInitiationSteps) { + // Too much (useless) effort to adapt to the disabled case so just skipping + if (IsWbmDisabled()) { + return; + } + auto initiator_id = CreateAndRegisterInitiator(); + + // Increase the usage gradually in half-steps, each time expecting another + // flush to be initiated + for (auto i = 0U; i < max_num_parallel_flushes_; ++i) { + wbm_->ReserveMem(flush_step_size_ / 2); + CALL_WRAPPER(ValidateState(true)); + + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + IncNumRunningFlushes(); + wbm_->ReserveMem(flush_step_size_ / 2); + CALL_WRAPPER(ValidateState(true)); + } + ASSERT_EQ(wbm_->memory_usage(), quota_); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), max_num_parallel_flushes_); + + // Increase the usage over the quota. Not expecting any initiation activity + wbm_->ReserveMem(flush_step_size_ / 2); + wbm_->ReserveMem(flush_step_size_ / 2); + CALL_WRAPPER(ValidateState(false)); + + // Start all of the WBM flushes + some more that are NOT WBM flushes. + // No new flush should initiate + auto wbm_initiated = true; + size_t num_non_wbm_running_flushes = 0U; + for (auto i = 0U; i < 2 * max_num_parallel_flushes_; ++i) { + wbm_->FlushStarted(wbm_initiated); + if (wbm_initiated == false) { + IncNumRunningFlushes(); + ++num_non_wbm_running_flushes; + } + wbm_initiated = !wbm_initiated; + } + ASSERT_EQ(expected_num_running_flushes_ - num_non_wbm_running_flushes, + max_num_parallel_flushes_); + CALL_WRAPPER(ValidateState(false)); + + // Release flushes + memory so that we are at the quota with max num + // of parallel flushes + while (expected_num_running_flushes_ > max_num_parallel_flushes_) { + EndFlush(wbm_initiated, 0U /* released_size */); + wbm_initiated = !wbm_initiated; + } + wbm_->FreeMem(flush_step_size_); + ASSERT_EQ(wbm_->memory_usage(), quota_); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), max_num_parallel_flushes_); + CALL_WRAPPER(ValidateState(false)); + + // Decrease just below the current flush step size + wbm_->FreeMem(1U); + + while (wbm_->memory_usage() >= flush_step_size_) { + EndFlush(true, 0U /* released_size */); + CALL_WRAPPER(ValidateState(false)); + + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + IncNumRunningFlushes(); + EndFlush(false, 0U /* released_size */, true /* wait_on_sync_point */); + + wbm_->FreeMem(flush_step_size_); + } + ASSERT_EQ(wbm_->memory_usage(), flush_step_size_ - 1); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), 1U); + + // End the last remaining flush and release all used memory + EndFlush(true, flush_step_size_ - 1 /* released_size */); + ASSERT_EQ(wbm_->memory_usage(), 0U); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), 0U); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, RegisteringLate) { + // Reach the 1st step, but no registered initiators + wbm_->ReserveMem(flush_step_size_); + IncNumFlushesToInitiate(); + CALL_WRAPPER(ValidateState(false)); + + // Register an initiator and expect it to receive the initiation request + auto initiator_id = CreateInitiator(); + 
CALL_WRAPPER(AddExpectedCbsInfos({{initiator_id, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + RegisterInitiator(initiator_id); + DecNumFlushesToInitiate(); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, Deregistering) { + // Register a single initiator + auto initiator_id1 = CreateAndRegisterInitiator(); + + // initiator1 fails to initiate + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id1, CalcExpectedMinSizeToFlush(), + false /* flush_cb_result */}, + {initiator_id1, 0U, false /* flush_cb_result */}})); + + // Reach the 1st step => expecting a single flush to be initiated + wbm_->ReserveMem(flush_step_size_); + IncNumFlushesToInitiate(); + CALL_WRAPPER(ValidateState(true)); + + // Deregisters and comes initiator2 + DeregisterInitiator(initiator_id1); + auto initiator_id2 = CreateInitiator(); + + // Set initiator2 to initiate the flush + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id2, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + RegisterInitiator(initiator_id2); + + DecNumFlushesToInitiate(); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id2); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, TwoInitiatorsBasic) { + // Register two initiators + auto initiator_id1 = CreateAndRegisterInitiator(); + auto initiator_id2 = CreateAndRegisterInitiator(); + + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id1, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + + // Expect the 1st request to reach initiator1 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id2, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + + // Expect the 2nd request to reach initiator2 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush of initiator1 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + // "Run" the flush of initiator2 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id2); + DeregisterInitiator(initiator_id1); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, + TwoInitiatorsFirstFailsToInitiate) { + // Register two initiators + auto initiator_id1 = CreateAndRegisterInitiator(); + auto initiator_id2 = CreateAndRegisterInitiator(); + + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id1, CalcExpectedMinSizeToFlush(), + false /* flush_cb_result */}, + {initiator_id2, CalcExpectedMinSizeToFlush(), + false /* flush_cb_result */}, + {initiator_id1, 0U, false /* flush_cb_result */}, + {initiator_id2, 0U, true /* flush_cb_result */}})); + + // Expect the 1st request to reach initiator2 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush of initiator1 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id1, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + + // Expect the 2nd request 
to reach initiator1 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush of initiator2 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id2); + DeregisterInitiator(initiator_id1); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, + TwoInitiatorsDeregisteringWhileBeingNextToFlush) { + // Register two initiators + auto initiator_id1 = CreateAndRegisterInitiator(); + auto initiator_id2 = CreateAndRegisterInitiator(); + + // Initiator1 initiates, initiator2 is next + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id1, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + if (wbm_enabled_) { + ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 1U); + } + + // Initiator2 will be deregistered => prepare another initiation for + // initiator1 + CALL_WRAPPER( + AddExpectedCbsInfos({{initiator_id1, CalcExpectedMinSizeToFlush(), + true /* flush_cb_result */}})); + + DeregisterInitiator(initiator_id2); + ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 0U); + + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 0U); + + // "Run" both flushes to completion & release the memory + for (auto i = 0U; i < 2; ++i) { + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + } + + DeregisterInitiator(initiator_id1); +} + +INSTANTIATE_TEST_CASE_P(WriteBufferManagerTestWithParams, + WriteBufferManagerTestWithParams, + ::testing::Combine(::testing::Bool(), ::testing::Bool(), + ::testing::Bool())); + +// Run the flush initiation tests in all combinations of: +// 1. WBM Enabled (buffer size > 0) / WBM Disabled (0 buffer size) +// 2. With and without costing to cache +// 3. Allow / Disallow delays and stalls +INSTANTIATE_TEST_CASE_P(WriteBufferManagerFlushInitiationTest, + WriteBufferManagerFlushInitiationTest, + ::testing::Combine(::testing::Values(10 * 1000, 0), + ::testing::Bool(), + ::testing::Bool())); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 842388235d..3c3e799319 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -74,6 +74,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/write_batch.h" +#include "rocksdb/write_buffer_manager.h" #include "speedb/version.h" #include "test_util/testutil.h" #include "test_util/transaction_test_util.h" @@ -539,6 +540,19 @@ DEFINE_bool(cost_write_buffer_to_cache, false, DEFINE_bool(allow_wbm_stalls, false, "Enable WBM write stalls and delays"); +DEFINE_bool(initiate_wbm_flushes, false, + "WBM will proactively initiate flushes (Speedb)." 
+            " If false, WBM-related flushes will be initiated using the "
+            "ShouldFlush() service "
+            "of the WBM.");
+
+DEFINE_uint32(max_num_parallel_flushes,
+              ROCKSDB_NAMESPACE::WriteBufferManager::FlushInitiationOptions::
+                  kDfltMaxNumParallelFlushes,
+              "In case FLAGS_initiate_wbm_flushes is true, this flag will "
+              "override the default "
+              "max number of parallel flushes.");
+
 DEFINE_int64(arena_block_size, ROCKSDB_NAMESPACE::Options().arena_block_size,
              "The size, in bytes, of one block in arena memory allocation.");
 
@@ -3544,6 +3558,12 @@ class Benchmark {
     std::string name;
     std::unique_ptr filter;
     while (std::getline(benchmark_stream, name, ',')) {
+      if (open_options_.write_buffer_manager) {
+        fprintf(stderr, "\nBEFORE Benchmark (%s): %lu OF %lu\n\n", name.c_str(),
+                open_options_.write_buffer_manager->memory_usage(),
+                open_options_.write_buffer_manager->buffer_size());
+      }
+
       // Sanitize parameters
       num_ = FLAGS_num;
       reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
@@ -3989,6 +4009,12 @@ class Benchmark {
       if (post_process_method != nullptr) {
         (this->*post_process_method)();
       }
+
+      if (open_options_.write_buffer_manager) {
+        fprintf(stderr, "\nAFTER Benchmark (%s): %lu OF %lu\n", name.c_str(),
+                open_options_.write_buffer_manager->memory_usage(),
+                open_options_.write_buffer_manager->buffer_size());
+      }
     }
 
     if (secondary_update_thread_) {
@@ -4332,13 +4358,6 @@ class Benchmark {
         FLAGS_compression_use_zstd_dict_trainer;
 
     options.max_open_files = FLAGS_open_files;
-    if ((FLAGS_db_write_buffer_size == 0) && FLAGS_allow_wbm_stalls) {
-      ErrorExit("-allow_wbm_stalls is useless if db_write_buffer_size == 0");
-    }
-    if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
-      options.write_buffer_manager.reset(new WriteBufferManager(
-          FLAGS_db_write_buffer_size, cache_, FLAGS_allow_wbm_stalls));
-    }
     options.arena_block_size = FLAGS_arena_block_size;
     options.write_buffer_size = FLAGS_write_buffer_size;
     options.max_write_buffer_number = FLAGS_max_write_buffer_number;
@@ -4800,6 +4819,27 @@ class Benchmark {
     options.track_and_verify_wals_in_manifest =
         FLAGS_track_and_verify_wals_in_manifest;
 
+    if (FLAGS_db_write_buffer_size == 0) {
+      if (FLAGS_allow_wbm_stalls) {
+        ErrorExit("-allow_wbm_stalls is useless if db_write_buffer_size == 0");
+      }
+      if (FLAGS_initiate_wbm_flushes) {
+        ErrorExit(
+            "-initiate_wbm_flushes is useless if db_write_buffer_size == 0");
+      }
+    }
+    if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
+      WriteBufferManager::FlushInitiationOptions flush_initiation_options;
+      if (FLAGS_max_num_parallel_flushes > 0U) {
+        flush_initiation_options.max_num_parallel_flushes =
+            FLAGS_max_num_parallel_flushes;
+      }
+
+      options.write_buffer_manager.reset(new WriteBufferManager(
+          FLAGS_db_write_buffer_size, cache_, FLAGS_allow_wbm_stalls,
+          FLAGS_initiate_wbm_flushes, flush_initiation_options));
+    }
+
     // Integrated BlobDB
     options.enable_blob_files = FLAGS_enable_blob_files;
     options.min_blob_size = FLAGS_min_blob_size;
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index 86159a98ac..00c0399c47 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -163,6 +163,7 @@
     # [0, 0, 1024 * 1024]),
     "db_write_buffer_size" : lambda: random.choice(
         [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024, 1024 * 1024 * 1024]),
+    "initiate_wbm_flushes" : lambda: random.choice([0, 1]),
     "avoid_unnecessary_blocking_io": random.randint(0, 1),
     "write_dbid_to_manifest": random.randint(0, 1),
     "avoid_flush_during_recovery": lambda: random.choice(
@@ -761,6 +762,11 @@ def finalize_and_sanitize(src_params, counter):
     if dest_params.get("filter_uri") != "":
         dest_params["bloom_bits"] = random.choice([random.randint(1,19), random.lognormvariate(2.3, 1.3)])
+
+    # If initiate_wbm_flushes is enabled, db_write_buffer_size must be > 0, otherwise db_stress crashes
+    if dest_params.get("initiate_wbm_flushes") == 1:
+        dest_params["db_write_buffer_size"] = random.choice([1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024, 1024 * 1024 * 1024])
+
     return dest_params
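
Usage sketch (illustrative only, not part of the diff): the stand-alone program below shows how a client might wire itself into the proactive-flush API exercised by the tests above. The MyFlushInitiator type and its TryInitiateFlush() are hypothetical stand-ins for the glue that DBImpl::InitiateMemoryManagerFlushRequest() provides inside the DB; the WriteBufferManager calls themselves (the constructor taking initiate_flushes and FlushInitiationOptions, RegisterFlushInitiator(), ReserveMem(), ScheduleFreeMem()/FreeMemBegin(), FlushStarted()/FlushEnded(), FreeMem(), DeregisterFlushInitiator()) are the ones added or used by this patch, and the callback is assumed to be any callable taking the requested minimum size and returning whether a flush was scheduled, as the tests' use of std::bind implies.

#include <cstddef>
#include <cstdio>
#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/write_buffer_manager.h"

using ROCKSDB_NAMESPACE::NewLRUCache;
using ROCKSDB_NAMESPACE::WriteBufferManager;

// Hypothetical application-side initiator; in the patch itself DBImpl plays
// this role via InitiateMemoryManagerFlushRequest().
struct MyFlushInitiator {
  // Called from the WBM's flush-initiation thread. Should schedule a flush
  // that frees at least min_size_to_flush bytes and return true iff a flush
  // was actually scheduled.
  bool TryInitiateFlush(size_t min_size_to_flush) {
    std::printf("WBM asked for a flush of >= %zu bytes\n", min_size_to_flush);
    return true;
  }
};

int main() {
  auto cache = NewLRUCache(4 * 1024 * 1024);

  WriteBufferManager::FlushInitiationOptions initiation_options;
  initiation_options.max_num_parallel_flushes = 4;

  // 10MB quota, charged to the cache, stalls allowed, proactive flushes on.
  WriteBufferManager wbm(10 * 1024 * 1024, cache, true /* allow_stall */,
                         true /* initiate_flushes */, initiation_options);

  MyFlushInitiator initiator;
  wbm.RegisterFlushInitiator(&initiator,
                             [&initiator](size_t min_size_to_flush) {
                               return initiator.TryInitiateFlush(
                                   min_size_to_flush);
                             });

  // Crossing the first step (quota / max_num_parallel_flushes) wakes the
  // initiation thread, which invokes the callback registered above.
  wbm.ReserveMem(3 * 1024 * 1024);

  // The initiated flush reports its lifecycle back so the WBM can track the
  // number of running flushes against max_num_parallel_flushes.
  wbm.ScheduleFreeMem(3 * 1024 * 1024);
  wbm.FreeMemBegin(3 * 1024 * 1024);
  wbm.FlushStarted(true /* wbm_initiated */);
  wbm.FreeMem(3 * 1024 * 1024);
  wbm.FlushEnded(true /* wbm_initiated */);

  wbm.DeregisterFlushInitiator(&initiator);
  return 0;
}

As the tests suggest, the manager divides its quota into max_num_parallel_flushes steps; whenever usage crosses another step while fewer than that many flushes are running, the initiation thread walks the registered initiators round-robin (via next_candidate_initiator_idx_) and asks one of them to flush at least min(quota / (2 * max_num_parallel_flushes), 64MB) bytes, retrying with a minimum size of 0 if no initiator could comply.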