Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pace up limiter before triggering write stall #207

Merged
merged 5 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ ColumnFamilyData::ColumnFamilyData(
}
}

RecalculateWriteStallConditions(mutable_cf_options_);
RecalculateWriteStallConditions(mutable_cf_options_, ioptions_.rate_limiter);
}

// DB mutex held
Expand Down Expand Up @@ -657,7 +657,7 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

if (level0_file_num_compaction_trigger < 0) {
return std::numeric_limits<int>::max();
return port::kMaxInt32;
}

const int64_t twice_level0_trigger =
Expand Down Expand Up @@ -720,7 +720,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, RateLimiter* rate_limiter) {
auto write_stall_condition = WriteStallCondition::kNormal;
if (current_ != nullptr) {
auto* vstorage = current_->storage_info();
Expand Down Expand Up @@ -875,6 +875,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
4);
}
}
if (rate_limiter) {
// pace up limiter when close to write stall
if (write_stall_condition != WriteStallCondition::kNormal ||
vstorage->l0_delay_trigger_count() >=
0.8 * mutable_cf_options.level0_slowdown_writes_trigger ||
vstorage->estimated_compaction_needed_bytes() >=
0.6 * mutable_cf_options.soft_pending_compaction_bytes_limit) {
rate_limiter->PaceUp();
}
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;
}
return write_stall_condition;
Expand Down Expand Up @@ -1105,8 +1115,8 @@ void ColumnFamilyData::InstallSuperVersion(
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
super_version_->write_stall_condition =
RecalculateWriteStallConditions(mutable_cf_options);
super_version_->write_stall_condition = RecalculateWriteStallConditions(
mutable_cf_options, ioptions_.rate_limiter);

if (old_superversion != nullptr) {
// Reset SuperVersions cached in thread local storage.
Expand Down
3 changes: 2 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ class ColumnFamilyData {
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options,
RateLimiter* rate_limiter = nullptr);

void set_initialized() { initialized_.store(true); }

Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class RateLimiter {
return true;
}

virtual void PaceUp() {}

protected:
Mode GetMode() { return mode_; }

Expand Down
13 changes: 13 additions & 0 deletions utilities/rate_limiters/write_amp_based_rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
tuned_time_(NowMicrosMonotonic(env_)),
duration_highpri_bytes_through_(0),
duration_bytes_through_(0),
should_pace_up_(false),
ratio_delta_(0) {
total_requests_[0] = 0;
total_requests_[1] = 0;
Expand Down Expand Up @@ -326,6 +327,12 @@ Status WriteAmpBasedRateLimiter::Tune() {
} else if (util < 95 && ratio_delta_ > 0) {
ratio_delta_ -= 1;
}
if (should_pace_up_.load(std::memory_order_relaxed)) {
if (ratio_delta_ < 60) {
ratio_delta_ += 60; // effect lasts for at least 60 * kSecondsPerTune = 1m
}
should_pace_up_.store(false, std::memory_order_relaxed);
}

int64_t new_bytes_per_sec =
(ratio + ratio_padding + ratio_delta_) *
Expand All @@ -344,6 +351,12 @@ Status WriteAmpBasedRateLimiter::Tune() {
return Status::OK();
}

void WriteAmpBasedRateLimiter::PaceUp() {
if (auto_tuned_) {
should_pace_up_.store(true, std::memory_order_relaxed);
}
}

RateLimiter* NewWriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
int32_t fairness /* = 10 */,
Expand Down
3 changes: 3 additions & 0 deletions utilities/rate_limiters/write_amp_based_rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
return rate_bytes_per_sec_;
}

virtual void PaceUp() override;

private:
void Refill();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Expand Down Expand Up @@ -149,6 +151,7 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
WindowSmoother<kLongTermWindowSize> long_term_highpri_bytes_sampler_;
WindowSmoother<kRecentSmoothWindowSize, kRecentSmoothWindowSize>
limit_bytes_sampler_;
std::atomic<bool> should_pace_up_;
int32_t ratio_delta_;
};

Expand Down