Skip to content

Commit

Permalink
Pace up limiter before triggering write stall (tikv#207)
Browse files Browse the repository at this point in the history
Proactively pace up auto-tuned rate limiter when LSM shape (L0-count/pending-bytes) is close to trigger write stall.

Test plan: sysbench wide_table 512 threads 100 columns

Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie committed May 10, 2022
1 parent 6038a43 commit d08e7cc
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 6 deletions.
21 changes: 16 additions & 5 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,8 @@ ColumnFamilyData::ColumnFamilyData(
}
}

RecalculateWriteStallConditions(mutable_cf_options_);
RecalculateWriteStallConditions(mutable_cf_options_,
ioptions_.rate_limiter.get());
}

// DB mutex held
Expand Down Expand Up @@ -808,7 +809,7 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

if (level0_file_num_compaction_trigger < 0) {
return std::numeric_limits<int>::max();
return port::kMaxInt32;
}

const int64_t twice_level0_trigger =
Expand Down Expand Up @@ -874,7 +875,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, RateLimiter* rate_limiter) {
auto write_stall_condition = WriteStallCondition::kNormal;
if (current_ != nullptr) {
auto* vstorage = current_->storage_info();
Expand Down Expand Up @@ -1030,6 +1031,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
4);
}
}
if (rate_limiter) {
// pace up limiter when close to write stall
if (write_stall_condition != WriteStallCondition::kNormal ||
vstorage->l0_delay_trigger_count() >=
0.8 * mutable_cf_options.level0_slowdown_writes_trigger ||
vstorage->estimated_compaction_needed_bytes() >=
0.6 * mutable_cf_options.soft_pending_compaction_bytes_limit) {
rate_limiter->PaceUp();
}
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;
}
return write_stall_condition;
Expand Down Expand Up @@ -1272,8 +1283,8 @@ void ColumnFamilyData::InstallSuperVersion(
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
super_version_->write_stall_condition =
RecalculateWriteStallConditions(mutable_cf_options);
super_version_->write_stall_condition = RecalculateWriteStallConditions(
mutable_cf_options, ioptions_.rate_limiter.get());

if (old_superversion != nullptr) {
// Reset SuperVersions cached in thread local storage.
Expand Down
3 changes: 2 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ class ColumnFamilyData {
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options,
RateLimiter* rate_limiter = nullptr);

void set_initialized() { initialized_.store(true); }

Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class RateLimiter : public Customizable {
return true;
}

virtual void PaceUp() {}

protected:
Mode GetMode() { return mode_; }

Expand Down
14 changes: 14 additions & 0 deletions utilities/rate_limiters/write_amp_based_rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
tuned_time_(NowMicrosMonotonic(env_)),
duration_highpri_bytes_through_(0),
duration_bytes_through_(0),
should_pace_up_(false),
ratio_delta_(0) {
total_requests_[0] = 0;
total_requests_[1] = 0;
Expand Down Expand Up @@ -325,6 +326,13 @@ Status WriteAmpBasedRateLimiter::Tune() {
} else if (util < 95 && ratio_delta_ > 0) {
ratio_delta_ -= 1;
}
if (should_pace_up_.load(std::memory_order_relaxed)) {
if (ratio_delta_ < 60) {
ratio_delta_ +=
60; // effect lasts for at least 60 * kSecondsPerTune = 1m
}
should_pace_up_.store(false, std::memory_order_relaxed);
}

int64_t new_bytes_per_sec =
(ratio + ratio_padding + ratio_delta_) *
Expand All @@ -343,6 +351,12 @@ Status WriteAmpBasedRateLimiter::Tune() {
return Status::OK();
}

void WriteAmpBasedRateLimiter::PaceUp() {
if (auto_tuned_) {
should_pace_up_.store(true, std::memory_order_relaxed);
}
}

RateLimiter* NewWriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
int32_t fairness /* = 10 */,
Expand Down
3 changes: 3 additions & 0 deletions utilities/rate_limiters/write_amp_based_rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
return rate_bytes_per_sec_;
}

virtual void PaceUp() override;

private:
void Refill();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Expand Down Expand Up @@ -150,6 +152,7 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
WindowSmoother<kLongTermWindowSize> long_term_highpri_bytes_sampler_;
WindowSmoother<kRecentSmoothWindowSize, kRecentSmoothWindowSize>
limit_bytes_sampler_;
std::atomic<bool> should_pace_up_;
int32_t ratio_delta_;
};

Expand Down

0 comments on commit d08e7cc

Please sign in to comment.