Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global delay bug fix: check again credits under mutex #438

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,6 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
namespace {
const uint64_t Gb = 1ull << 30;
const uint64_t Mb = 1ull << 20;
const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
const int kMemtablePenalty = 10;
} // namespace

Expand All @@ -925,8 +924,8 @@ std::unique_ptr<WriteControllerToken> ColumnFamilyData::DynamicSetupDelay(
compaction_needed_bytes, mutable_cf_options, write_stall_cause);
assert(rate_divider >= 1);
auto write_rate = static_cast<uint64_t>(max_write_rate / rate_divider);
if (write_rate < kMinWriteRate) {
write_rate = kMinWriteRate;
if (write_rate < WriteController::kMinWriteRate) {
write_rate = WriteController::kMinWriteRate;
}

// GetDelayToken returns a DelayWriteToken and also sets the
Expand Down
25 changes: 12 additions & 13 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t write_rate) {
{
std::lock_guard<std::mutex> lock(mu_);
std::lock_guard<std::mutex> lock(metrics_mu_);
if (0 == total_delayed_++) {
// Starting delay, so reset counters.
next_refill_time_ = 0;
Expand All @@ -41,12 +41,12 @@ std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t WriteController::TEST_GetMapMinRate() { return GetMapMinRate(); }

void WriteController::AddToDbRateMap(CfIdToRateMap* cf_map) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
db_id_to_write_rate_map_.insert(cf_map);
}

void WriteController::RemoveFromDbRateMap(CfIdToRateMap* cf_map) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
db_id_to_write_rate_map_.erase(cf_map);
}

Expand Down Expand Up @@ -78,7 +78,7 @@ bool WriteController::IsMinRate(uint32_t id, CfIdToRateMap* cf_map) {

void WriteController::DeleteSelfFromMapAndMaybeUpdateDelayRate(
uint32_t id, CfIdToRateMap* cf_map) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
bool was_min = IsMinRate(id, cf_map);
cf_map->erase(id);
if (was_min) {
Expand All @@ -94,7 +94,7 @@ void WriteController::DeleteSelfFromMapAndMaybeUpdateDelayRate(
uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id,
CfIdToRateMap* cf_map,
uint64_t cf_write_rate) {
std::lock_guard<std::mutex> lock(mu_for_map_);
std::lock_guard<std::mutex> lock(map_mu_);
bool was_min = IsMinRate(id, cf_map);
cf_map->insert_or_assign(id, cf_write_rate);
if (cf_write_rate <= delayed_write_rate()) {
Expand All @@ -107,7 +107,7 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id,
}

void WriteController::WaitOnCV(const ErrorHandler& error_handler) {
std::unique_lock<std::mutex> lock(stop_mutex_);
std::unique_lock<std::mutex> lock(stop_mu_);
while (error_handler.GetBGError().ok() && IsStopped()) {
TEST_SYNC_POINT("WriteController::WaitOnCV");
stop_cv_.wait(lock);
Expand Down Expand Up @@ -135,10 +135,11 @@ uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
return 0;
}

auto credits = credit_in_bytes_.load();
while (credits >= num_bytes) {
if (credit_in_bytes_.compare_exchange_weak(credits, credits - num_bytes))
return 0;
std::lock_guard<std::mutex> lock(metrics_mu_);

if (credit_in_bytes_ >= num_bytes) {
credit_in_bytes_ -= num_bytes;
return 0;
}
// The frequency to get time inside DB mutex is less than one per refill
// interval.
Expand All @@ -148,8 +149,6 @@ uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
// Refill every 1 ms
const uint64_t kMicrosPerRefill = 1000;

std::lock_guard<std::mutex> lock(mu_);

if (next_refill_time_ == 0) {
// Start with an initial allotment of bytes for one interval
next_refill_time_ = time_now;
Expand Down Expand Up @@ -188,7 +187,7 @@ uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
void WriteController::NotifyCV() {
assert(total_stopped_ >= 1);
{
std::lock_guard<std::mutex> lock(stop_mutex_);
std::lock_guard<std::mutex> lock(stop_mu_);
--total_stopped_;
}
stop_cv_.notify_all();
Expand Down
33 changes: 18 additions & 15 deletions db/write_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,21 @@ class WriteController {
explicit WriteController(bool dynamic_delay,
uint64_t _delayed_write_rate = 1024u * 1024u * 16u,
int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
: total_stopped_(0),
: dynamic_delay_(dynamic_delay),
total_stopped_(0),
total_delayed_(0),
total_compaction_pressure_(0),
credit_in_bytes_(0),
next_refill_time_(0),
dynamic_delay_(dynamic_delay),
low_pri_rate_limiter_(
NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
set_max_delayed_write_rate(_delayed_write_rate);
}
~WriteController() = default;

static constexpr uint64_t kMinWriteRate =
16 * 1024u; // Minimum write rate 16KB/s.

// When an actor (column family) requests a stop token, all writes will be
// stopped until the stop token is released (deleted)
std::unique_ptr<WriteControllerToken> GetStopToken();
Expand Down Expand Up @@ -92,8 +95,6 @@ class WriteController {

bool is_dynamic_delay() const { return dynamic_delay_; }

std::mutex& GetMapMutex() { return mu_for_map_; }

using CfIdToRateMap = std::unordered_map<uint32_t, uint64_t>;

void AddToDbRateMap(CfIdToRateMap* cf_map);
Expand All @@ -115,8 +116,20 @@ class WriteController {
bool IsMinRate(uint32_t id, CfIdToRateMap* cf_map);

// returns the min rate from db_id_to_write_rate_map_
// REQUIRES: write_controller map_mu_ mutex held.
uint64_t GetMapMinRate();

// Whether Speedb's dynamic delay is used
bool dynamic_delay_;

std::mutex map_mu_;
std::unordered_set<CfIdToRateMap*> db_id_to_write_rate_map_;

// The mutex used by stop_cv_
std::mutex stop_mu_;
std::condition_variable stop_cv_;
// end of methods and members used when dynamic_delay_ == true.

uint64_t NowMicrosMonotonic(SystemClock* clock);

friend class WriteControllerToken;
Expand All @@ -129,7 +142,7 @@ class WriteController {
std::atomic<int> total_compaction_pressure_;

// mutex to protect below 4 members
std::mutex mu_;
std::mutex metrics_mu_;
// Number of bytes allowed to write without delay
std::atomic<uint64_t> credit_in_bytes_;
// Next time that we can add more credit of bytes
Expand All @@ -139,16 +152,6 @@ class WriteController {
// Current write rate (bytes / second)
std::atomic<uint64_t> delayed_write_rate_;

// Whether Speedb's dynamic delay is used
bool dynamic_delay_;

std::mutex mu_for_map_;
std::unordered_set<CfIdToRateMap*> db_id_to_write_rate_map_;

std::condition_variable stop_cv_;
// The mutex used by stop_cv_
std::mutex stop_mutex_;

std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
};

Expand Down