Slowdown writes based on WBM's dirty memory usage (#114)
udi-speedb authored and hilikspdb committed Oct 1, 2022
1 parent 36cfb09 commit 12a4b3b
Showing 12 changed files with 703 additions and 63 deletions.
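In short, this change lets the WriteBufferManager (WBM) slow writes down based on its dirty-memory usage before it has to stop them outright: the WBM reports a usage state plus a delayed-write factor, and DBImpl::PreprocessWrite() converts that into a WriteController delay token tagged with the new DelaySource::kWBM, alongside the existing column-family-driven (kCF) delays. A minimal sketch of how an application might opt in, modeled on the constructors used in the new tests (treating the third WriteBufferManager argument as an "allow delays/stalls" flag is an inference from those tests, not something this diff states):

```cpp
#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"

int main() {
  using namespace ROCKSDB_NAMESPACE;

  // Share one WriteBufferManager across the DB(s); the `true` third argument
  // mirrors the new tests and is assumed to enable WBM-driven delays/stalls.
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024);
  Options options;
  options.create_if_missing = true;
  options.write_buffer_manager = std::make_shared<WriteBufferManager>(
      100 * 1000 /* quota in bytes */, cache, true);

  DB* db = nullptr;
  Status s = DB::Open(options, "/tmp/wbm_delay_example", &db);
  if (s.ok()) {
    // Every write now passes through DBImpl::PreprocessWrite(), which checks
    // the WBM usage state and may install a DelaySource::kWBM delay token.
    delete db;
  }
  return 0;
}
```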
9 changes: 6 additions & 3 deletions db/column_family.cc
@@ -799,7 +799,8 @@ std::unique_ptr<WriteControllerToken> SetupDelay(
}
}
}
return write_controller->GetDelayToken(write_rate);
return write_controller->GetDelayToken(WriteController::DelaySource::kCF,
write_rate);
}

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
@@ -1020,8 +1021,10 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
// increase signal.
if (needed_delay) {
uint64_t write_rate = write_controller->delayed_write_rate();
write_controller->set_delayed_write_rate(static_cast<uint64_t>(
static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
write_controller->set_delayed_write_rate(
WriteController::DelaySource::kCF,
static_cast<uint64_t>(static_cast<double>(write_rate) *
kDelayRecoverSlowdownRatio));
// Set the low pri limit to be 1/4 the delayed write rate.
// Note we don't reset this value even after delay condition is released.
// Low-pri rate will continue to apply if there is a compaction
3 changes: 2 additions & 1 deletion db/column_family_test.cc
@@ -2697,7 +2697,8 @@ TEST_P(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());

mutable_cf_options.disable_auto_compactions = true;
dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate);
dbfull()->TEST_write_controler().set_delayed_write_rate(
WriteController::DelaySource::kCF, kBaseRate);
RecalculateWriteStallConditions(cfd, mutable_cf_options);
ASSERT_TRUE(!IsDbWriteStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
11 changes: 11 additions & 0 deletions db/db_impl/db_impl.h
@@ -1094,6 +1094,10 @@ class DBImpl : public DB {
PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const;
#endif // !ROCKSDB_LITE

bool TEST_has_write_controller_token() const {
return (write_controller_token_.get() != nullptr);
}

#endif // NDEBUG

// persist stats to column family "_persistent_stats"
@@ -2393,6 +2397,13 @@ class DBImpl : public DB {

// Pointer to WriteBufferManager stalling interface.
std::unique_ptr<StallInterface> wbm_stall_;

// Members used for WBM's required delay
std::unique_ptr<WriteControllerToken> write_controller_token_;
WriteBufferManager::UsageState wbm_spdb_usage_state_ =
WriteBufferManager::UsageState::kNone;
uint64_t wbm_spdb_delayed_write_factor_ =
WriteBufferManager::kNoneDelayedWriteFactor;
};

extern Options SanitizeOptions(const std::string& db, const Options& src,
44 changes: 44 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -1028,6 +1028,27 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) {
}
}

namespace {

std::unique_ptr<WriteControllerToken> SetupDelayFromFactor(
WriteController& write_controller, uint64_t delay_factor) {
assert(delay_factor > 0U);
constexpr uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.

auto max_write_rate = write_controller.max_delayed_write_rate();

auto wbm_write_rate = max_write_rate;
if (max_write_rate >= kMinWriteRate) {
// If user gives rate less than kMinWriteRate, don't adjust it.
wbm_write_rate = max_write_rate / delay_factor;
}

return write_controller.GetDelayToken(WriteController::DelaySource::kWBM,
wbm_write_rate);
}

} // namespace

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
bool* need_log_sync,
WriteContext* write_context) {
@@ -1071,6 +1092,29 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
PERF_TIMER_GUARD(write_pre_and_post_process_time);

// Handle latest WBM calculated write delay, if applicable
if (status.ok() && write_buffer_manager_ &&
write_buffer_manager_->IsDelayAllowed()) {
auto [new_usage_state, new_delayed_write_factor] =
write_buffer_manager_->GetUsageStateInfo();

if (UNLIKELY(
(wbm_spdb_usage_state_ != new_usage_state) ||
(wbm_spdb_delayed_write_factor_ != new_delayed_write_factor))) {
if (new_usage_state != WriteBufferManager::UsageState::kDelay) {
write_controller_token_.reset();
} else if ((wbm_spdb_usage_state_ !=
WriteBufferManager::UsageState::kDelay) ||
(wbm_spdb_delayed_write_factor_ != new_delayed_write_factor)) {
write_controller_token_ =
SetupDelayFromFactor(write_controller_, new_delayed_write_factor);
}

wbm_spdb_usage_state_ = new_usage_state;
wbm_spdb_delayed_write_factor_ = new_delayed_write_factor;
}
}

if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
write_controller_.NeedsDelay()))) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
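The PreprocessWrite() hunk above caches the last observed WBM usage state and delay factor (wbm_spdb_usage_state_, wbm_spdb_delayed_write_factor_), so the kWBM delay token is created, replaced, or dropped only when one of them changes, not on every write. Below is a worked example of the rate produced by SetupDelayFromFactor(), using hypothetical numbers (the controller's real max_delayed_write_rate() depends on configuration):

```cpp
#include <cassert>
#include <cstdint>

int main() {
  // Stand-alone restatement of SetupDelayFromFactor() with made-up inputs.
  const uint64_t kMinWriteRate = 16 * 1024;  // 16 KB/s, as in the diff
  const uint64_t max_rate = 64ULL << 20;     // assume max_delayed_write_rate() == 64 MB/s
  const uint64_t delay_factor = 4;           // assume the WBM reports a factor of 4

  uint64_t wbm_rate = max_rate;
  if (max_rate >= kMinWriteRate) {
    wbm_rate = max_rate / delay_factor;      // 16 MB/s
  }
  assert(wbm_rate == 16ULL << 20);
  // This rate is then registered via
  // write_controller.GetDelayToken(WriteController::DelaySource::kWBM, wbm_rate).
  return 0;
}
```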
12 changes: 8 additions & 4 deletions db/db_test.cc
@@ -235,7 +235,8 @@ TEST_F(DBTest, SkipDelay) {
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
auto token = dbfull()->TEST_write_controler().GetDelayToken(
WriteController::DelaySource::kCF, 1);
std::atomic<int> sleep_count(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep",
@@ -262,7 +263,8 @@ TEST_F(DBTest, SkipDelay) {
ASSERT_GE(wait_count.load(), 0);
token.reset();

token = dbfull()->TEST_write_controler().GetDelayToken(1000000);
token = dbfull()->TEST_write_controler().GetDelayToken(
WriteController::DelaySource::kCF, 1000000);
wo.no_slowdown = false;
ASSERT_OK(dbfull()->Put(wo, "foo3", large_value));
ASSERT_GE(sleep_count.load(), 1);
@@ -297,7 +299,8 @@ TEST_F(DBTest, MixedSlowdownOptions) {
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
auto token = dbfull()->TEST_write_controler().GetDelayToken(
WriteController::DelaySource::kCF, 1);
std::atomic<int> sleep_count(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:BeginWriteStallDone", [&](void* /*arg*/) {
@@ -351,7 +354,8 @@ TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
// when we do Put
// TODO(myabandeh): this is time dependent and could potentially make
// the test flaky
auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
auto token = dbfull()->TEST_write_controler().GetDelayToken(
WriteController::DelaySource::kCF, 1);
std::atomic<int> sleep_count(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep", [&](void* /*arg*/) {
134 changes: 127 additions & 7 deletions db/db_write_buffer_manager_test.cc
@@ -14,10 +14,12 @@
namespace ROCKSDB_NAMESPACE {

class DBWriteBufferManagerTest : public DBTestBase,
public testing::WithParamInterface<bool> {
public ::testing::WithParamInterface<bool> {
public:
DBWriteBufferManagerTest()
: DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}

void SetUp() override { cost_cache_ = GetParam(); }
bool cost_cache_;
};

@@ -27,7 +29,6 @@ TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
@@ -70,7 +71,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
@@ -197,7 +197,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
@@ -314,7 +313,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
@@ -456,7 +454,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
@@ -618,7 +615,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
@@ -780,9 +776,133 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

class DBWriteBufferManagerTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
DBWriteBufferManagerTest1()
: DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}

void SetUp() override { cost_cache_ = GetParam(); }
bool cost_cache_;
};

TEST_P(DBWriteBufferManagerTest1, WbmDelaySharedWriteBufferAcrossCFs) {
constexpr size_t kQuota = 100 * 1000;
constexpr size_t kDelayThreshold =
WriteBufferManager::kStartDelayPercentThreshold * kQuota / 100;

Options options = CurrentOptions();
options.arena_block_size = 4096;
options.write_buffer_size = kQuota; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);

if (cost_cache_) {
options.write_buffer_manager.reset(
new WriteBufferManager(kQuota, cache, true));
} else {
options.write_buffer_manager.reset(
new WriteBufferManager(kQuota, nullptr, true));
}
WriteOptions wo;
wo.disableWAL = true;

CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);

// Reach the delay threshold by writing to two cf-s, no flush
ASSERT_OK(Put(0, Key(1), DummyString(kDelayThreshold / 2), wo));
ASSERT_OK(Put(1, Key(1), DummyString(kDelayThreshold / 2), wo));

// Write another byte to trigger writing and a delay token in the write
// controller
auto& write_controller = dbfull()->TEST_write_controler();
ASSERT_FALSE(dbfull()->TEST_has_write_controller_token());
ASSERT_FALSE(write_controller.NeedsDelay());
ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
ASSERT_TRUE(dbfull()->TEST_has_write_controller_token());
ASSERT_TRUE(write_controller.NeedsDelay());

Flush(1);

// Delay token should be released when the next write arrives
ASSERT_TRUE(dbfull()->TEST_has_write_controller_token());
ASSERT_TRUE(write_controller.NeedsDelay());
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
ASSERT_FALSE(write_controller.NeedsDelay());

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBWriteBufferManagerTest1, WbmDelaySharedWriteBufferAcrossDBs) {
constexpr size_t kQuota = 500 * 1000;
constexpr size_t kDelayThreshold =
WriteBufferManager::kStartDelayPercentThreshold * kQuota / 100;

std::vector<std::string> dbnames;
std::vector<DB*> dbs;
int num_dbs = 3;

for (int i = 0; i < num_dbs; i++) {
dbs.push_back(nullptr);
dbnames.push_back(
test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
}

Options options = CurrentOptions();
options.arena_block_size = 4096;
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);

if (cost_cache_) {
options.write_buffer_manager.reset(
new WriteBufferManager(kQuota, cache, true));
} else {
options.write_buffer_manager.reset(
new WriteBufferManager(kQuota, nullptr, true));
}

CreateAndReopenWithCF({"cf1", "cf2"}, options);

for (int i = 0; i < num_dbs; i++) {
ASSERT_OK(DestroyDB(dbnames[i], options));
ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
}
WriteOptions wo;
wo.disableWAL = true;

for (int i = 0; i < num_dbs; i++) {
ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(kDelayThreshold / num_dbs)));
}

// Write another byte to trigger writing and a delay token in the write
// controller
auto& write_controller = dbfull()->TEST_write_controler();
ASSERT_FALSE(dbfull()->TEST_has_write_controller_token());
ASSERT_FALSE(write_controller.NeedsDelay());

ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
ASSERT_TRUE(dbfull()->TEST_has_write_controller_token());
ASSERT_TRUE(write_controller.NeedsDelay());

// Clean up DBs.
for (int i = 0; i < num_dbs; i++) {
ASSERT_OK(dbs[i]->Close());
ASSERT_OK(DestroyDB(dbnames[i], options));
delete dbs[i];
}

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1,
::testing::Bool());

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
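The two DBWriteBufferManagerTest1 cases above fill the shared write buffer up to kStartDelayPercentThreshold percent of the WBM quota (across column families of one DB, and across several DBs) and then check that the very next write installs a delay token in the write controller; the single-DB case also checks that the token is released by the first write after a flush frees memory. A small numeric illustration of the threshold arithmetic, assuming a threshold of 80% (the actual value of kStartDelayPercentThreshold is defined in WriteBufferManager and is not shown in this diff):

```cpp
#include <cstddef>

// Hypothetical percentage; only the formula matches the tests above.
constexpr std::size_t kQuota = 100 * 1000;             // quota used by the CF test
constexpr std::size_t kAssumedStartDelayPercent = 80;  // assumption, not from this diff
constexpr std::size_t kDelayThreshold =
    kAssumedStartDelayPercent * kQuota / 100;          // == 80,000 bytes

static_assert(kDelayThreshold == 80 * 1000, "threshold arithmetic");

int main() { return 0; }
```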
4 changes: 2 additions & 2 deletions db/write_controller.cc
@@ -20,7 +20,7 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
}

std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t write_rate) {
DelaySource source, uint64_t write_rate) {
if (0 == total_delayed_++) {
// Starting delay, so reset counters.
next_refill_time_ = 0;
@@ -29,7 +29,7 @@ std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
// NOTE: for simplicity, any current credit_in_bytes_ or "debt" in
// next_refill_time_ will be based on an old rate. This rate will apply
// for subsequent additional debts and for the next refill.
set_delayed_write_rate(write_rate);
set_delayed_write_rate(source, write_rate);
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
}

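The WriteController change visible on this page is an interface extension: GetDelayToken() and set_delayed_write_rate() now take a DelaySource (kCF for the existing compaction/flush-driven delays, kWBM for the new WriteBufferManager-driven ones). The DelaySource enum itself and any per-source bookkeeping presumably live in write_controller.h, whose diff is not loaded here. A hedged sketch of the new call shape (the function name and rates below are illustrative only):

```cpp
#include <memory>

#include "db/write_controller.h"  // internal header, as used by the tests above

namespace ROCKSDB_NAMESPACE {

// Two callers request delays from different sources using the new signature.
std::unique_ptr<WriteControllerToken> DelayFromBothSources(
    WriteController& controller) {
  // A column-family stall condition asks for 2 MB/s...
  auto cf_token = controller.GetDelayToken(WriteController::DelaySource::kCF,
                                           2 * 1024 * 1024);
  // ...while the WBM asks for 8 MB/s. How the controller reconciles the two
  // rates (presumably the more restrictive wins) is not shown in this diff.
  auto wbm_token = controller.GetDelayToken(WriteController::DelaySource::kWBM,
                                            8 * 1024 * 1024);
  // Dropping cf_token here ends the kCF request; the kWBM delay remains until
  // wbm_token is released.
  return wbm_token;
}

}  // namespace ROCKSDB_NAMESPACE
```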
(Diffs for the remaining 5 changed files are not loaded on this page.)
