diff --git a/db/column_family.cc b/db/column_family.cc index e33a7e7b32..6fc527e77e 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -872,7 +872,6 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, namespace { const uint64_t Gb = 1ull << 30; const uint64_t Mb = 1ull << 20; -const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s. const int kMemtablePenalty = 10; } // namespace @@ -899,8 +898,8 @@ std::unique_ptr ColumnFamilyData::DynamicSetupDelay( compaction_needed_bytes, mutable_cf_options, write_stall_cause); assert(rate_divider >= 1); auto write_rate = static_cast(max_write_rate / rate_divider); - if (write_rate < kMinWriteRate) { - write_rate = kMinWriteRate; + if (write_rate < WriteController::kMinWriteRate) { + write_rate = WriteController::kMinWriteRate; } // GetDelayToken returns a DelayWriteToken and also sets the diff --git a/db/write_controller.cc b/db/write_controller.cc index 70c8ce7e65..023eff3131 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -24,7 +24,7 @@ std::unique_ptr WriteController::GetStopToken() { std::unique_ptr WriteController::GetDelayToken( uint64_t write_rate) { { - std::lock_guard lock(mu_); + std::lock_guard lock(metrics_mu_); if (0 == total_delayed_++) { // Starting delay, so reset counters. next_refill_time_ = 0; @@ -41,12 +41,12 @@ std::unique_ptr WriteController::GetDelayToken( uint64_t WriteController::TEST_GetMapMinRate() { return GetMapMinRate(); } void WriteController::AddToDbRateMap(CfIdToRateMap* cf_map) { - std::lock_guard lock(mu_for_map_); + std::lock_guard lock(map_mu_); db_id_to_write_rate_map_.insert(cf_map); } void WriteController::RemoveFromDbRateMap(CfIdToRateMap* cf_map) { - std::lock_guard lock(mu_for_map_); + std::lock_guard lock(map_mu_); db_id_to_write_rate_map_.erase(cf_map); } @@ -78,7 +78,7 @@ bool WriteController::IsMinRate(uint32_t id, CfIdToRateMap* cf_map) { void WriteController::DeleteSelfFromMapAndMaybeUpdateDelayRate( uint32_t id, CfIdToRateMap* cf_map) { - std::lock_guard lock(mu_for_map_); + std::lock_guard lock(map_mu_); bool was_min = IsMinRate(id, cf_map); cf_map->erase(id); if (was_min) { @@ -94,7 +94,7 @@ void WriteController::DeleteSelfFromMapAndMaybeUpdateDelayRate( uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id, CfIdToRateMap* cf_map, uint64_t cf_write_rate) { - std::lock_guard lock(mu_for_map_); + std::lock_guard lock(map_mu_); bool was_min = IsMinRate(id, cf_map); cf_map->insert_or_assign(id, cf_write_rate); if (cf_write_rate <= delayed_write_rate()) { @@ -107,7 +107,7 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id, } void WriteController::WaitOnCV(const ErrorHandler& error_handler) { - std::unique_lock lock(stop_mutex_); + std::unique_lock lock(stop_mu_); while (error_handler.GetBGError().ok() && IsStopped()) { TEST_SYNC_POINT("WriteController::WaitOnCV"); stop_cv_.wait(lock); @@ -135,10 +135,11 @@ uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) { return 0; } - auto credits = credit_in_bytes_.load(); - while (credits >= num_bytes) { - if (credit_in_bytes_.compare_exchange_weak(credits, credits - num_bytes)) - return 0; + std::lock_guard lock(metrics_mu_); + + if (credit_in_bytes_ >= num_bytes) { + credit_in_bytes_ -= num_bytes; + return 0; } // The frequency to get time inside DB mutex is less than one per refill // interval. @@ -148,8 +149,6 @@ uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) { // Refill every 1 ms const uint64_t kMicrosPerRefill = 1000; - std::lock_guard lock(mu_); - if (next_refill_time_ == 0) { // Start with an initial allotment of bytes for one interval next_refill_time_ = time_now; @@ -188,7 +187,7 @@ uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) { void WriteController::NotifyCV() { assert(total_stopped_ >= 1); { - std::lock_guard lock(stop_mutex_); + std::lock_guard lock(stop_mu_); --total_stopped_; } stop_cv_.notify_all(); diff --git a/db/write_controller.h b/db/write_controller.h index a3fc0e8e8d..095ec6d64d 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -29,18 +29,21 @@ class WriteController { explicit WriteController(bool dynamic_delay, uint64_t _delayed_write_rate = 1024u * 1024u * 16u, int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) - : total_stopped_(0), + : dynamic_delay_(dynamic_delay), + total_stopped_(0), total_delayed_(0), total_compaction_pressure_(0), credit_in_bytes_(0), next_refill_time_(0), - dynamic_delay_(dynamic_delay), low_pri_rate_limiter_( NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { set_max_delayed_write_rate(_delayed_write_rate); } ~WriteController() = default; + static constexpr uint64_t kMinWriteRate = + 16 * 1024u; // Minimum write rate 16KB/s. + // When an actor (column family) requests a stop token, all writes will be // stopped until the stop token is released (deleted) std::unique_ptr GetStopToken(); @@ -92,8 +95,6 @@ class WriteController { bool is_dynamic_delay() const { return dynamic_delay_; } - std::mutex& GetMapMutex() { return mu_for_map_; } - using CfIdToRateMap = std::unordered_map; void AddToDbRateMap(CfIdToRateMap* cf_map); @@ -115,8 +116,20 @@ class WriteController { bool IsMinRate(uint32_t id, CfIdToRateMap* cf_map); // returns the min rate from db_id_to_write_rate_map_ + // REQUIRES: write_controller map_mu_ mutex held. uint64_t GetMapMinRate(); + // Whether Speedb's dynamic delay is used + bool dynamic_delay_; + + std::mutex map_mu_; + std::unordered_set db_id_to_write_rate_map_; + + // The mutex used by stop_cv_ + std::mutex stop_mu_; + std::condition_variable stop_cv_; + // end of methods and members used when dynamic_delay_ == true. + uint64_t NowMicrosMonotonic(SystemClock* clock); friend class WriteControllerToken; @@ -129,7 +142,7 @@ class WriteController { std::atomic total_compaction_pressure_; // mutex to protect below 4 members - std::mutex mu_; + std::mutex metrics_mu_; // Number of bytes allowed to write without delay std::atomic credit_in_bytes_; // Next time that we can add more credit of bytes @@ -139,16 +152,6 @@ class WriteController { // Current write rate (bytes / second) std::atomic delayed_write_rate_; - // Whether Speedb's dynamic delay is used - bool dynamic_delay_; - - std::mutex mu_for_map_; - std::unordered_set db_id_to_write_rate_map_; - - std::condition_variable stop_cv_; - // The mutex used by stop_cv_ - std::mutex stop_mutex_; - std::unique_ptr low_pri_rate_limiter_; };