From 12a4b3b0f0ebcdf1a87c40c9a3f20742176db65c Mon Sep 17 00:00:00 2001 From: Udi Date: Sun, 11 Sep 2022 17:28:28 +0300 Subject: [PATCH] Slowdown writes based on WBM's dirty memory usage (#114) --- db/column_family.cc | 9 +- db/column_family_test.cc | 3 +- db/db_impl/db_impl.h | 11 ++ db/db_impl/db_impl_write.cc | 44 ++++++ db/db_test.cc | 12 +- db/db_write_buffer_manager_test.cc | 134 ++++++++++++++++- db/write_controller.cc | 4 +- db/write_controller.h | 28 +++- db/write_controller_test.cc | 89 ++++++++--- include/rocksdb/write_buffer_manager.h | 76 ++++++++-- memtable/write_buffer_manager.cc | 195 ++++++++++++++++++++++--- memtable/write_buffer_manager_test.cc | 161 ++++++++++++++++++++ 12 files changed, 703 insertions(+), 63 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 7558d2f45b..a401bb308a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -799,7 +799,8 @@ std::unique_ptr SetupDelay( } } } - return write_controller->GetDelayToken(write_rate); + return write_controller->GetDelayToken(WriteController::DelaySource::kCF, + write_rate); } int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, @@ -1020,8 +1021,10 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( // increase signal. if (needed_delay) { uint64_t write_rate = write_controller->delayed_write_rate(); - write_controller->set_delayed_write_rate(static_cast( - static_cast(write_rate) * kDelayRecoverSlowdownRatio)); + write_controller->set_delayed_write_rate( + WriteController::DelaySource::kCF, + static_cast(static_cast(write_rate) * + kDelayRecoverSlowdownRatio)); // Set the low pri limit to be 1/4 the delayed write rate. // Note we don't reset this value even after delay condition is relased. // Low-pri rate will continue to apply if there is a compaction diff --git a/db/column_family_test.cc b/db/column_family_test.cc index c55eb12905..e05354eb9a 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2697,7 +2697,8 @@ TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); mutable_cf_options.disable_auto_compactions = true; - dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate); + dbfull()->TEST_write_controler().set_delayed_write_rate( + WriteController::DelaySource::kCF, kBaseRate); RecalculateWriteStallConditions(cfd, mutable_cf_options); ASSERT_TRUE(!IsDbWriteStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 97e3d1b8a9..88640bf7b0 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1094,6 +1094,10 @@ class DBImpl : public DB { PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const; #endif // !ROCKSDB_LITE + bool TEST_has_write_controller_token() const { + return (write_controller_token_.get() != nullptr); + } + #endif // NDEBUG // persist stats to column family "_persistent_stats" @@ -2393,6 +2397,13 @@ class DBImpl : public DB { // Pointer to WriteBufferManager stalling interface. 
std::unique_ptr wbm_stall_; + + // Members used for WBM's required delay + std::unique_ptr write_controller_token_; + WriteBufferManager::UsageState wbm_spdb_usage_state_ = + WriteBufferManager::UsageState::kNone; + uint64_t wbm_spdb_delayed_write_factor_ = + WriteBufferManager::kNoneDelayedWriteFactor; }; extern Options SanitizeOptions(const std::string& db, const Options& src, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 39657d4623..ca30b4e039 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1028,6 +1028,27 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) { } } +namespace { + +std::unique_ptr SetupDelayFromFactor( + WriteController& write_controller, uint64_t delay_factor) { + assert(delay_factor > 0U); + constexpr uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s. + + auto max_write_rate = write_controller.max_delayed_write_rate(); + + auto wbm_write_rate = max_write_rate; + if (max_write_rate >= kMinWriteRate) { + // If user gives rate less than kMinWriteRate, don't adjust it. + wbm_write_rate = max_write_rate / delay_factor; + } + + return write_controller.GetDelayToken(WriteController::DelaySource::kWBM, + wbm_write_rate); +} + +} // namespace + Status DBImpl::PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, WriteContext* write_context) { @@ -1071,6 +1092,29 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, PERF_TIMER_STOP(write_scheduling_flushes_compactions_time); PERF_TIMER_GUARD(write_pre_and_post_process_time); + // Handle latest WBM calculated write delay, if applicable + if (status.ok() && write_buffer_manager_ && + write_buffer_manager_->IsDelayAllowed()) { + auto [new_usage_state, new_delayed_write_factor] = + write_buffer_manager_->GetUsageStateInfo(); + + if (UNLIKELY( + (wbm_spdb_usage_state_ != new_usage_state) || + (wbm_spdb_delayed_write_factor_ != new_delayed_write_factor))) { + if (new_usage_state != WriteBufferManager::UsageState::kDelay) { + write_controller_token_.reset(); + } else if ((wbm_spdb_usage_state_ != + WriteBufferManager::UsageState::kDelay) || + (wbm_spdb_delayed_write_factor_ != new_delayed_write_factor)) { + write_controller_token_ = + SetupDelayFromFactor(write_controller_, new_delayed_write_factor); + } + + wbm_spdb_usage_state_ = new_usage_state; + wbm_spdb_delayed_write_factor_ = new_delayed_write_factor; + } + } + if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || write_controller_.NeedsDelay()))) { PERF_TIMER_STOP(write_pre_and_post_process_time); diff --git a/db/db_test.cc b/db/db_test.cc index 1f4e64ae59..d8fe9faf68 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -235,7 +235,8 @@ TEST_F(DBTest, SkipDelay) { // when we do Put // TODO(myabandeh): this is time dependent and could potentially make // the test flaky - auto token = dbfull()->TEST_write_controler().GetDelayToken(1); + auto token = dbfull()->TEST_write_controler().GetDelayToken( + WriteController::DelaySource::kCF, 1); std::atomic sleep_count(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::DelayWrite:Sleep", @@ -262,7 +263,8 @@ TEST_F(DBTest, SkipDelay) { ASSERT_GE(wait_count.load(), 0); token.reset(); - token = dbfull()->TEST_write_controler().GetDelayToken(1000000); + token = dbfull()->TEST_write_controler().GetDelayToken( + WriteController::DelaySource::kCF, 1000000); wo.no_slowdown = false; ASSERT_OK(dbfull()->Put(wo, "foo3", large_value)); ASSERT_GE(sleep_count.load(), 1); @@ -297,7 +299,8 @@ TEST_F(DBTest, 
MixedSlowdownOptions) { // when we do Put // TODO(myabandeh): this is time dependent and could potentially make // the test flaky - auto token = dbfull()->TEST_write_controler().GetDelayToken(1); + auto token = dbfull()->TEST_write_controler().GetDelayToken( + WriteController::DelaySource::kCF, 1); std::atomic sleep_count(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::DelayWrite:BeginWriteStallDone", [&](void* /*arg*/) { @@ -351,7 +354,8 @@ TEST_F(DBTest, MixedSlowdownOptionsInQueue) { // when we do Put // TODO(myabandeh): this is time dependent and could potentially make // the test flaky - auto token = dbfull()->TEST_write_controler().GetDelayToken(1); + auto token = dbfull()->TEST_write_controler().GetDelayToken( + WriteController::DelaySource::kCF, 1); std::atomic sleep_count(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::DelayWrite:Sleep", [&](void* /*arg*/) { diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc index c1e8f7100e..af723d193b 100644 --- a/db/db_write_buffer_manager_test.cc +++ b/db/db_write_buffer_manager_test.cc @@ -14,10 +14,12 @@ namespace ROCKSDB_NAMESPACE { class DBWriteBufferManagerTest : public DBTestBase, - public testing::WithParamInterface { + public ::testing::WithParamInterface { public: DBWriteBufferManagerTest() : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } bool cost_cache_; }; @@ -27,7 +29,6 @@ TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -70,7 +71,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -197,7 +197,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -314,7 +313,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -456,7 +454,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -618,7 +615,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { options.write_buffer_size = 500000; // this is never hit std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); ASSERT_LT(cache->GetUsage(), 256 * 1024); - cost_cache_ = GetParam(); if (cost_cache_) { options.write_buffer_manager.reset( @@ -780,9 +776,133 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) { 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +class DBWriteBufferManagerTest1 : public DBTestBase, + public ::testing::WithParamInterface { + public: + DBWriteBufferManagerTest1() + : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {} + + void SetUp() override { cost_cache_ = GetParam(); } + bool cost_cache_; +}; + +TEST_P(DBWriteBufferManagerTest1, WbmDelaySharedWriteBufferAcrossCFs) { + constexpr size_t kQuota = 100 * 1000; + constexpr size_t kDelayThreshold = + WriteBufferManager::kStartDelayPercentThreshold * kQuota / 100; + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = kQuota; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(kQuota, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(kQuota, nullptr, true)); + } + WriteOptions wo; + wo.disableWAL = true; + + CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options); + + // Reach the delay threshold by writing to two cf-s, no flush + ASSERT_OK(Put(0, Key(1), DummyString(kDelayThreshold / 2), wo)); + ASSERT_OK(Put(1, Key(1), DummyString(kDelayThreshold / 2), wo)); + + // Write another byte to trigger writing and a delay token in the write + // controller + auto& write_controller = dbfull()->TEST_write_controler(); + ASSERT_FALSE(dbfull()->TEST_has_write_controller_token()); + ASSERT_FALSE(write_controller.NeedsDelay()); + ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + ASSERT_TRUE(dbfull()->TEST_has_write_controller_token()); + ASSERT_TRUE(write_controller.NeedsDelay()); + + Flush(1); + + // Delay token should be released when the next write arrives + ASSERT_TRUE(dbfull()->TEST_has_write_controller_token()); + ASSERT_TRUE(write_controller.NeedsDelay()); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + ASSERT_FALSE(write_controller.NeedsDelay()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_P(DBWriteBufferManagerTest1, WbmDelaySharedWriteBufferAcrossDBs) { + constexpr size_t kQuota = 500 * 1000; + constexpr size_t kDelayThreshold = + WriteBufferManager::kStartDelayPercentThreshold * kQuota / 100; + + std::vector dbnames; + std::vector dbs; + int num_dbs = 3; + + for (int i = 0; i < num_dbs; i++) { + dbs.push_back(nullptr); + dbnames.push_back( + test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i))); + } + + Options options = CurrentOptions(); + options.arena_block_size = 4096; + options.write_buffer_size = 500000; // this is never hit + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + ASSERT_LT(cache->GetUsage(), 256 * 1024); + + if (cost_cache_) { + options.write_buffer_manager.reset( + new WriteBufferManager(kQuota, cache, true)); + } else { + options.write_buffer_manager.reset( + new WriteBufferManager(kQuota, nullptr, true)); + } + + CreateAndReopenWithCF({"cf1", "cf2"}, options); + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(DestroyDB(dbnames[i], options)); + ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i]))); + } + WriteOptions wo; + wo.disableWAL = true; + + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(kDelayThreshold / num_dbs))); + } + + // Write another byte to trigger writing and a delay token in the write + // controller + auto& write_controller = 
dbfull()->TEST_write_controler(); + ASSERT_FALSE(dbfull()->TEST_has_write_controller_token()); + ASSERT_FALSE(write_controller.NeedsDelay()); + + ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + ASSERT_TRUE(dbfull()->TEST_has_write_controller_token()); + ASSERT_TRUE(write_controller.NeedsDelay()); + + // Clean up DBs. + for (int i = 0; i < num_dbs; i++) { + ASSERT_OK(dbs[i]->Close()); + ASSERT_OK(DestroyDB(dbnames[i], options)); + delete dbs[i]; + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest, testing::Bool()); +INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1, + ::testing::Bool()); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/write_controller.cc b/db/write_controller.cc index c5f7443752..66190f34d4 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -20,7 +20,7 @@ std::unique_ptr WriteController::GetStopToken() { } std::unique_ptr WriteController::GetDelayToken( - uint64_t write_rate) { + DelaySource source, uint64_t write_rate) { if (0 == total_delayed_++) { // Starting delay, so reset counters. next_refill_time_ = 0; @@ -29,7 +29,7 @@ std::unique_ptr WriteController::GetDelayToken( // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in // next_refill_time_ will be based on an old rate. This rate will apply // for subsequent additional debts and for the next refill. - set_delayed_write_rate(write_rate); + set_delayed_write_rate(source, write_rate); return std::unique_ptr(new DelayWriteToken(this)); } diff --git a/db/write_controller.h b/db/write_controller.h index 88bd1417f1..85871b279b 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -7,8 +7,10 @@ #include +#include #include #include + #include "rocksdb/rate_limiter.h" namespace ROCKSDB_NAMESPACE { @@ -21,6 +23,11 @@ class WriteControllerToken; // All of the methods here (including WriteControllerToken's destructors) need // to be called while holding DB mutex class WriteController { + public: + enum class DelaySource { kCF = 0, kWBM = 1, kNumSources }; + static constexpr auto kNumDelaySources = + static_cast(DelaySource::kNumSources); + public: explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u, int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) @@ -32,6 +39,8 @@ class WriteController { low_pri_rate_limiter_( NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { set_max_delayed_write_rate(_delayed_write_rate); + std::fill(delayed_write_rates_.begin(), delayed_write_rates_.end(), + max_delayed_write_rate()); } ~WriteController() = default; @@ -43,7 +52,7 @@ class WriteController { // write needs to call GetDelay() with number of bytes writing to the DB, // which returns number of microseconds to sleep. std::unique_ptr GetDelayToken( - uint64_t delayed_write_rate); + DelaySource source, uint64_t delayed_write_rate); // When an actor (column family) requests a moderate token, compaction // threads will be increased std::unique_ptr GetCompactionPressureToken(); @@ -58,14 +67,19 @@ class WriteController { // num_bytes: how many number of bytes to put into the DB. // Prerequisite: DB mutex held. 
uint64_t GetDelay(SystemClock* clock, uint64_t num_bytes); - void set_delayed_write_rate(uint64_t write_rate) { + void set_delayed_write_rate(DelaySource source, uint64_t write_rate) { // avoid divide 0 if (write_rate == 0) { write_rate = 1u; } else if (write_rate > max_delayed_write_rate()) { write_rate = max_delayed_write_rate(); } - delayed_write_rate_ = write_rate; + auto source_value = static_cast(source); + assert(source_value < delayed_write_rates_.size()); + delayed_write_rates_[source_value] = write_rate; + + delayed_write_rate_ = *std::min_element(delayed_write_rates_.begin(), + delayed_write_rates_.end()); } void set_max_delayed_write_rate(uint64_t write_rate) { @@ -84,6 +98,13 @@ class WriteController { RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); } + // Not thread-safe + uint64_t TEST_delayed_write_rate(DelaySource source) const { + auto source_value = static_cast(source); + assert(source_value < delayed_write_rates_.size()); + return delayed_write_rates_[source_value]; + } + private: uint64_t NowMicrosMonotonic(SystemClock* clock); @@ -103,6 +124,7 @@ class WriteController { // Write rate set when initialization or by `DBImpl::SetDBOptions` uint64_t max_delayed_write_rate_; // Current write rate (bytes / second) + std::array delayed_write_rates_; uint64_t delayed_write_rate_; std::unique_ptr low_pri_rate_limiter_; diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc index 1f7cf999aa..170428c6b5 100644 --- a/db/write_controller_test.cc +++ b/db/write_controller_test.cc @@ -41,38 +41,86 @@ TEST_F(WriteControllerTest, BasicAPI) { EXPECT_EQ(0, controller.GetDelay(clock_.get(), 100 MB)); // set, get - controller.set_delayed_write_rate(20 MBPS); + controller.set_delayed_write_rate(WriteController::DelaySource::kCF, 20 MBPS); EXPECT_EQ(controller.delayed_write_rate(), 20 MBPS); EXPECT_FALSE(controller.IsStopped()); EXPECT_FALSE(controller.NeedsDelay()); EXPECT_EQ(0, controller.GetDelay(clock_.get(), 100 MB)); + controller.set_delayed_write_rate(WriteController::DelaySource::kWBM, + 30 MBPS); + ASSERT_EQ(controller.delayed_write_rate(), 20 MBPS); + + controller.set_delayed_write_rate(WriteController::DelaySource::kWBM, + 10 MBPS); + ASSERT_EQ(controller.delayed_write_rate(), 10 MBPS); + + controller.set_delayed_write_rate(WriteController::DelaySource::kWBM, + 35 MBPS); + ASSERT_EQ(controller.delayed_write_rate(), 20 MBPS); + + controller.set_delayed_write_rate(WriteController::DelaySource::kWBM, + controller.max_delayed_write_rate()); + ASSERT_EQ(controller.delayed_write_rate(), 20 MBPS); + { // set with token, get - auto delay_token_0 = controller.GetDelayToken(10 MBPS); + auto delay_token_0 = + controller.GetDelayToken(WriteController::DelaySource::kCF, 10 MBPS); + EXPECT_EQ(controller.delayed_write_rate(), 10 MBPS); + EXPECT_FALSE(controller.IsStopped()); + EXPECT_TRUE(controller.NeedsDelay()); + + delay_token_0.reset(); + EXPECT_EQ(controller.delayed_write_rate(), 10 MBPS); + EXPECT_FALSE(controller.IsStopped()); + EXPECT_FALSE(controller.NeedsDelay()); + EXPECT_EQ(0, controller.GetDelay(clock_.get(), 20 MB)); + + delay_token_0 = + controller.GetDelayToken(WriteController::DelaySource::kCF, 10 MBPS); EXPECT_EQ(controller.delayed_write_rate(), 10 MBPS); EXPECT_FALSE(controller.IsStopped()); EXPECT_TRUE(controller.NeedsDelay()); + // test with delay EXPECT_EQ(2 SECS, controller.GetDelay(clock_.get(), 20 MB)); clock_->now_micros_ += 2 SECS; // pay the "debt" - auto delay_token_1 = controller.GetDelayToken(2 MBPS); + auto delay_token_1 = 
+ controller.GetDelayToken(WriteController::DelaySource::kCF, 2 MBPS); EXPECT_EQ(10 SECS, controller.GetDelay(clock_.get(), 20 MB)); clock_->now_micros_ += 10 SECS; // pay the "debt" - auto delay_token_2 = controller.GetDelayToken(1 MBPS); + auto delay_token_2 = + controller.GetDelayToken(WriteController::DelaySource::kWBM, 1 MBPS); EXPECT_EQ(20 SECS, controller.GetDelay(clock_.get(), 20 MB)); clock_->now_micros_ += 20 SECS; // pay the "debt" - auto delay_token_3 = controller.GetDelayToken(20 MBPS); + auto delay_token_3 = + controller.GetDelayToken(WriteController::DelaySource::kCF, 20 MBPS); + EXPECT_EQ(20 SECS, controller.GetDelay(clock_.get(), 20 MB)); + clock_->now_micros_ += 20 SECS; // pay the "debt" + + auto delay_token_4 = + controller.GetDelayToken(WriteController::DelaySource::kWBM, 20 MBPS); + EXPECT_EQ(1 SECS, controller.GetDelay(clock_.get(), 20 MB)); + clock_->now_micros_ += 1 SECS; // pay the "debt" + + auto delay_token_5 = + controller.GetDelayToken(WriteController::DelaySource::kWBM, 30 MBPS); EXPECT_EQ(1 SECS, controller.GetDelay(clock_.get(), 20 MB)); clock_->now_micros_ += 1 SECS; // pay the "debt" - // 60M is more than the max rate of 40M. Max rate will be used. EXPECT_EQ(controller.delayed_write_rate(), 20 MBPS); - auto delay_token_4 = - controller.GetDelayToken(controller.delayed_write_rate() * 3); + + auto delay_token_6 = controller.GetDelayToken( + WriteController::DelaySource::kCF, controller.delayed_write_rate() * 3); + auto delay_token_7 = + controller.GetDelayToken(WriteController::DelaySource::kWBM, + controller.delayed_write_rate() * 3); + + // 60M is more than the max rate of 40M. Max rate will be used. EXPECT_EQ(controller.delayed_write_rate(), 40 MBPS); EXPECT_EQ(static_cast(0.5 SECS), controller.GetDelay(clock_.get(), 20 MB)); @@ -111,8 +159,8 @@ TEST_F(WriteControllerTest, StartFilled) { // Attempt to write two things that combined would be allowed within // a single refill interval - auto delay_token_0 = - controller.GetDelayToken(controller.delayed_write_rate()); + auto delay_token_0 = controller.GetDelayToken( + WriteController::DelaySource::kCF, controller.delayed_write_rate()); // Verify no delay because write rate has not been exceeded within // refill interval. @@ -142,7 +190,8 @@ TEST_F(WriteControllerTest, DebtAccumulation) { // would reset the debt on every GetDelayToken.) uint64_t debt = 0; for (unsigned i = 0; i < tokens.size(); ++i) { - tokens[i] = controller.GetDelayToken((i + 1u) MBPS); + tokens[i] = controller.GetDelayToken(WriteController::DelaySource::kCF, + (i + 1u) MBPS); uint64_t delay = controller.GetDelay(clock_.get(), 63 MB); ASSERT_GT(delay, debt); uint64_t incremental = delay - debt; @@ -159,7 +208,8 @@ TEST_F(WriteControllerTest, DebtAccumulation) { // Debt is accumulated in time, not in bytes, so this new write // limit is not applied to prior requested delays, even it they are // in progress. - tokens[i] = controller.GetDelayToken((i + 1u) MBPS); + tokens[i] = controller.GetDelayToken(WriteController::DelaySource::kCF, + (i + 1u) MBPS); uint64_t delay = controller.GetDelay(clock_.get(), 63 MB); ASSERT_GT(delay, debt); uint64_t incremental = delay - debt; @@ -187,7 +237,8 @@ TEST_F(WriteControllerTest, DebtAccumulation) { } // All tokens released. // Verify that releasing all tokens pays down debt, even with no time passage. 
- tokens[0] = controller.GetDelayToken(1 MBPS); + tokens[0] = + controller.GetDelayToken(WriteController::DelaySource::kCF, 1 MBPS); ASSERT_EQ(0U, controller.GetDelay(clock_.get(), 100u /*small bytes*/)); } @@ -198,7 +249,8 @@ TEST_F(WriteControllerTest, CreditAccumulation) { std::array, 10> tokens; // Ensure started - tokens[0] = controller.GetDelayToken(1 MBPS); + tokens[0] = + controller.GetDelayToken(WriteController::DelaySource::kCF, 1 MBPS); ASSERT_EQ(10 SECS, controller.GetDelay(clock_.get(), 10 MB)); clock_->now_micros_ += 10 SECS; @@ -208,7 +260,8 @@ TEST_F(WriteControllerTest, CreditAccumulation) { // Spend some credit (burst of I/O) for (unsigned i = 0; i < tokens.size(); ++i) { - tokens[i] = controller.GetDelayToken((i + 1u) MBPS); + tokens[i] = controller.GetDelayToken(WriteController::DelaySource::kCF, + (i + 1u) MBPS); ASSERT_EQ(0U, controller.GetDelay(clock_.get(), 63 MB)); // In WriteController, credit is accumulated in bytes, not in time. // After an "unnecessary" delay, all of our time credit will be @@ -218,7 +271,8 @@ TEST_F(WriteControllerTest, CreditAccumulation) { credit -= 63 MB; } // Spend remaining credit - tokens[0] = controller.GetDelayToken(1 MBPS); + tokens[0] = + controller.GetDelayToken(WriteController::DelaySource::kCF, 1 MBPS); ASSERT_EQ(0U, controller.GetDelay(clock_.get(), credit)); // Verify ASSERT_EQ(10 SECS, controller.GetDelay(clock_.get(), 10 MB)); @@ -235,7 +289,8 @@ TEST_F(WriteControllerTest, CreditAccumulation) { // All tokens released. // Verify credit is wiped away on new delay. - tokens[0] = controller.GetDelayToken(1 MBPS); + tokens[0] = + controller.GetDelayToken(WriteController::DelaySource::kCF, 1 MBPS); ASSERT_EQ(10 SECS, controller.GetDelay(clock_.get(), 10 MB)); } diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 89296d001b..e7e3ca3184 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "rocksdb/cache.h" @@ -35,6 +36,16 @@ class StallInterface { }; class WriteBufferManager final { + public: + // Delay Mechanism (allow_delays_and_stalls==true) definitions + static constexpr uint64_t kStartDelayPercentThreshold = 80U; + + enum class UsageState { kNone, kDelay, kStop }; + + static constexpr uint64_t kNoneDelayedWriteFactor = 0U; + static constexpr uint64_t kMaxDelayedWriteFactor = 200U; + static constexpr uint64_t kStopDelayedWriteFactor = kMaxDelayedWriteFactor; + public: // Parameters: // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped. @@ -44,12 +55,23 @@ class WriteBufferManager final { // cost the memory allocated to the cache. It can be used even if _buffer_size // = 0. // - // allow_stall: if set true, it will enable stalling of writes when - // memory_usage() exceeds buffer_size. It will wait for flush to complete and - // memory usage to drop down. + // allow_delays_and_stalls: if set true, it will enable delays and stall as + // described below. + // Delays: if set to true, it will start delaying of writes when + // memory_usage() exceeds the kStartDelayPercentThreshold percent threshold + // of the buffer size. The WBM calculates a delay factor that is increasing + // as memory_usage() increases. When applicable, the WBM will notify its + // registered clients about the applicable delay factor. Clients are + // expected to set their respective delayed write rates accordingly. 
When + // memory_usage() reaches buffer_size(), the (optional) WBM stall mechanism + // kicks in if enabled. (see allow_delays_and_stalls above) + // Stalls: stalling of writes when memory_usage() exceeds buffer_size. It + // will wait for flush to complete and + // memory usage to drop down. explicit WriteBufferManager(size_t _buffer_size, std::shared_ptr cache = {}, - bool allow_stall = false); + bool allow_delays_and_stalls = true); + // No copying allowed WriteBufferManager(const WriteBufferManager&) = delete; WriteBufferManager& operator=(const WriteBufferManager&) = delete; @@ -108,6 +130,9 @@ class WriteBufferManager final { // Check if stall is active and can be ended. MaybeEndWriteStall(); + if (enabled()) { + UpdateUsageState(memory_usage(), 0 /* mem_changed_size */, new_size); + } } // Below functions should be called by RocksDB internally. @@ -135,11 +160,12 @@ class WriteBufferManager final { // We stall the writes untill memory_usage drops below buffer_size. When the // function returns true, all writer threads (including one checking this // condition) across all DBs will be stalled. Stall is allowed only if user - // pass allow_stall = true during WriteBufferManager instance creation. + // pass allow_delays_and_stalls = true during WriteBufferManager instance + // creation. // // Should only be called by RocksDB internally . bool ShouldStall() const { - if (!allow_stall_ || !enabled()) { + if (!allow_delays_and_stalls_ || !enabled()) { return false; } @@ -187,6 +213,36 @@ class WriteBufferManager final { std::string GetPrintableOptions() const; + public: + bool IsDelayAllowed() const { return allow_delays_and_stalls_; } + std::pair GetUsageStateInfo() const { + return ParseCodedUsageState(GetCodedUsageState()); + } + + private: + // The usage + delay factor are coded in a single (atomic) uint64_t value as + // follows: kNone - as 0 (kNoneCodedUsageState) kStop - as 1 + max delay + // factor (kStopCodedUsageState) kDelay - as the delay factor itself, which + // will actually be used for the delay token + static constexpr uint64_t kNoneCodedUsageState = 0U; + static constexpr uint64_t kStopCodedUsageState = kMaxDelayedWriteFactor + 1; + + void UpdateUsageState(size_t new_memory_used, ssize_t mem_changed_size, + size_t quota); + + uint64_t CalcNewCodedUsageState(size_t new_memory_used, + ssize_t memory_changed_size, size_t quota, + uint64_t old_coded_usage_state); + + uint64_t GetCodedUsageState() const { + return coded_usage_state_.load(std::memory_order_relaxed); + } + + static uint64_t CalcCodedUsageState(UsageState usage_state, + uint64_t delay_factor); + static std::pair ParseCodedUsageState( + uint64_t coded_usage_state); + private: std::atomic buffer_size_; std::atomic mutable_limit_; @@ -202,12 +258,14 @@ class WriteBufferManager final { std::list queue_; // Protects the queue_ and stall_active_. std::mutex mu_; - bool allow_stall_; + bool allow_delays_and_stalls_ = true; // Value should only be changed by BeginWriteStall() and MaybeEndWriteStall() // while holding mu_, but it can be read without a lock. 
std::atomic stall_active_; + std::atomic coded_usage_state_ = kNoneCodedUsageState; - void ReserveMemWithCache(size_t mem); - void FreeMemWithCache(size_t mem); + // Return the new memory usage + size_t ReserveMemWithCache(size_t mem); + size_t FreeMemWithCache(size_t mem); }; } // namespace ROCKSDB_NAMESPACE diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index 052af05b4a..70779fa99c 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -20,14 +20,14 @@ namespace ROCKSDB_NAMESPACE { WriteBufferManager::WriteBufferManager(size_t _buffer_size, std::shared_ptr cache, - bool allow_stall) + bool allow_delays_and_stalls) : buffer_size_(_buffer_size), mutable_limit_(buffer_size_ * 7 / 8), memory_used_(0), memory_inactive_(0), memory_being_freed_(0U), cache_res_mgr_(nullptr), - allow_stall_(allow_stall), + allow_delays_and_stalls_(allow_delays_and_stalls), stall_active_(false) { #ifndef ROCKSDB_LITE if (cache) { @@ -59,22 +59,31 @@ std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const { } void WriteBufferManager::ReserveMem(size_t mem) { + auto is_enabled = enabled(); + size_t new_memory_used = 0U; + if (cache_res_mgr_ != nullptr) { - ReserveMemWithCache(mem); - } else if (enabled()) { - memory_used_.fetch_add(mem, std::memory_order_relaxed); + new_memory_used = ReserveMemWithCache(mem); + } else if (is_enabled) { + auto old_memory_used = + memory_used_.fetch_add(mem, std::memory_order_relaxed); + new_memory_used = old_memory_used + mem; + } + if (is_enabled) { + UpdateUsageState(new_memory_used, mem, buffer_size()); } } // Should only be called from write thread -void WriteBufferManager::ReserveMemWithCache(size_t mem) { +size_t WriteBufferManager::ReserveMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE assert(cache_res_mgr_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. std::lock_guard lock(cache_res_mgr_mu_); - size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem; + size_t old_mem_used = memory_used_.load(std::memory_order_relaxed); + size_t new_mem_used = old_mem_used + mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); @@ -84,8 +93,11 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) { // [TODO] We'll need to improve it in the future and figure out what to do on // error s.PermitUncheckedError(); + + return new_mem_used; #else (void)mem; + return 0U; #endif // ROCKSDB_LITE } @@ -113,13 +125,15 @@ void WriteBufferManager::FreeMemAborted(size_t mem) { void WriteBufferManager::FreeMem(size_t mem) { const auto is_enabled = enabled(); + size_t new_memory_used = 0U; if (cache_res_mgr_ != nullptr) { - FreeMemWithCache(mem); + new_memory_used = FreeMemWithCache(mem); } else if (is_enabled) { - [[maybe_unused]] const auto curr_memory_used = + auto old_memory_used = memory_used_.fetch_sub(mem, std::memory_order_relaxed); - assert(curr_memory_used >= mem); + assert(old_memory_used >= mem); + new_memory_used = old_memory_used - mem; } if (is_enabled) { @@ -130,22 +144,29 @@ void WriteBufferManager::FreeMem(size_t mem) { assert(curr_memory_inactive >= mem); assert(curr_memory_being_freed >= mem); + + UpdateUsageState(new_memory_used, -mem, buffer_size()); } // Check if stall is active and can be ended. 
MaybeEndWriteStall(); + + if (is_enabled) { + UpdateUsageState(new_memory_used, -mem, buffer_size()); + } } -void WriteBufferManager::FreeMemWithCache(size_t mem) { +size_t WriteBufferManager::FreeMemWithCache(size_t mem) { #ifndef ROCKSDB_LITE assert(cache_res_mgr_ != nullptr); // Use a mutex to protect various data structures. Can be optimized to a // lock-free solution if it ends up with a performance bottleneck. std::lock_guard lock(cache_res_mgr_mu_); - const auto curr_memory_used = memory_used_.load(std::memory_order_relaxed); - assert(curr_memory_used >= mem); - size_t new_mem_used = curr_memory_used - mem; + size_t old_mem_used = memory_used_.load(std::memory_order_relaxed); + assert(old_mem_used >= mem); + + size_t new_mem_used = old_mem_used - mem; memory_used_.store(new_mem_used, std::memory_order_relaxed); Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used); @@ -154,14 +175,17 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) { // [TODO] We'll need to improve it in the future and figure out what to do on // error s.PermitUncheckedError(); + + return new_mem_used; #else (void)mem; + return 0U; #endif // ROCKSDB_LITE } void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) { assert(wbm_stall != nullptr); - assert(allow_stall_); + assert(allow_delays_and_stalls_); // Allocate outside of the lock. std::list new_node = {wbm_stall}; @@ -186,7 +210,7 @@ void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) { void WriteBufferManager::MaybeEndWriteStall() { // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock // the writers. - if (!allow_stall_) { + if (!allow_delays_and_stalls_) { return; } @@ -218,7 +242,7 @@ void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) { // Deallocate the removed nodes outside of the lock. 
std::list cleanup; - if (enabled() && allow_stall_) { + if (enabled() && allow_delays_and_stalls_) { std::unique_lock lock(mu_); for (auto it = queue_.begin(); it != queue_.end();) { auto next = std::next(it); @@ -246,4 +270,141 @@ std::string WriteBufferManager::GetPrintableOptions() const { return ret; } +namespace { + +uint64_t CalcDelayFactor(size_t quota, size_t updated_memory_used, + size_t usage_start_delay_threshold) { + assert(updated_memory_used >= usage_start_delay_threshold); + double extra_used_memory = updated_memory_used - usage_start_delay_threshold; + double max_used_memory = quota - usage_start_delay_threshold; + + auto delay_factor = + (WriteBufferManager::kMaxDelayedWriteFactor * extra_used_memory) / + max_used_memory; + if (delay_factor < 1U) { + delay_factor = 1U; + } + return delay_factor; +} + +} // Unnamed Namespace + +uint64_t WriteBufferManager::CalcNewCodedUsageState( + size_t new_memory_used, ssize_t memory_changed_size, size_t quota, + uint64_t old_coded_usage_state) { + auto [old_usage_state, old_delay_factor] = + ParseCodedUsageState(old_coded_usage_state); + + auto new_usage_state = old_usage_state; + auto new_delay_factor = old_delay_factor; + auto usage_start_delay_threshold = + (WriteBufferManager::kStartDelayPercentThreshold * quota) / 100; + auto change_steps = quota / 100; + + if (new_memory_used < usage_start_delay_threshold) { + new_usage_state = WriteBufferManager::UsageState::kNone; + } else if (new_memory_used >= quota) { + new_usage_state = WriteBufferManager::UsageState::kStop; + } else { + new_usage_state = WriteBufferManager::UsageState::kDelay; + } + + auto calc_new_delay_factor = false; + + if (new_usage_state != old_usage_state) { + if (new_usage_state == WriteBufferManager::UsageState::kDelay) { + calc_new_delay_factor = true; + } + } else if (new_usage_state == WriteBufferManager::UsageState::kDelay) { + if (memory_changed_size == 0) { + calc_new_delay_factor = true; + } else { + auto old_memory_used = new_memory_used - memory_changed_size; + // Calculate & notify only if the change is more than one "step" + if ((old_memory_used / change_steps) != + (new_memory_used / change_steps)) { + calc_new_delay_factor = true; + } + } + } + + if (calc_new_delay_factor) { + new_delay_factor = + CalcDelayFactor(quota, new_memory_used, usage_start_delay_threshold); + } + + return CalcCodedUsageState(new_usage_state, new_delay_factor); +} + +uint64_t WriteBufferManager::CalcCodedUsageState(UsageState usage_state, + uint64_t delay_factor) { + switch (usage_state) { + case UsageState::kNone: + return kNoneCodedUsageState; + case UsageState::kDelay: + assert((delay_factor > kNoneCodedUsageState) && + (delay_factor <= kStopCodedUsageState)); + + if (delay_factor <= kNoneCodedUsageState) { + return kNoneCodedUsageState + 1; + } else if (delay_factor > kStopCodedUsageState) { + delay_factor = kStopCodedUsageState; + } + return delay_factor; + case UsageState::kStop: + return kStopCodedUsageState; + default: + assert(0); + // We should never get here (BUG). 
+ return kNoneCodedUsageState; + } +} + +auto WriteBufferManager::ParseCodedUsageState(uint64_t coded_usage_state) + -> std::pair { + if (coded_usage_state <= kNoneCodedUsageState) { + return {UsageState::kNone, kNoneDelayedWriteFactor}; + } else if (coded_usage_state < kStopCodedUsageState) { + return {UsageState::kDelay, coded_usage_state}; + } else { + return {UsageState::kStop, kStopDelayedWriteFactor}; + } +} + +void WriteBufferManager::UpdateUsageState(size_t new_memory_used, + ssize_t memory_changed_size, + size_t quota) { + assert(enabled()); + if (allow_delays_and_stalls_ == false) { + return; + } + + auto done = false; + auto old_coded_usage_state = coded_usage_state_.load(); + auto new_coded_usage_state = old_coded_usage_state; + while (done == false) { + new_coded_usage_state = CalcNewCodedUsageState( + new_memory_used, memory_changed_size, quota, old_coded_usage_state); + + if (old_coded_usage_state != new_coded_usage_state) { + // Try to update the usage state with the usage state calculated by the + // current thread. Failure (has_update_succeeded == false) means one or + // more threads have updated the current state, rendering our own + // calculation irrelevant. In case has_update_succeeded==false, + // old_coded_usage_state will be the value of the state that was updated + // by the other thread(s). + done = coded_usage_state_.compare_exchange_strong(old_coded_usage_state, + new_coded_usage_state); + + if (done == false) { + // Retry. However, + new_memory_used = memory_usage(); + memory_changed_size = 0U; + } + } else { + done = true; + } + } +} + } // namespace ROCKSDB_NAMESPACE diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc index 3ac893be02..5487a4acd0 100644 --- a/memtable/write_buffer_manager_test.cc +++ b/memtable/write_buffer_manager_test.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "rocksdb/write_buffer_manager.h" + +#include "rocksdb/cache.h" #include "test_util/testharness.h" namespace ROCKSDB_NAMESPACE { @@ -304,6 +306,165 @@ TEST_F(WriteBufferManagerTest, CacheFull) { } #endif // ROCKSDB_LITE + +#define VALIDATE_USAGE_STATE(memory_change_size, expected_state, \ + expected_factor) \ + ValidateUsageState(__LINE__, memory_change_size, expected_state, \ + expected_factor) + +class WriteBufferManagerTestWithParams + : public WriteBufferManagerTest, + public ::testing::WithParamInterface> { + public: + void SetUp() override { + wbm_enabled_ = std::get<0>(GetParam()); + cost_cache_ = std::get<1>(GetParam()); + allow_delays_and_stalls_ = std::get<2>(GetParam()); + } + + bool wbm_enabled_; + bool cost_cache_; + bool allow_delays_and_stalls_; +}; +// Test that the write buffer manager sends the expected usage notifications +TEST_P(WriteBufferManagerTestWithParams, UsageNotifications) { + constexpr size_t kQuota = 10 * 1000; + constexpr size_t kStepSize = kQuota / 100; + constexpr size_t kDelayThreshold = + WriteBufferManager::kStartDelayPercentThreshold * kQuota / 100; + constexpr size_t kMaxUsed = kQuota - kDelayThreshold; + + std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); + + std::unique_ptr wbf; + + auto wbm_quota = (wbm_enabled_ ? 
kQuota : 0U); + if (cost_cache_) { + wbf.reset( + new WriteBufferManager(wbm_quota, cache, allow_delays_and_stalls_)); + } else { + wbf.reset( + new WriteBufferManager(wbm_quota, nullptr, allow_delays_and_stalls_)); + } + ASSERT_EQ(wbf->enabled(), wbm_enabled_); + + size_t expected_usage = 0U; + auto ExpectedDelayFactor = [&](uint64_t extra_used) { + return (extra_used * WriteBufferManager::kMaxDelayedWriteFactor) / kMaxUsed; + }; + + auto ValidateUsageState = [&](unsigned long line, size_t memory_change_size, + WriteBufferManager::UsageState expected_state, + uint64_t expected_factor) { + const auto location_str = + "write_buffer_manager_test.cc:" + std::to_string(line) + "\n"; + + if (wbm_enabled_ || cost_cache_) { + expected_usage += memory_change_size; + } + ASSERT_EQ(wbf->memory_usage(), expected_usage) << location_str; + + if (wbm_enabled_ && allow_delays_and_stalls_) { + auto [actual_state, actual_delay_factor] = wbf->GetUsageStateInfo(); + ASSERT_EQ(actual_state, expected_state) << location_str; + ASSERT_EQ(actual_delay_factor, expected_factor) << location_str; + } + }; + + // Initial state + VALIDATE_USAGE_STATE(0, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + auto FreeMem = [&, this](size_t mem) { + wbf->ScheduleFreeMem(mem); + wbf->FreeMemBegin(mem); + wbf->FreeMem(mem); + }; + + // Jump straight to quota + wbf->ReserveMem(kQuota); + VALIDATE_USAGE_STATE(kQuota, WriteBufferManager::UsageState::kStop, + WriteBufferManager::kStopDelayedWriteFactor); + + // And back to 0 again + FreeMem(kQuota); + VALIDATE_USAGE_STATE(-kQuota, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + // Small reservations, below soft limit + wbf->ReserveMem(1000); + VALIDATE_USAGE_STATE(1000, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + wbf->ReserveMem(2000); + VALIDATE_USAGE_STATE(2000, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + FreeMem(3000); + VALIDATE_USAGE_STATE(-3000, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + // 0 => soft limit + wbf->ReserveMem(kDelayThreshold); + VALIDATE_USAGE_STATE(kDelayThreshold, WriteBufferManager::UsageState::kDelay, + 1U); + + // A bit more, but still within the same "step" => same delay factor + wbf->ReserveMem(kStepSize - 1); + VALIDATE_USAGE_STATE(kStepSize - 1, WriteBufferManager::UsageState::kDelay, + 1U); + + // Cross the step => Delay factor updated + wbf->ReserveMem(1); + VALIDATE_USAGE_STATE(1, WriteBufferManager::UsageState::kDelay, + ExpectedDelayFactor(kStepSize)); + + // Free all => None + FreeMem(expected_usage); + VALIDATE_USAGE_STATE(-expected_usage, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + // None -> Stop (usage == quota) + wbf->ReserveMem(kQuota); + VALIDATE_USAGE_STATE(kQuota, WriteBufferManager::UsageState::kStop, + WriteBufferManager::kMaxDelayedWriteFactor); + + // Increasing the quota, usage as is => Now in the none + wbf->SetBufferSize(wbm_quota * 2); + VALIDATE_USAGE_STATE(0, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); + + // Restoring the quota + wbf->SetBufferSize(wbm_quota); + VALIDATE_USAGE_STATE(0, WriteBufferManager::UsageState::kStop, + WriteBufferManager::kMaxDelayedWriteFactor); + + // 1 byte below quota => Delay with max factor + FreeMem(1); + VALIDATE_USAGE_STATE(-1, WriteBufferManager::UsageState::kDelay, + 
ExpectedDelayFactor(kMaxUsed - 1)); + + // An entire step below => delay factor updated + FreeMem(kStepSize); + VALIDATE_USAGE_STATE(-kStepSize, WriteBufferManager::UsageState::kDelay, + ExpectedDelayFactor(kMaxUsed - 1 - kStepSize)); + + // Again in the top "step" + wbf->ReserveMem(1); + VALIDATE_USAGE_STATE(1, WriteBufferManager::UsageState::kDelay, + ExpectedDelayFactor(kMaxUsed - kStepSize)); + + // And back to 0 to wrap it up + FreeMem(expected_usage); + VALIDATE_USAGE_STATE(-expected_usage, WriteBufferManager::UsageState::kNone, + WriteBufferManager::kNoneDelayedWriteFactor); +} + +INSTANTIATE_TEST_CASE_P(WriteBufferManagerTestWithParams, + WriteBufferManagerTestWithParams, + ::testing::Combine(testing::Bool(), testing::Bool(), + testing::Bool())); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {
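Reviewer note (not part of the patch): a minimal standalone sketch of the delay-factor arithmetic introduced above in WriteBufferManager::CalcNewCodedUsageState() / CalcDelayFactor(). Below 80% of the quota the state is kNone, at or above the quota it is kStop, and in between the factor grows roughly linearly from 1 up to kMaxDelayedWriteFactor (200). The sample quota value is arbitrary.

// Illustrative only; constants match write_buffer_manager.h in this patch.
#include <algorithm>
#include <cstdint>
#include <cstdio>

constexpr uint64_t kStartDelayPercentThreshold = 80;  // percent of the quota
constexpr uint64_t kMaxDelayedWriteFactor = 200;

// Combines the state decision and CalcDelayFactor() into one helper:
// returns 0 for kNone, 1..199 for kDelay, 200 for kStop.
uint64_t DelayFactorFor(size_t quota, size_t used) {
  const size_t start = kStartDelayPercentThreshold * quota / 100;
  if (used < start) return 0;
  if (used >= quota) return kMaxDelayedWriteFactor;
  const double extra = static_cast<double>(used - start);
  const double span = static_cast<double>(quota - start);
  const auto factor =
      static_cast<uint64_t>(kMaxDelayedWriteFactor * extra / span);
  return std::max<uint64_t>(factor, 1);
}

int main() {
  constexpr size_t kQuota = 100 * 1000;  // sample quota in bytes
  for (size_t used :
       {50 * 1000, 80 * 1000, 90 * 1000, 99 * 1000, 100 * 1000}) {
    std::printf("used=%zu -> factor=%llu\n", used,
                static_cast<unsigned long long>(DelayFactorFor(kQuota, used)));
  }
  return 0;
}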
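Reviewer note (not part of the patch): a sketch, under the same assumptions, of how the DB side consumes the factor. SetupDelayFromFactor() in db_impl_write.cc requests a kWBM delay token at max_delayed_write_rate / factor (unless the configured max rate is already below 16KB/s), and WriteController now keeps one requested rate per DelaySource and applies the minimum. The 32MB/s max rate and the factor of 100 below are illustrative values, not taken from the patch.

// Illustrative only; mirrors SetupDelayFromFactor() in db_impl_write.cc and
// the per-source minimum added to WriteController::set_delayed_write_rate().
#include <algorithm>
#include <array>
#include <cstdint>
#include <cstdio>

constexpr uint64_t kMinWriteRate = 16 * 1024;  // 16KB/s, as in the patch

// Rate the WBM delay token asks for, given the controller's max rate.
uint64_t WbmWriteRate(uint64_t max_delayed_write_rate, uint64_t delay_factor) {
  if (max_delayed_write_rate < kMinWriteRate) {
    // A user-configured max rate below 16KB/s is left untouched.
    return max_delayed_write_rate;
  }
  return max_delayed_write_rate / delay_factor;
}

int main() {
  const uint64_t max_rate = 32ull * 1024 * 1024;  // sample max rate: 32MB/s
  // One requested rate per DelaySource: index 0 = kCF, index 1 = kWBM.
  std::array<uint64_t, 2> rates = {max_rate, max_rate};

  rates[1] = WbmWriteRate(max_rate, /*delay_factor=*/100);  // 32MB/s / 100
  const uint64_t effective = *std::min_element(rates.begin(), rates.end());
  std::printf("effective delayed_write_rate = %llu bytes/s\n",
              static_cast<unsigned long long>(effective));
  return 0;
}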