Skip to content

Commit

Permalink
Global delay bug fix: use credits only under mutex (#438)
Browse files Browse the repository at this point in the history
While at it, rename and reorder some members
  • Loading branch information
Yuval-Ariel authored and udi-speedb committed Nov 13, 2023
1 parent e21c0c3 commit 2dd826a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 31 deletions.
5 changes: 2 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,6 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
namespace {
const uint64_t Gb = 1ull << 30;
const uint64_t Mb = 1ull << 20;
const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
const int kMemtablePenalty = 10;
} // namespace

Expand All @@ -899,8 +898,8 @@ std::unique_ptr<WriteControllerToken> ColumnFamilyData::DynamicSetupDelay(
compaction_needed_bytes, mutable_cf_options, write_stall_cause);
assert(rate_divider >= 1);
auto write_rate = static_cast<uint64_t>(max_write_rate / rate_divider);
if (write_rate < kMinWriteRate) {
write_rate = kMinWriteRate;
if (write_rate < WriteController::kMinWriteRate) {
write_rate = WriteController::kMinWriteRate;
}

// GetDelayToken returns a DelayWriteToken and also sets the
Expand Down
25 changes: 12 additions & 13 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t write_rate) {
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::mutex> lock(metrics_mu_);
if (0 == total_delayed_++) {
// Starting delay, so reset counters.
next_refill_time_ = 0;
Expand All @@ -41,12 +41,12 @@ std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t WriteController::TEST_GetMapMinRate() { return GetMapMinRate(); }

void WriteController::AddToDbRateMap(CfIdToRateMap* cf_map) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
db_id_to_write_rate_map_.insert(cf_map);
}

void WriteController::RemoveFromDbRateMap(CfIdToRateMap* cf_map) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
db_id_to_write_rate_map_.erase(cf_map);
}

Expand Down Expand Up @@ -78,7 +78,7 @@ bool WriteController::IsMinRate(uint32_t id, CfIdToRateMap* cf_map) {

void WriteController::DeleteSelfFromMapAndMaybeUpdateDelayRate(
uint32_t id, CfIdToRateMap* cf_map) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
bool was_min = IsMinRate(id, cf_map);
cf_map->erase(id);
if (was_min) {
Expand All @@ -94,7 +94,7 @@ void WriteController::DeleteSelfFromMapAndMaybeUpdateDelayRate(
uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id,
CfIdToRateMap* cf_map,
uint64_t cf_write_rate) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
bool was_min = IsMinRate(id, cf_map);
cf_map->insert_or_assign(id, cf_write_rate);
if (cf_write_rate <= delayed_write_rate()) {
Expand All @@ -107,7 +107,7 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id,
}

void WriteController::WaitOnCV(const ErrorHandler& error_handler) {
std::unique_lock<std::mutex> lock(stop_mutex_);
std::unique_lock<std::mutex> lock(stop_mu_);
while (error_handler.GetBGError().ok() && IsStopped()) {
TEST_SYNC_POINT("WriteController::WaitOnCV");
stop_cv_.wait(lock);
Expand Down Expand Up @@ -135,10 +135,11 @@ uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
return 0;
}

auto credits = credit_in_bytes_.load();
while (credits >= num_bytes) {
if (credit_in_bytes_.compare_exchange_weak(credits, credits - num_bytes))
return 0;
std::lock_guard<std::mutex> lock(metrics_mu_);

if (credit_in_bytes_ >= num_bytes) {
credit_in_bytes_ -= num_bytes;
return 0;
}
// The frequency to get time inside DB mutex is less than one per refill
// interval.
Expand All @@ -148,8 +149,6 @@ uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
// Refill every 1 ms
const uint64_t kMicrosPerRefill = 1000;

std::lock_guard<std::mutex> lock(mu_);

if (next_refill_time_ == 0) {
// Start with an initial allotment of bytes for one interval
next_refill_time_ = time_now;
Expand Down Expand Up @@ -188,7 +187,7 @@ uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
void WriteController::NotifyCV() {
assert(total_stopped_ >= 1);
{
std::lock_guard<std::mutex> lock(stop_mutex_);
std::lock_guard<std::mutex> lock(stop_mu_);
--total_stopped_;
}
stop_cv_.notify_all();
Expand Down
33 changes: 18 additions & 15 deletions db/write_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ class WriteController {
explicit WriteController(bool dynamic_delay,
uint64_t _delayed_write_rate = 1024u * 1024u * 16u,
int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
: total_stopped_(0),
: dynamic_delay_(dynamic_delay),
total_stopped_(0),
total_delayed_(0),
total_compaction_pressure_(0),
credit_in_bytes_(0),
next_refill_time_(0),
dynamic_delay_(dynamic_delay),
low_pri_rate_limiter_(
NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
set_max_delayed_write_rate(_delayed_write_rate);
}
~WriteController() = default;

static constexpr uint64_t kMinWriteRate =
16 * 1024u; // Minimum write rate 16KB/s.

// When an actor (column family) requests a stop token, all writes will be
// stopped until the stop token is released (deleted)
std::unique_ptr<WriteControllerToken> GetStopToken();
Expand Down Expand Up @@ -92,8 +95,6 @@ class WriteController {

bool is_dynamic_delay() const { return dynamic_delay_; }

std::mutex& GetMapMutex() { return mu_for_map_; }

using CfIdToRateMap = std::unordered_map<uint32_t, uint64_t>;

void AddToDbRateMap(CfIdToRateMap* cf_map);
Expand All @@ -115,8 +116,20 @@ class WriteController {
bool IsMinRate(uint32_t id, CfIdToRateMap* cf_map);

// returns the min rate from db_id_to_write_rate_map_
// REQUIRES: write_controller map_mu_ mutex held.
uint64_t GetMapMinRate();

// Whether Speedb's dynamic delay is used
bool dynamic_delay_;

std::mutex map_mu_;
std::unordered_set<CfIdToRateMap*> db_id_to_write_rate_map_;

// The mutex used by stop_cv_
std::mutex stop_mu_;
std::condition_variable stop_cv_;
// end of methods and members used when dynamic_delay_ == true.

uint64_t NowMicrosMonotonic(SystemClock* clock);

friend class WriteControllerToken;
Expand All @@ -129,7 +142,7 @@ class WriteController {
std::atomic<int> total_compaction_pressure_;

// mutex to protect below 4 members
std::mutex mu_;
std::mutex metrics_mu_;
// Number of bytes allowed to write without delay
std::atomic<uint64_t> credit_in_bytes_;
// Next time that we can add more credit of bytes
Expand All @@ -139,16 +152,6 @@ class WriteController {
// Current write rate (bytes / second)
std::atomic<uint64_t> delayed_write_rate_;

// Whether Speedb's dynamic delay is used
bool dynamic_delay_;

std::mutex mu_for_map_;
std::unordered_set<CfIdToRateMap*> db_id_to_write_rate_map_;

std::condition_variable stop_cv_;
// The mutex used by stop_cv_
std::mutex stop_mutex_;

std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
};

Expand Down

0 comments on commit 2dd826a

Please sign in to comment.