diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 3bbce0b71e..ccb1552fb9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -513,6 +513,13 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { } Status DBImpl::CloseHelper() { + if (is_registered_for_flush_initiation_rqsts_) { + assert(write_buffer_manager_); + assert(write_buffer_manager_->IsInitiatingFlushes()); + write_buffer_manager_->DeregisterFlushInitiator(this); + is_registered_for_flush_initiation_rqsts_ = false; + } + // Guarantee that there is no background error recovery in progress before // continuing with the shutdown mutex_.Lock(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 97e3d1b8a9..be8f931c79 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -363,6 +363,9 @@ class DBImpl : public DB { virtual Status LockWAL() override; virtual Status UnlockWAL() override; + // flush initiated by the write buffer manager to free some space + bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush); + virtual SequenceNumber GetLatestSequenceNumber() const override; // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire @@ -2393,6 +2396,8 @@ class DBImpl : public DB { // Pointer to WriteBufferManager stalling interface. std::unique_ptr wbm_stall_; + + bool is_registered_for_flush_initiation_rqsts_ = false; }; extern Options SanitizeOptions(const std::string& db, const Options& src, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 02f7d7d951..084a9aecfb 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2779,12 +2779,18 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, bg_job_limits.max_compactions, bg_flush_scheduled_, bg_compaction_scheduled_); } + // HILIK: must move here + *reason = bg_flush_args[0].cfd_->GetFlushReason(); + if (write_buffer_manager_) { + write_buffer_manager_->FlushStarted( + *reason == FlushReason::kSpeedbWriteBufferManager); + } + status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, job_context, log_buffer, thread_pri); TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush"); // All the CFDs in the FlushReq must have the same flush reason, so just // grab the first one - *reason = bg_flush_args[0].cfd_->GetFlushReason(); for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; if (cfd->UnrefAndTryDelete()) { @@ -2869,6 +2875,10 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { assert(num_running_flushes_ > 0); num_running_flushes_--; bg_flush_scheduled_--; + if (write_buffer_manager_) { + write_buffer_manager_->FlushEnded(reason == + FlushReason::kSpeedbWriteBufferManager); + } // See if there's more work to be done MaybeScheduleFlushOrCompaction(); atomic_flush_install_cv_.SignalAll(); @@ -3790,4 +3800,96 @@ Status DBImpl::WaitForCompact(bool wait_unscheduled) { return error_handler_.GetBGError(); } +bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) { + ColumnFamilyData* cfd_to_flush = nullptr; + + { + InstrumentedMutexLock l(&mutex_); + + if (shutdown_initiated_) { + return false; + } + + autovector cfds; + if (immutable_db_options_.atomic_flush) { + SelectColumnFamiliesForAtomicFlush(&cfds); + size_t total_size_to_flush = 0U; + for (const auto& cfd : cfds) { + if (cfd->imm()->NumNotFlushed() > 0) { + total_size_to_flush = min_size_to_flush; + break; + } else { + total_size_to_flush += cfd->mem()->ApproximateMemoryUsage(); + } + } + if (total_size_to_flush < min_size_to_flush) { + return false; + } + } else { + ColumnFamilyData* cfd_picked = nullptr; + SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; + + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if ((cfd->imm()->NumNotFlushed() != 0) || + (!cfd->mem()->IsEmpty() && + (cfd->mem()->ApproximateMemoryUsage() >= min_size_to_flush))) { + // We only consider active mem table, hoping immutable memtable is + // already in the process of flushing. + uint64_t seq = cfd->mem()->GetCreationSeq(); + if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { + cfd_picked = cfd; + seq_num_for_cf_picked = seq; + } + } + } + if (cfd_picked != nullptr) { + cfds.push_back(cfd_picked); + } + + // // MaybeFlushStatsCF(&cfds); + } + + if (cfds.empty()) { + return false; + } + + cfd_to_flush = cfds.front(); + } + + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "[%s] write buffer manager flush started current usage %lu out of %lu", + cfd_to_flush->GetName().c_str(), + cfd_to_flush->write_buffer_mgr()->memory_usage(), + cfd_to_flush->write_buffer_mgr()->buffer_size()); + + TEST_SYNC_POINT("DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush"); + + // URQ - When the memory manager picks a cf for flushing, it considers its + // mutable memtable as well. However, here we reuest a flush of only the + // immutable ones => the freed memory by flushing is not the same as the one + // causing this cf to be picked + FlushOptions flush_options; + flush_options.allow_write_stall = true; + flush_options.wait = false; + Status s; + if (immutable_db_options_.atomic_flush) { + s = AtomicFlushMemTables({cfd_to_flush}, flush_options, + FlushReason::kSpeedbWriteBufferManager); + } else { + s = FlushMemTable(cfd_to_flush, flush_options, + FlushReason::kSpeedbWriteBufferManager); + } + + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "[%s] write buffer manager initialize flush finished, status: %s\n", + cfd_to_flush->GetName().c_str(), s.ToString().c_str()); + + return s.ok(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 64896cba47..8f835a781d 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1985,6 +1985,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } handles->clear(); } + + if (s.ok()) { + auto wbm = db_options.write_buffer_manager.get(); + auto db_impl = static_cast(*dbptr); + + if (wbm && wbm->IsInitiatingFlushes()) { + // Registering regardless of wbm->enabled() since the buffer size may be + // set later making the WBM enabled, but we will not re-register again + // However, notifications will only be received when the wbm is enabled + auto cb = [db_impl](size_t min_size_to_flush) { + return db_impl->InitiateMemoryManagerFlushRequest(min_size_to_flush); + }; + wbm->RegisterFlushInitiator(db_impl, cb); + db_impl->is_registered_for_flush_initiation_rqsts_ = true; + } + } + return s; } } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 39657d4623..155ac64cf4 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1093,6 +1093,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, if (write_options.no_slowdown) { status = Status::Incomplete("Write stall"); } else { + // Initiating a flush to avoid blocking forever + if (num_running_flushes_ == 0) { + WaitForPendingWrites(); + status = HandleWriteBufferManagerFlush(write_context); + } WriteBufferManagerStallWrites(); } } @@ -1709,6 +1714,10 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue void DBImpl::WriteBufferManagerStallWrites() { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Write-Buffer-Manager Stalls Writes"); + mutex_.AssertHeld(); // First block future writer threads who want to add themselves to the queue // of WriteThread. @@ -1723,7 +1732,12 @@ void DBImpl::WriteBufferManagerStallWrites() { write_buffer_manager_->BeginWriteStall(wbm_stall_.get()); wbm_stall_->Block(); + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Write-Buffer-Manager Stall Writes END"); + mutex_.Lock(); + // Stall has ended. Signal writer threads so that they can add // themselves to the WriteThread queue for writes. write_thread_.EndWriteStall(); diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc index c1e8f7100e..1e25b448ad 100644 --- a/db/db_write_buffer_manager_test.cc +++ b/db/db_write_buffer_manager_test.cc @@ -7,6 +7,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include + #include "db/db_test_util.h" #include "db/write_thread.h" #include "port/stack_trace.h" @@ -14,10 +17,12 @@ namespace ROCKSDB_NAMESPACE { class DBWriteBufferManagerTest : public DBTestBase, - public testing::WithParamInterface { + public ::testing::WithParamInterface { public: DBWriteBufferManagerTest() : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } bool cost_cache_; }; @@ -27,7 +32,6 @@ TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -70,7 +74,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -197,7 +200,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -314,7 +316,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -456,7 +457,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -618,7 +618,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -780,9 +779,81 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +class DBWriteBufferManagerTest1 : public DBTestBase, + public ::testing::WithParamInterface { + public: + DBWriteBufferManagerTest1() + : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } + bool cost_cache_; +}; +// =============================================================================================================== +class DBWriteBufferManagerFlushTests + : public DBTestBase, + public ::testing::WithParamInterface { + public: + DBWriteBufferManagerFlushTests() + : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } + bool cost_cache_; +}; + +TEST_P(DBWriteBufferManagerFlushTests, WbmFlushesSingleDBSingleCf) { + constexpr size_t kQuota = 100 * 1000; + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = kQuota; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + + auto allow_stall_ = false; + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(kQuota, cache, allow_stall_, true)); + } else { + options.write_buffer_manager.reset(new WriteBufferManager( + kQuota, nullptr, allow_stall_, true)); + } + auto* wbm = options.write_buffer_manager.get(); + size_t flush_step_size = + kQuota / wbm->GetFlushInitiationOptions().max_num_parallel_flushes; + + WriteOptions wo; + wo.disableWAL = true; + + DestroyAndReopen(options); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush", + "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::" + "Flushing"}}); + + // Reach the flush step by writing to two cf-s, no flush + ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo)); + ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo)); + + TEST_SYNC_POINT( + "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::Flushing"); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest, testing::Bool()); +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1, + ::testing::Bool()); + +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerFlushTests, + DBWriteBufferManagerFlushTests, + ::testing::Values(false)); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/flush_job.cc b/db/flush_job.cc index 96d62d7891..199e3f219f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -77,6 +77,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) { return "Error Recovery"; case FlushReason::kWalFull: return "WAL Full"; + case FlushReason::kSpeedbWriteBufferManager: + return "Speedb Write Buffer Manager"; default: return "Invalid"; } diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 5f6f890599..9d88327eba 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -169,6 +169,7 @@ enum class FlushReason : int { // will not be called to avoid many small immutable memtables. kErrorRecoveryRetryFlush = 0xc, kWalFull = 0xd, + kSpeedbWriteBufferManager = 0xe, }; // TODO: In the future, BackgroundErrorReason will only be used to indicate diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 89296d001b..1fe719ac68 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -17,10 +17,15 @@ #include #include #include +#include +#include +#include +#include #include "rocksdb/cache.h" namespace ROCKSDB_NAMESPACE { +class Options; class CacheReservationManager; // Interface to block and signal DB instances, intended for RocksDB @@ -35,6 +40,14 @@ class StallInterface { }; class WriteBufferManager final { + public: + static constexpr uint64_t kStartFlushPercentThreshold = 80U; + + struct FlushInitiationOptions { + FlushInitiationOptions() {} + size_t max_num_parallel_flushes = 4U; + }; + public: // Parameters: // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped. @@ -44,12 +57,25 @@ class WriteBufferManager final { // cost the memory allocated to the cache. It can be used even if _buffer_size // = 0. // - // allow_stall: if set true, it will enable stalling of writes when - // memory_usage() exceeds buffer_size. It will wait for flush to complete and - // memory usage to drop down. - explicit WriteBufferManager(size_t _buffer_size, - std::shared_ptr cache = {}, - bool allow_stall = false); + // allow_delays_and_stalls: if set true, it will enable delays and stall as + // described below. + // Delays: if set to true, it will start delaying of writes when + // memory_usage() exceeds the kStartFlushPercentThreshold percent threshold + // of the buffer size. The WBM calculates a delay factor that is increasing + // as memory_usage() increases. When applicable, the WBM will notify its + // registered clients about the applicable delay factor. Clients are + // expected to set their respective delayed write rates accordingly. When + // memory_usage() reaches buffer_size(), the (optional) WBM stall mechanism + // kicks in if enabled. (see allow_delays_and_stalls above) + // Stalls: stalling of writes when memory_usage() exceeds buffer_size. It + // will wait for flush to complete and + // memory usage to drop down. + explicit WriteBufferManager( + size_t _buffer_size, std::shared_ptr cache = {}, + bool allow_stall = false, bool initiate_flushes = false, + const FlushInitiationOptions& flush_initiation_options = + FlushInitiationOptions()); + // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; WriteBufferManager& operator=(const WriteBufferManager&) = delete; @@ -108,13 +134,18 @@ class WriteBufferManager final { // Check if stall is active and can be ended. MaybeEndWriteStall(); + if (enabled()) { + if (initiate_flushes_) { + InitFlushInitiationVars(new_size); + } + } } // Below functions should be called by RocksDB internally. // Should only be called from write thread bool ShouldFlush() const { - if (enabled()) { + if ((initiate_flushes_ == false) && enabled()) { if (mutable_memtable_memory_usage() > mutable_limit_.load(std::memory_order_relaxed)) { return true; @@ -187,6 +218,32 @@ class WriteBufferManager final { std::string GetPrintableOptions() const; + public: + bool IsInitiatingFlushes() const { return initiate_flushes_; } + const FlushInitiationOptions& GetFlushInitiationOptions() const { + return flush_initiation_options_; + } + + public: + using InitiateFlushRequestCb = std::function; + + void RegisterFlushInitiator(void* initiator, InitiateFlushRequestCb request); + void DeregisterFlushInitiator(void* initiator); + + void FlushStarted(bool wbm_initiated); + void FlushEnded(bool wbm_initiated); + + public: + size_t TEST_GetNumFlushesToInitiate() const { + return num_flushes_to_initiate_; + } + size_t TEST_GetNumRunningFlushes() const { return num_running_flushes_; } + size_t TEST_GetNextCandidateInitiatorIdx() const { + return next_candidate_initiator_idx_; + } + + void TEST_WakeupFlushInitiationThread(); + private: std::atomic buffer_size_; std::atomic mutable_limit_; @@ -207,7 +264,101 @@ class WriteBufferManager final { // while holding mu_, but it can be read without a lock. std::atomic stall_active_; - void ReserveMemWithCache(size_t mem); - void FreeMemWithCache(size_t mem); + // Return the new memory usage + size_t ReserveMemWithCache(size_t mem); + size_t FreeMemWithCache(size_t mem); + + private: + struct InitiatorInfo { + void* initiator; + InitiateFlushRequestCb cb; + }; + + static constexpr uint64_t kInvalidInitiatorIdx = + std::numeric_limits::max(); + + private: + void InitFlushInitiationVars(size_t quota); + void InitiateFlushesThread(); + bool InitiateAdditionalFlush(); + void WakeUpFlushesThread(); + void TerminateFlushesThread(); + void RecalcFlushInitiationSize(); + void ReevaluateNeedForMoreFlushes(size_t curr_memory_used); + uint64_t FindInitiator(void* initiator) const; + + // Assumed the lock is held + void WakeupFlushInitiationThread() { + new_flushes_wakeup_ = true; + flushes_wakeup_cv_.notify_one(); + } + + // This is used outside the flushes_mu_ lock => only + // additional_flush_initiation_size_ needs to be atomic + // TODO + // free mem can be delayed after flush ended due to a thread that holds the version + // for now the memory is accounted as dirty (althogh it is not) + // need to move the accounting to the cache / clean where it belong + bool ShouldInitiateAnotherFlushMemOnly(size_t curr_memory_used) const { + return (curr_memory_used - memory_being_freed_ >= additional_flush_step_size_/2 && + curr_memory_used >= additional_flush_initiation_size_); + } + + // This should be called only unther the flushes_mu_ lock + bool ShouldInitiateAnotherFlush(size_t curr_memory_used) const { + return (((num_running_flushes_ + num_flushes_to_initiate_) < + flush_initiation_options_.max_num_parallel_flushes) && + ShouldInitiateAnotherFlushMemOnly(curr_memory_used)); + } + + // Assumed flushes_mu_ is locked + uint64_t CalcNextCandidateInitiatorIdx() const { + // The index is irrelevant when there are no initiators so we might as well + // set it to 0 + return (flush_initiators_.empty() ? 0U + : (next_candidate_initiator_idx_ + 1) % + flush_initiators_.size()); + } + + bool IsInitiatorIdxValid(uint64_t initiator_idx) const { + return (initiator_idx != kInvalidInitiatorIdx); + } + + private: + // Flush Initiation Data Members + + const bool initiate_flushes_ = false; + const FlushInitiationOptions flush_initiation_options_ = + FlushInitiationOptions(); + + std::vector flush_initiators_; + std::atomic num_initiators_ = 0U; + uint64_t next_candidate_initiator_idx_ = 0U; + + // Consider if this needs to be atomic + std::atomic num_flushes_to_initiate_ = 0U; + std::atomic num_running_flushes_ = 0U; + size_t flush_initiation_start_size_ = 0U; + size_t additional_flush_step_size_ = 0U; + std::atomic additional_flush_initiation_size_ = 0U; + size_t min_flush_size_ = 0U; + + std::mutex flushes_mu_; + std::mutex flushes_initiators_mu_; + std::condition_variable flushes_wakeup_cv_; + bool new_flushes_wakeup_ = false; + + std::thread flushes_thread_; + bool terminate_flushes_thread_ = false; }; + +// This is a convenience utility for users of the WriteBufferManager that +// wish to use Speedb's WBM flush initiation mechanism. +// For such users, Speedb recommends effectively disabling the existing +// mechanisms that flush based on write buffers' configureation (size, number, +// etc). So, Speedb's recommendation would be to call this function resulting +// the WBM as the sole automatic initiator of flushes. +extern void SanitizeOptionsToDisableFlushesBasedOnWriteBufferOptions( + Options& options); + } // namespace ROCKSDB_NAMESPACE diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index 770224ff05..bf90a46551 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -9,18 +9,21 @@ #include "rocksdb/write_buffer_manager.h" +#include #include #include "cache/cache_entry_roles.h" #include "cache/cache_reservation_manager.h" #include "db/db_impl/db_impl.h" #include "rocksdb/status.h" +#include "test_util/sync_point.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { -WriteBufferManager::WriteBufferManager(size_t _buffer_size, - std::shared_ptr cache, - bool allow_stall) +WriteBufferManager::WriteBufferManager( + size_t _buffer_size, std::shared_ptr cache, + bool allow_stall, bool initiate_flushes, + const FlushInitiationOptions& flush_initiation_options) : buffer_size_(_buffer_size), mutable_limit_(buffer_size_ * 7 / 8), memory_used_(0), @@ -28,7 +31,9 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size, memory_being_freed_(0U), cache_res_mgr_(nullptr), allow_stall_(allow_stall), - stall_active_(false) { + stall_active_(false), + initiate_flushes_(initiate_flushes), + flush_initiation_options_(flush_initiation_options) { #ifndef ROCKSDB_LITE if (cache) { // Memtable's memory usage tends to fluctuate frequently @@ -41,6 +46,10 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size, #else (void)cache; #endif // ROCKSDB_LITE + + if (initiate_flushes_) { + InitFlushInitiationVars(buffer_size()); + } } WriteBufferManager::~WriteBufferManager() { @@ -48,6 +57,7 @@ WriteBufferManager::~WriteBufferManager() { std::unique_lock lock(mu_); assert(queue_.empty()); #endif + TerminateFlushesThread(); } std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { @@ -59,23 +69,37 @@ std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { } void WriteBufferManager::ReserveMem(size_t mem) { + auto is_enabled = enabled(); + size_t new_memory_used = 0U; + if (cache_res_mgr_ != nullptr) { - ReserveMemWithCache(mem); - } else if (enabled()) { - memory_used_.fetch_add(mem, std::memory_order_relaxed); + new_memory_used = ReserveMemWithCache(mem); + } else if (is_enabled) { + auto old_memory_used = + memory_used_.fetch_add(mem, std::memory_order_relaxed); + new_memory_used = old_memory_used + mem; + } + if (is_enabled) { + // Checking outside the locks is not reliable, but avoids locking + // unnecessarily which is expensive + if (UNLIKELY(ShouldInitiateAnotherFlushMemOnly(new_memory_used))) { + std::unique_lock lock(flushes_mu_); + ReevaluateNeedForMoreFlushes(new_memory_used); + } } // // fprintf(stderr, "WBM(%p) - Reserve(%d), memory_used:%d\n", (void*)this, (int)mem, (int)memory_used_); } // Should only be called from write thread -void WriteBufferManager::ReserveMemWithCache(size_t mem) { +size_t WriteBufferManager::ReserveMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE assert(cache_res_mgr_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. std::lock_guard lock(cache_res_mgr_mu_); - size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem; + size_t old_mem_used = memory_used_.load(std::memory_order_relaxed); + size_t new_mem_used = old_mem_used + mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); @@ -85,8 +109,11 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) { // [TODO] We'll need to improve it in the future and figure out what to do on // error s.PermitUncheckedError(); + + return new_mem_used; #else (void)mem; + return 0U; #endif // ROCKSDB_LITE } @@ -114,13 +141,15 @@ void WriteBufferManager::FreeMemAborted(size_t mem) { void WriteBufferManager::FreeMem(size_t mem) { const auto is_enabled = enabled(); + size_t new_memory_used = 0U; if (cache_res_mgr_ != nullptr) { - FreeMemWithCache(mem); + new_memory_used = FreeMemWithCache(mem); } else if (is_enabled) { - [[maybe_unused]] const auto curr_memory_used = + auto old_memory_used = memory_used_.fetch_sub(mem, std::memory_order_relaxed); - assert(curr_memory_used >= mem); + assert(old_memory_used >= mem); + new_memory_used = old_memory_used - mem; } // // fprintf(stderr, "WBM(%p, enabled:%d, cache:%d) - Freed(%d), memory_used:%d\n", (void*)this, is_enabled, (cache_res_mgr_ != nullptr), (int)mem, (int)memory_used_); @@ -137,18 +166,27 @@ void WriteBufferManager::FreeMem(size_t mem) { // Check if stall is active and can be ended. MaybeEndWriteStall(); + + if (is_enabled) { + // Checking outside the locks is not reliable, but avoids locking + // unnecessarily which is expensive + if (UNLIKELY(ShouldInitiateAnotherFlushMemOnly(new_memory_used))) { + std::unique_lock lock(flushes_mu_); + ReevaluateNeedForMoreFlushes(new_memory_used); + } + } } -void WriteBufferManager::FreeMemWithCache(size_t mem) { +size_t WriteBufferManager::FreeMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE assert(cache_res_mgr_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. std::lock_guard lock(cache_res_mgr_mu_); - const auto curr_memory_used = memory_used_.load(std::memory_order_relaxed); - assert(curr_memory_used >= mem); - size_t new_mem_used = curr_memory_used - mem; + const auto old_mem_used = memory_used_.load(std::memory_order_relaxed); + assert(old_mem_used >= mem); + size_t new_mem_used = old_mem_used - mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); @@ -157,8 +195,11 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) { // [TODO] We'll need to improve it in the future and figure out what to do on // error s.PermitUncheckedError(); + + return new_mem_used; #else (void)mem; + return 0U; #endif // ROCKSDB_LITE } @@ -240,13 +281,261 @@ std::string WriteBufferManager::GetPrintableOptions() const { char buffer[kBufferSize]; // The assumed width of the callers display code - int field_width = 47; + int field_width = 85; snprintf(buffer, kBufferSize, "%*s: %" ROCKSDB_PRIszt "\n", field_width, - "size", buffer_size()); + "wbm.size", buffer_size()); + ret.append(buffer); + + snprintf(buffer, kBufferSize, "%*s: %d\n", field_width, "wbm.allow_stalls", + allow_stall_); + ret.append(buffer); + + snprintf(buffer, kBufferSize, "%*s: %d\n", field_width, + "wbm.initiate_flushes", IsInitiatingFlushes()); ret.append(buffer); return ret; } +// ================================================================================================ +void WriteBufferManager::RegisterFlushInitiator( + void* initiator, InitiateFlushRequestCb request) { + { + std::unique_lock lock(flushes_initiators_mu_); + assert(IsInitiatorIdxValid(FindInitiator(initiator)) == false); + flush_initiators_.push_back({initiator, request}); + num_initiators_ = flush_initiators_.size(); + } + + WakeupFlushInitiationThread(); +} + +void WriteBufferManager::DeregisterFlushInitiator(void* initiator) { + { + std::unique_lock lock(flushes_initiators_mu_); + + auto initiator_idx = FindInitiator(initiator); + assert(IsInitiatorIdxValid(initiator_idx)); + + // Move to the next initiator (if there is one) in case the current one + // deregisters + if (initiator_idx == next_candidate_initiator_idx_) { + next_candidate_initiator_idx_ = CalcNextCandidateInitiatorIdx(); + } + + flush_initiators_.erase(flush_initiators_.begin() + initiator_idx); + num_initiators_ = flush_initiators_.size(); + } + + WakeupFlushInitiationThread(); +} + +void WriteBufferManager::InitFlushInitiationVars(size_t quota) { + assert(initiate_flushes_); + + { + std::unique_lock lock(flushes_mu_); + additional_flush_step_size_ = + quota * kStartFlushPercentThreshold / 100 / + flush_initiation_options_.max_num_parallel_flushes; + flush_initiation_start_size_ = additional_flush_step_size_; + // TODO - Update this to a formula. If it depends on the number of initators + // => update when that number changes + min_flush_size_ = 4 * 1024U * 1024U; // 4 MB + RecalcFlushInitiationSize(); + } + + if (flushes_thread_.joinable() == false) { + flushes_thread_ = + std::thread(&WriteBufferManager::InitiateFlushesThread, this); + } +} + +void WriteBufferManager::InitiateFlushesThread() { + while (true) { + // Should return true when the waiting should stop (no spurious wakeups + // guaranteed) + auto StopWaiting = [this]() { + return (new_flushes_wakeup_ && + (terminate_flushes_thread_ || (num_flushes_to_initiate_ > 0U))); + }; + + std::unique_lock lock(flushes_mu_); + flushes_wakeup_cv_.wait(lock, StopWaiting); + new_flushes_wakeup_ = false; + + if (terminate_flushes_thread_) { + break; + } + + // The code below tries to initiate num_flushes_to_initiate_ flushes by + // invoking its registered initiators, and requesting them to initiate a + // flush of a certain minimum size. The initiation is done in iterations. An + // iteration is an attempt to give evey initiator an opportunity to flush, + // in a round-robin ordering. An initiator may or may not be able to + // initiate a flush. Reasons for not initiating could be: + // - The flush is less than the specified minimum size. + // - The initiator is in the process of shutting down or being disposed of. + // + // The assumption is that in case flush initiation stopped when + // num_flushes_to_initiate_ == 0, there will be some future event that will + // wake up this thread and initiation attempts will be retried: + // - Initiator will be enabled + // - A flush in progress will end + // - The memory_used() will increase above additional_flush_initiation_size_ + + // Two iterations: + // 1. Flushes of a min size. + // 2. Flushes of any size + constexpr size_t kNumIters = 2U; + const std::array kMinFlushSizes{min_flush_size_, 0U}; + + auto iter = 0U; + while ((iter < kMinFlushSizes.size()) && (num_flushes_to_initiate_ > 0U)) { + auto num_repeated_failures_to_initiate = 0U; + while ((num_repeated_failures_to_initiate < num_initiators_) && + (num_flushes_to_initiate_ > 0U)) { + + bool was_flush_initiated = false; + // Unlocking the flushed_mu_ since flushing (via the initiator cb) may + // call a WBM service (e.g., ReserveMem()), that, in turn, needs to lock + // the same mutex => will get stuck + lock.unlock(); + { + std::unique_lock initiators_lock(flushes_initiators_mu_); + auto& initiator = flush_initiators_[next_candidate_initiator_idx_]; + next_candidate_initiator_idx_ = CalcNextCandidateInitiatorIdx(); + + // Updating counters before initiating flushes since we have released the lock because XXX + // Not recalculating flush initiation size since the increment & + // decrement cancel each other with respect to the recalc + ++num_running_flushes_; + --num_flushes_to_initiate_; + was_flush_initiated = initiator.cb(kMinFlushSizes[iter]); + if (!was_flush_initiated) { + --num_running_flushes_; + ++num_flushes_to_initiate_; + } + } + lock.lock(); + + if (was_flush_initiated) { + num_repeated_failures_to_initiate = 0U; + } else { + ++num_repeated_failures_to_initiate; + } + } + ++iter; + } + TEST_SYNC_POINT_CALLBACK( + "WriteBufferManager::InitiateFlushesThread::DoneInitiationsAttempt", + &num_flushes_to_initiate_); + } +} + +void WriteBufferManager::TerminateFlushesThread() { + { + std::unique_lock lock(flushes_mu_); + terminate_flushes_thread_ = true; + WakeupFlushInitiationThread(); + } + + if (flushes_thread_.joinable()) { + flushes_thread_.join(); + } +} + +void WriteBufferManager::FlushStarted(bool wbm_initiated) { + // num_running_flushes_ is incremented in our thread when initiating flushes + // => Already accounted for + if (wbm_initiated || !enabled()) { + return; + } + + std::unique_lock lock(flushes_mu_); + + ++num_running_flushes_; + size_t curr_memory_used = memory_usage(); + RecalcFlushInitiationSize(); + ReevaluateNeedForMoreFlushes(curr_memory_used); +} + +void WriteBufferManager::FlushEnded(bool /* wbm_initiated */) { + if (!enabled()) { + return; + } + + std::unique_lock lock(flushes_mu_); + + assert(num_running_flushes_ > 0U); + --num_running_flushes_; + size_t curr_memory_used = memory_usage(); + RecalcFlushInitiationSize(); + ReevaluateNeedForMoreFlushes(curr_memory_used); +} + +void WriteBufferManager::RecalcFlushInitiationSize() { + if (num_running_flushes_ + num_flushes_to_initiate_ >= + flush_initiation_options_.max_num_parallel_flushes) { + additional_flush_initiation_size_ = buffer_size(); + } else { + additional_flush_initiation_size_ = + flush_initiation_start_size_ + + additional_flush_step_size_ * + (num_running_flushes_ + num_flushes_to_initiate_); + } +} + +void WriteBufferManager::ReevaluateNeedForMoreFlushes(size_t curr_memory_used) { + // TODO - Assert flushes_mu_ is held at this point + assert(enabled()); + + // URQ - I SUGGEST USING HERE THE AMOUNT OF MEMORY STILL NOT MARKED FOR FLUSH + // (MUTABLE + IMMUTABLE) + if (ShouldInitiateAnotherFlush(curr_memory_used)) { + // need to schedule more + ++num_flushes_to_initiate_; + RecalcFlushInitiationSize(); + WakeupFlushInitiationThread(); + } +} + +uint64_t WriteBufferManager::FindInitiator(void* initiator) const { + // Assumes lock is held on the flushes_mu_ + + auto initiator_idx = kInvalidInitiatorIdx; + for (auto i = 0U; i < flush_initiators_.size(); ++i) { + if (flush_initiators_[i].initiator == initiator) { + initiator_idx = i; + break; + } + } + + return initiator_idx; +} + +void WriteBufferManager::TEST_WakeupFlushInitiationThread() { + std::unique_lock lock(flushes_mu_); + WakeupFlushInitiationThread(); +} + +void SanitizeOptionsToDisableFlushesBasedOnWriteBufferOptions( + Options& options) { + if (options.write_buffer_size == 0) + options.write_buffer_size = options.db_write_buffer_size / 2; + options.max_write_buffer_number = + (options.db_write_buffer_size / options.write_buffer_size + 1) * 2; + options.min_write_buffer_number_to_merge = + options.max_write_buffer_number / 2; + int compaction_trigger = options.level0_file_num_compaction_trigger; + if (compaction_trigger == 0) { + compaction_trigger = options.level0_file_num_compaction_trigger = 4; + } + // URQ - Why and should it also be part of the options sanitization? + // // options.max_background_compactions = options.max_background_flushes * 2; + // // options.max_background_jobs = + // // options.max_background_flushes + options.max_background_compactions; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc index 3ac893be02..f809158bbd 100644 --- a/memtable/write_buffer_manager_test.cc +++ b/memtable/write_buffer_manager_test.cc @@ -8,6 +8,16 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "rocksdb/write_buffer_manager.h" + +#include +#include +#include +#include +#include +#include + +#include "rocksdb/cache.h" +#include "test_util/sync_point.h" #include "test_util/testharness.h" namespace ROCKSDB_NAMESPACE { @@ -304,6 +314,746 @@ TEST_F(WriteBufferManagerTest, CacheFull) { } #endif // ROCKSDB_LITE + +#define VALIDATE_USAGE_STATE(memory_change_size, expected_state, \ + expected_factor) \ + ValidateUsageState(__LINE__, memory_change_size, expected_state, \ + expected_factor) + +class WriteBufferManagerTestWithParams + : public WriteBufferManagerTest, + public ::testing::WithParamInterface> { + public: + void SetUp() override { + wbm_enabled_ = std::get<0>(GetParam()); + cost_cache_ = std::get<1>(GetParam()); + allow_stall_ = std::get<2>(GetParam()); + } + + bool wbm_enabled_; + bool cost_cache_; + bool allow_stall_; +}; + +// ========================================================================== +#define CALL_WRAPPER(func) \ + func; \ + ASSERT_FALSE(HasFailure()); + +// #1: Quota (size_t). 0 == WBM disabled +// #2: Cost to cache (Boolean) +class WriteBufferManagerFlushInitiationTest + : public WriteBufferManagerTest, + public ::testing::WithParamInterface> { + public: + void SetUp() override { + quota_ = std::get<0>(GetParam()); + cost_cache_ = std::get<1>(GetParam()); + allow_stall_ = std::get<2>(GetParam()); + + wbm_enabled_ = (quota_ > 0U); + cache_ = NewLRUCache(4 * 1024 * 1024, 2); + max_num_parallel_flushes_ = + WriteBufferManager::FlushInitiationOptions().max_num_parallel_flushes; + + CreateWbm(); + SetupAndEnableTestPoints(); + + actual_num_cbs_ = 0U; + expected_num_cbs_ = 0U; + validation_num_ = 0U; + expected_num_flushes_to_initiate_ = 0U; + expected_num_running_flushes_ = 0U; + } + + void TearDown() override { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_TRUE(expected_cb_initiators_.empty()); + ASSERT_TRUE(expected_cb_min_size_to_flush_.empty()); + ASSERT_TRUE(flush_cb_results_.empty()); + + initiators_.clear(); + } + + bool IsWbmDisabled() const { return (wbm_enabled_ == false); } + + void CreateWbm() { + auto wbm_quota = (wbm_enabled_ ? quota_ : 0U); + WriteBufferManager::FlushInitiationOptions initiation_options; + initiation_options.max_num_parallel_flushes = max_num_parallel_flushes_; + + ASSERT_GT(max_num_parallel_flushes_, 0U); + flush_step_size_ = quota_ / max_num_parallel_flushes_; + + if (cost_cache_) { + wbm_.reset(new WriteBufferManager(wbm_quota, cache_, + allow_stall_, true, + initiation_options)); + } else { + wbm_.reset(new WriteBufferManager(wbm_quota, nullptr, + allow_stall_, true, + initiation_options)); + } + ASSERT_EQ(wbm_->enabled(), wbm_enabled_); + ASSERT_TRUE(wbm_->IsInitiatingFlushes()); + } + + uint64_t CreateInitiator() { + auto initiator = std::make_unique(++next_initiator_id_); + auto initiator_id = *initiator; + initiators_.push_back(std::move(initiator)); + return initiator_id; + } + + void RegisterInitiator(uint64_t initiator_id) { + auto initiator = FindInitiator(initiator_id); + ASSERT_NE(initiator, nullptr); + if (initiator != nullptr) { + auto cb = + std::bind(&WriteBufferManagerFlushInitiationTest::FlushRequestCb, + this, std::placeholders::_1, initiator); + wbm_->RegisterFlushInitiator(initiator, cb); + } + } + + uint64_t CreateAndRegisterInitiator() { + auto initiator_id = CreateInitiator(); + RegisterInitiator(initiator_id); + return initiator_id; + } + + std::optional FindInitiatorIdx(uint64_t initiator_id) { + for (auto i = 0U; i < initiators_.size(); ++i) { + if (*initiators_[i] == initiator_id) { + return i; + } + } + + return {}; + } + + uint64_t* FindInitiator(uint64_t initiator_id) { + auto initiator_idx = FindInitiatorIdx(initiator_id); + if (initiator_idx.has_value()) { + return initiators_[initiator_idx.value()].get(); + } else { + ADD_FAILURE(); + return nullptr; + } + } + + void DeregisterInitiator(uint64_t initiator_id) { + auto initiator_idx = FindInitiatorIdx(initiator_id); + ASSERT_TRUE(initiator_idx.has_value()); + + if (initiator_idx.has_value()) { + wbm_->DeregisterFlushInitiator(initiators_[initiator_idx.value()].get()); + initiators_.erase(initiators_.begin() + initiator_idx.value()); + } + } + + struct ExpectedCbInfo { + uint64_t initiator_id; + size_t min_size_to_flush; + bool flush_cb_result; + }; + + void AddExpectedCbsInfos(const std::vector& cbs_infos) { + ASSERT_TRUE(expected_cb_initiators_.empty()); + ASSERT_TRUE(expected_cb_min_size_to_flush_.empty()); + ASSERT_TRUE(flush_cb_results_.empty()); + + if (IsWbmDisabled()) { + return; + } + + for (const auto& cb_info : cbs_infos) { + auto initiator = FindInitiator(cb_info.initiator_id); + ASSERT_NE(initiator, nullptr); + expected_cb_initiators_.push_back(initiator); + + expected_cb_min_size_to_flush_.push_back(cb_info.min_size_to_flush); + flush_cb_results_.push_back(cb_info.flush_cb_result); + } + actual_num_cbs_ = 0U; + expected_num_cbs_ = cbs_infos.size(); + + ++validation_num_; + std::string test_point_name_suffix = std::to_string(validation_num_); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DoneInitiationsAttemptTestPointCb::ExpectedNumAttempts:" + + test_point_name_suffix, + "ValidateState::WaitUntilValidtionPossible:" + + test_point_name_suffix}}); + } + + // Flush initiaion callback registered with the WBM + bool FlushRequestCb(size_t min_size_to_flush, void* initiator) { + EXPECT_TRUE(wbm_enabled_); + + ++actual_num_cbs_; + + if (expected_cb_min_size_to_flush_.empty() == false) { + EXPECT_EQ(expected_cb_min_size_to_flush_[0], min_size_to_flush); + expected_cb_min_size_to_flush_.erase( + expected_cb_min_size_to_flush_.begin()); + } else { + EXPECT_FALSE(expected_cb_min_size_to_flush_.empty()); + } + + if (expected_cb_initiators_.empty() == false) { + EXPECT_EQ(expected_cb_initiators_[0], initiator); + expected_cb_initiators_.erase(expected_cb_initiators_.begin()); + } else { + EXPECT_FALSE(expected_cb_initiators_.empty()); + } + + if (flush_cb_results_.empty() == false) { + bool result = flush_cb_results_[0]; + flush_cb_results_.erase(flush_cb_results_.begin()); + return result; + } else { + EXPECT_FALSE(flush_cb_results_.empty()); + // Arbitrarily return true as we must return a bool to compile + return true; + } + }; + + // Sync Test Point callback called when the flush initiation thread completes + // initating all flushes and resumes waiting for the condition variable to be + // signalled again + void DoneInitiationsAttemptTestPointCb(void* /* arg */) { + if (actual_num_cbs_ == expected_num_cbs_) { + auto sync_point_name = + "DoneInitiationsAttemptTestPointCb::ExpectedNumAttempts:" + + std::to_string(validation_num_); + TEST_SYNC_POINT(sync_point_name); + } + } + + void SetupAndEnableTestPoints() { + if (IsWbmDisabled()) { + return; + } + + SyncPoint::GetInstance()->SetCallBack( + "WriteBufferManager::InitiateFlushesThread::DoneInitiationsAttempt", + [&](void* arg) { DoneInitiationsAttemptTestPointCb(arg); }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + } + + void ValidateState(bool wait_on_sync_point) { + if (wbm_enabled_ && wait_on_sync_point) { + auto sync_point_name = "ValidateState::WaitUntilValidtionPossible:" + + std::to_string(validation_num_); + TEST_SYNC_POINT(sync_point_name); + } + + ASSERT_EQ(wbm_->TEST_GetNumFlushesToInitiate(), + expected_num_flushes_to_initiate_); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), expected_num_running_flushes_); + + ASSERT_TRUE(expected_cb_initiators_.empty()) + << "Num entries:" << expected_cb_initiators_.size(); + ASSERT_TRUE(expected_cb_min_size_to_flush_.empty()) + << "Num entries:" << expected_cb_min_size_to_flush_.size(); + ASSERT_TRUE(flush_cb_results_.empty()) + << "Num entries:" << flush_cb_results_.size(); + } + + void EndFlush(bool wbm_initiated, size_t released_size, + bool wait_on_sync_point = false) { + wbm_->FreeMem(released_size); + wbm_->FlushEnded(wbm_initiated /* wbm_initiated */); + DecNumRunningFlushes(); + ValidateState(wait_on_sync_point); + } + + void StartAndEndFlush(bool wbm_initiated, size_t released_size) { + wbm_->ScheduleFreeMem(released_size); + wbm_->FreeMemBegin(released_size); + + // "Run" the flush to completion & release the memory + wbm_->FlushStarted(wbm_initiated /* wbm_initiated */); + if ((wbm_initiated == false) && wbm_enabled_) { + ++expected_num_running_flushes_; + } + EndFlush(wbm_initiated, released_size); + } + + void IncNumRunningFlushes() { + if (wbm_enabled_) { + ++expected_num_running_flushes_; + } + } + + void DecNumRunningFlushes() { + if (wbm_enabled_) { + --expected_num_running_flushes_; + } + } + + void IncNumFlushesToInitiate() { + if (wbm_enabled_) { + ++expected_num_flushes_to_initiate_; + } + } + + void DecNumFlushesToInitiate() { + if (wbm_enabled_) { + --expected_num_flushes_to_initiate_; + } + } + + protected: + static constexpr size_t kDeafaultMinSizeToFlush = 4 * 1024 * 1024; + + protected: + std::unique_ptr wbm_; + + size_t quota_ = 0U; + bool wbm_enabled_; + bool cost_cache_; + std::shared_ptr cache_; + bool allow_stall_ = false; + size_t max_num_parallel_flushes_; + size_t flush_step_size_ = 0U; + + std::vector> initiators_; + uint64_t next_initiator_id_ = 0U; + std::vector expected_cb_initiators_; + std::vector expected_cb_min_size_to_flush_; + std::vector flush_cb_results_; + size_t actual_num_cbs_ = 0; + size_t expected_num_cbs_ = 0U; + size_t expected_num_flushes_to_initiate_ = 0U; + size_t expected_num_running_flushes_ = 0U; + size_t validation_num_ = 0U; +}; + +TEST_P(WriteBufferManagerFlushInitiationTest, Basic) { + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + // Reach the 1st step => expecting a single flush to be initiated + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, NonWbmInitiatedFlush) { + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + wbm_->FlushStarted(false /* wbm_initiated */); + IncNumRunningFlushes(); + + // Reach the 1st step => No need to initiate a flush (one is already running) + wbm_->ReserveMem(flush_step_size_); + CALL_WRAPPER(ValidateState(false)); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + // End the non-wbm flush without releasing memory, just for testing purposes + // Expecting a wbm-initiated flush request since we are still over the step + wbm_->FlushEnded(false /* wbm_initiated */); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the wbm-initiated flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, MaxNumParallelFlushes) { + // Replace the WBM with a new WBM that is configured with our max num of + // parallel flushes + max_num_parallel_flushes_ = 3U; + ASSERT_NE(max_num_parallel_flushes_, + wbm_->GetFlushInitiationOptions().max_num_parallel_flushes); + CreateWbm(); + ASSERT_EQ(wbm_->GetFlushInitiationOptions().max_num_parallel_flushes, + max_num_parallel_flushes_); + + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + // Start 3 (max) number of non-wbm flushes + for (auto i = 0U; i < max_num_parallel_flushes_; ++i) { + wbm_->FlushStarted(false /* wbm_initiated */); + IncNumRunningFlushes(); + } + + // Reserve memory to allow for up to 3 (max) wbm-initiated flushes + // However, 3 (max) are already running => no wbm-initaited flush expected + wbm_->ReserveMem(max_num_parallel_flushes_ * flush_step_size_); + CALL_WRAPPER(ValidateState(false)); + + // Start another (total of 4 > max) non-wbm flush + wbm_->ReserveMem(2 * flush_step_size_); + + wbm_->ScheduleFreeMem(flush_step_size_); + wbm_->FreeMemBegin(flush_step_size_); + wbm_->FlushStarted(false /* wbm_initiated */); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(false)); + + // End one of the non-wbm flushes 3 (max) still running, and usage requires + // max flushes + CALL_WRAPPER(EndFlush(false /* wbm_initiated */, flush_step_size_)); + + // End another one of the non-wbm flushes => 2 (< max) running => + // Expecting one wbm-initiated + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + // Increasing since expecteing wbm to initiate it + IncNumRunningFlushes(); + wbm_->ScheduleFreeMem(flush_step_size_); + wbm_->FreeMemBegin(flush_step_size_); + CALL_WRAPPER(EndFlush(false /* wbm_initiated */, flush_step_size_, + true /* wait_on_sync_point */)); + + wbm_->ReserveMem(2 * flush_step_size_); + CALL_WRAPPER(ValidateState(false)); + + // End a wbm-initiated flushes => 2 (< max) running => Expecting one + // wbm-initiated + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + // Increasing since expecteing wbm to initiate it + IncNumRunningFlushes(); + wbm_->ScheduleFreeMem(flush_step_size_); + wbm_->FreeMemBegin(flush_step_size_); + CALL_WRAPPER(EndFlush(true /* wbm_initiated */, flush_step_size_, + true /* wait_on_sync_point */)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, JumpToQuota) { + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + // Reach the 1st step => expecting a single flush to be initiated + wbm_->ReserveMem(quota_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, quota_)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, + FailureToStartFlushWhenRequested) { + // Register a single initiator + auto initiator_id = CreateAndRegisterInitiator(); + + // Setup two cb-s to fail to start the flush (flush_cb_result == false) + // First with kDeafaultMinSizeToFlush size, Second with 0 + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, false /* flush_cb_result */}, + {initiator_id, 0U, false /* flush_cb_result */}})); + + // Reach the 1st step => expecting the 2 requests set up above + wbm_->ReserveMem(flush_step_size_); + IncNumFlushesToInitiate(); + CALL_WRAPPER(ValidateState(true)); + + // Setup another two identical cb-s + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, false /* flush_cb_result */}, + {initiator_id, 0U, false /* flush_cb_result */}})); + + // Reserve a bit more, but still within the same step. This will initiate + // the next 2 request set up just above + wbm_->TEST_WakeupFlushInitiationThread(); + CALL_WRAPPER(ValidateState(true)); + + // Now, allow the second request to succeed + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, false /* flush_cb_result */}, + {initiator_id, 0U, true /* flush_cb_result */}})); + + // Reserve a bit more, but still within the same step. This will initiate + // the next 2 request set up just above + wbm_->TEST_WakeupFlushInitiationThread(); + DecNumFlushesToInitiate(); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + DeregisterInitiator(initiator_id); +} + +// TODO - Update the test - Currently fails +TEST_P(WriteBufferManagerFlushInitiationTest, DISABLED_FlushInitiationSteps) { + // Too much (useless) effort to adapt to the disabled case so just skipping + if (IsWbmDisabled()) { + return; + } + auto initiator_id = CreateAndRegisterInitiator(); + + // Increase the usage gradually in half-steps, each time expecting another + // flush to be initiated + for (auto i = 0U; i < max_num_parallel_flushes_; ++i) { + wbm_->ReserveMem(flush_step_size_ / 2); + CALL_WRAPPER(ValidateState(true)); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + IncNumRunningFlushes(); + wbm_->ReserveMem(flush_step_size_ / 2); + CALL_WRAPPER(ValidateState(true)); + } + ASSERT_EQ(wbm_->memory_usage(), quota_); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), max_num_parallel_flushes_); + + // Increase the usage over the quota. Not expecting any initiation activity + wbm_->ReserveMem(flush_step_size_ / 2); + wbm_->ReserveMem(flush_step_size_ / 2); + CALL_WRAPPER(ValidateState(false)); + + // Start all of the WBM flushes + some more that are NOT WBM flushes. + // No new flush should initiate + auto wbm_initiated = true; + size_t num_non_wbm_running_flushes = 0U; + for (auto i = 0U; i < 2 * max_num_parallel_flushes_; ++i) { + wbm_->FlushStarted(wbm_initiated); + if (wbm_initiated == false) { + IncNumRunningFlushes(); + ++num_non_wbm_running_flushes; + } + wbm_initiated = !wbm_initiated; + } + ASSERT_EQ(expected_num_running_flushes_ - num_non_wbm_running_flushes, + max_num_parallel_flushes_); + CALL_WRAPPER(ValidateState(false)); + + // Release flushes + memory so that we are at the quota with max num + // of parallel flushes + while (expected_num_running_flushes_ > max_num_parallel_flushes_) { + EndFlush(wbm_initiated, 0U /* released_size */); + wbm_initiated = !wbm_initiated; + } + wbm_->FreeMem(flush_step_size_); + ASSERT_EQ(wbm_->memory_usage(), quota_); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), max_num_parallel_flushes_); + CALL_WRAPPER(ValidateState(false)); + + // Decrease just below the current flush step size + wbm_->FreeMem(1U); + + while (wbm_->memory_usage() >= flush_step_size_) { + EndFlush(true, 0U /* released_size */); + CALL_WRAPPER(ValidateState(false)); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + IncNumRunningFlushes(); + EndFlush(false, 0U /* released_size */, true /* wait_on_sync_point */); + + wbm_->FreeMem(flush_step_size_); + } + ASSERT_EQ(wbm_->memory_usage(), flush_step_size_ - 1); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), 1U); + + // End the last remaining flush and release all used memory + EndFlush(true, flush_step_size_ - 1 /* released_size */); + ASSERT_EQ(wbm_->memory_usage(), 0U); + ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), 0U); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, RegisteringLate) { + // Reach the 1st step, but no registered initiators + wbm_->ReserveMem(flush_step_size_); + IncNumFlushesToInitiate(); + CALL_WRAPPER(ValidateState(false)); + + // Register an initiator and expect it to receive the initiation request + auto initiator_id = CreateInitiator(); + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + RegisterInitiator(initiator_id); + DecNumFlushesToInitiate(); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, Deregistering) { + // Register a single initiator + auto initiator_id1 = CreateAndRegisterInitiator(); + + // initiator1 fails to initiate + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id1, kDeafaultMinSizeToFlush, false /* flush_cb_result */}, + {initiator_id1, 0U, false /* flush_cb_result */}})); + + // Reach the 1st step => expecting a single flush to be initiated + wbm_->ReserveMem(flush_step_size_); + IncNumFlushesToInitiate(); + CALL_WRAPPER(ValidateState(true)); + + // Deregisters and comes initiator2 + DeregisterInitiator(initiator_id1); + auto initiator_id2 = CreateInitiator(); + + // Set initiator2 to initiate the flush + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id2, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + RegisterInitiator(initiator_id2); + + DecNumFlushesToInitiate(); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id2); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, TwoInitiatorsBasic) { + // Register two initiators + auto initiator_id1 = CreateAndRegisterInitiator(); + auto initiator_id2 = CreateAndRegisterInitiator(); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + // Expect the 1st request to reach initiator1 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id2, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + // Expect the 2nd request to reach initiator2 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush of initiator1 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + // "Run" the flush of initiator2 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id2); + DeregisterInitiator(initiator_id1); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, + TwoInitiatorsFirstFailsToInitiate) { + // Register two initiators + auto initiator_id1 = CreateAndRegisterInitiator(); + auto initiator_id2 = CreateAndRegisterInitiator(); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id1, kDeafaultMinSizeToFlush, false /* flush_cb_result */}, + {initiator_id2, kDeafaultMinSizeToFlush, false /* flush_cb_result */}, + {initiator_id1, 0U, false /* flush_cb_result */}, + {initiator_id2, 0U, true /* flush_cb_result */}})); + + // Expect the 1st request to reach initiator2 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush of initiator1 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + // Expect the 2nd request to reach initiator1 + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + + // "Run" the flush of initiator2 to completion & release the memory + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + + DeregisterInitiator(initiator_id2); + DeregisterInitiator(initiator_id1); +} + +TEST_P(WriteBufferManagerFlushInitiationTest, + TwoInitiatorsDeregisteringWhileBeingNextToFlush) { + // Register two initiators + auto initiator_id1 = CreateAndRegisterInitiator(); + auto initiator_id2 = CreateAndRegisterInitiator(); + + // Initiator1 initiates, initiator2 is next + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + if (wbm_enabled_) { + ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 1U); + } + + // Initiator2 will be deregistered => prepare another initiation for + // initiator1 + CALL_WRAPPER(AddExpectedCbsInfos( + {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}})); + + DeregisterInitiator(initiator_id2); + ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 0U); + + wbm_->ReserveMem(flush_step_size_); + IncNumRunningFlushes(); + CALL_WRAPPER(ValidateState(true)); + ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 0U); + + // "Run" both flushes to completion & release the memory + for (auto i = 0U; i < 2; ++i) { + CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_)); + } + + DeregisterInitiator(initiator_id1); +} + +INSTANTIATE_TEST_CASE_P(WriteBufferManagerTestWithParams, + WriteBufferManagerTestWithParams, + ::testing::Combine(::testing::Bool(), ::testing::Bool(), + ::testing::Bool())); + +// Run the flush initiation tests in all combinations of: +// 1. WBM Enabled (buffer size > 0) / WBM Disabled (0 buffer size) +// 2. With and without costing to cache +// 3. Allow / Disallow delays and stalls +INSTANTIATE_TEST_CASE_P(WriteBufferManagerFlushInitiationTest, + WriteBufferManagerFlushInitiationTest, + ::testing::Combine(::testing::Values(10 * 1000, 0), + ::testing::Bool(), + ::testing::Bool())); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/options/db_options.cc b/options/db_options.cc index b8020eaa80..005acef836 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -826,13 +826,12 @@ void ImmutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); - ROCKS_LOG_HEADER(log, " Options.write_buffer_manager: %p", - write_buffer_manager.get()); - if (write_buffer_manager.get()) { - ROCKS_LOG_HEADER(log, " write_buffer_manager_options:"); - ROCKS_LOG_HEADER(log, "%s", - write_buffer_manager->GetPrintableOptions().c_str()); - } + ROCKS_LOG_HEADER( + log, " Options.write_buffer_manager: %p%s%s", + write_buffer_manager.get(), (write_buffer_manager.get() ? "\n" : ""), + (write_buffer_manager.get() + ? write_buffer_manager->GetPrintableOptions().c_str() + : "")); ROCKS_LOG_HEADER(log, " Options.access_hint_on_compaction_start: %d", static_cast(access_hint_on_compaction_start)); ROCKS_LOG_HEADER( diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 48b1e7915b..0f7391e251 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -330,6 +330,22 @@ DEFINE_int64(max_scan_distance, 0, DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator"); +DEFINE_bool(allow_wbm_stalls, false, + "Enable WBM write stalls and delays "); +DEFINE_bool(initiate_wbm_flushes, false, + "WBM will proactively initiate flushes (Speedb)." + "If false, WBM-related flushes will be initiated using the " + "ShouldFlush() service " + "of the WBM."); +DEFINE_bool(disable_non_wbm_initated_auto_flushes, true, + "In case FLAGS_initiate_wbm_flushes is true, this flag will " + "overwrite the applilcable " + "options so that only the WBM will initiate automatic flushes."); +DEFINE_uint32(max_num_parallel_flushes, 0, + "In case FLAGGS_initiate_wbm_flushes is true, this flag will " + "overwrite the default " + "max number of parallel flushes."); + DEFINE_int64(batch_size, 1, "Batch size"); static bool ValidateKeySize(const char* /*flagname*/, int32_t /*value*/) { @@ -3915,10 +3931,6 @@ class Benchmark { options.env = FLAGS_env; options.max_open_files = FLAGS_open_files; - if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) { - options.write_buffer_manager.reset( - new WriteBufferManager(FLAGS_db_write_buffer_size, cache_)); - } options.arena_block_size = FLAGS_arena_block_size; options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; @@ -4324,6 +4336,18 @@ class Benchmark { options.comparator = test::BytewiseComparatorWithU64TsWrapper(); } + if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) { + WriteBufferManager::FlushInitiationOptions flush_initiation_options; + if (FLAGS_max_num_parallel_flushes > 0U) { + flush_initiation_options.max_num_parallel_flushes = + FLAGS_max_num_parallel_flushes; + } + + options.write_buffer_manager.reset(new WriteBufferManager( + FLAGS_db_write_buffer_size, cache_, FLAGS_allow_wbm_stalls, + FLAGS_initiate_wbm_flushes, flush_initiation_options)); + } + // Integrated BlobDB options.enable_blob_files = FLAGS_enable_blob_files; options.min_blob_size = FLAGS_min_blob_size;