Add support for TiKV IO rate limiter
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
v01dstar committed Sep 26, 2024
1 parent 012c5f1 commit 6d2a819
Showing 16 changed files with 623 additions and 12 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -725,6 +725,7 @@ set(SOURCES
env/env.cc
env/env_chroot.cc
env/env_encryption.cc
env/env_inspected.cc
env/file_system.cc
env/file_system_tracer.cc
env/fs_remap.cc
Expand Down
1 change: 1 addition & 0 deletions TARGETS
@@ -112,6 +112,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"env/env.cc",
"env/env_chroot.cc",
"env/env_encryption.cc",
"env/env_inspected.cc",
"env/env_posix.cc",
"env/file_system.cc",
"env/file_system_tracer.cc",
Expand Down
21 changes: 14 additions & 7 deletions db/column_family.cc
@@ -955,7 +955,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
bool needed_delay = write_controller->NeedsDelay();

if (write_stall_condition == WriteStallCondition::kStopped &&
-write_stall_cause == WriteStallCause::kMemtableLimit) {
+write_stall_cause == WriteStallCause::kMemtableLimit &&
+!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
@@ -965,7 +966,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number);
} else if (write_stall_condition == WriteStallCondition::kStopped &&
-write_stall_cause == WriteStallCause::kL0FileCountLimit) {
+write_stall_cause == WriteStallCause::kL0FileCountLimit &&
+!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) {
@@ -977,7 +979,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (write_stall_condition == WriteStallCondition::kStopped &&
-write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
+write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
+!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
@@ -987,7 +990,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
-write_stall_cause == WriteStallCause::kMemtableLimit) {
+write_stall_cause == WriteStallCause::kMemtableLimit &&
+!mutable_cf_options.disable_write_stall) {
write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, was_stopped,
@@ -1002,7 +1006,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
-write_stall_cause == WriteStallCause::kL0FileCountLimit) {
+write_stall_cause == WriteStallCause::kL0FileCountLimit &&
+!mutable_cf_options.disable_write_stall) {
// L0 is the last two files from stopping.
bool near_stop = vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger - 2;
@@ -1022,7 +1027,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
-write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
+write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
+!mutable_cf_options.disable_write_stall) {
// If the distance to hard limit is less than 1/4 of the gap between soft
// and
// hard bytes limit, we think it is near stop and speed up the slowdown.
@@ -1048,7 +1054,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else {
-assert(write_stall_condition == WriteStallCondition::kNormal);
+assert(write_stall_condition == WriteStallCondition::kNormal ||
+mutable_cf_options.disable_write_stall);
if (vstorage->l0_delay_trigger_count() >=
GetL0FileCountForCompactionSpeedup(
mutable_cf_options.level0_file_num_compaction_trigger,
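For orientation, here is a minimal sketch of how the new disable_write_stall column-family option might be used by an embedder such as TiKV, so that RocksDB neither stops nor delays writes on the memtable/L0/pending-compaction triggers gated above, leaving flow control to the layer above RocksDB. This is illustrative, not part of the commit: the DB path is arbitrary, the option plumbing itself lives in options files not shown in this excerpt, and the SetOptions spelling follows the SetOptionsDisableWriteStall test below.

#include <cassert>
#include <string>
#include <unordered_map>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // With write stalls disabled, RecalculateWriteStallConditions() above skips
  // the stop/delay tokens but still requests compaction speedup once the
  // L0 / pending-compaction thresholds are crossed.
  options.disable_write_stall = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/disable_stall_demo", &db);
  assert(s.ok());

  // The option is mutable, so it can also be flipped at runtime on the
  // default column family.
  s = db->SetOptions({{"disable_write_stall", "false"}});
  assert(s.ok());

  delete db;
  return 0;
}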
10 changes: 6 additions & 4 deletions db/db_impl/db_impl.cc
@@ -1317,10 +1317,11 @@ Status DBImpl::SetDBOptions(
mutable_db_options_.max_background_compactions,
mutable_db_options_.max_background_jobs,
/* parallelize_compactions */ true);
-const BGJobLimits new_bg_job_limits = GetBGJobLimits(
-new_options.max_background_flushes,
-new_options.max_background_compactions,
-new_options.max_background_jobs, /* parallelize_compactions */ true);
+const BGJobLimits new_bg_job_limits =
+GetBGJobLimits(new_options.max_background_flushes,
+new_options.max_background_compactions,
+new_options.max_background_jobs,
+/* parallelize_compactions */ true);

const bool max_flushes_increased =
new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
@@ -6414,6 +6415,7 @@ void DBImpl::NotifyOnExternalFileIngested(
info.internal_file_path = f.internal_file_path;
info.global_seqno = f.assigned_seqno;
info.table_properties = f.table_properties;
+info.picked_level = f.picked_level;
for (auto listener : immutable_db_options_.listeners) {
listener->OnExternalFileIngested(this, info);
}
82 changes: 81 additions & 1 deletion db/db_options_test.cc
@@ -582,6 +582,87 @@ TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
}
}

TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) {
const std::string kValue(1024, 'v');
Options options;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.disable_write_stall = true;
options.write_buffer_size = 1024 * 1024 * 10;
options.compression = CompressionType::kNoCompression;
options.level0_file_num_compaction_trigger = 1;
options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
options.hard_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.soft_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.env = env_;

DestroyAndReopen(options);
int i = 0;
for (; i < 1024; i++) {
Put(Key(i), kValue);
}
Flush();
for (; i < 1024 * 2; i++) {
Put(Key(i), kValue);
}
Flush();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(2, NumTableFilesAtLevel(0));
uint64_t l0_size = SizeAtLevel(0);

options.hard_pending_compaction_bytes_limit = l0_size;
options.soft_pending_compaction_bytes_limit = l0_size;

Reopen(options);
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());

SyncPoint::GetInstance()->LoadDependency(
{{"DBOptionsTest::EnableAutoCompactionButDisableStall:1",
"BackgroundCallCompaction:0"},
{"DBImpl::BackgroundCompaction():BeforePickCompaction",
"DBOptionsTest::EnableAutoCompactionButDisableStall:2"},
{"DBOptionsTest::EnableAutoCompactionButDisableStall:3",
"DBImpl::BackgroundCompaction():AfterPickCompaction"}});
// Block background compaction.
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1");
// Wait for the stall condition to be recalculated.
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2");

ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());

TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3");

// Background compaction executed.
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());
}

TEST_F(DBOptionsTest, SetOptionsDisableWriteStall) {
Options options;
options.create_if_missing = true;
options.disable_write_stall = false;
options.env = env_;
Reopen(options);
ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall);

ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "true"}}));
ASSERT_EQ(true, dbfull()->GetOptions().disable_write_stall);
ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "false"}}));
ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall);
}

TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) {
Options options;
options.level_compaction_dynamic_level_bytes = false;
@@ -1430,7 +1511,6 @@ TEST_F(DBOptionsTest, ChangeCompression) {
SyncPoint::GetInstance()->DisableProcessing();
}


TEST_F(DBOptionsTest, BottommostCompressionOptsWithFallbackType) {
// Verify the bottommost compression options still take effect even when the
// bottommost compression type is left at its default value. Verify for both
39 changes: 39 additions & 0 deletions env/env_basic_test.cc
@@ -14,6 +14,7 @@
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/env_encryption.h"
#include "rocksdb/env_inspected.h"
#include "test_util/testharness.h"

namespace ROCKSDB_NAMESPACE {
@@ -81,6 +82,41 @@ static Env* GetTestFS() {
return fs_env;
}

class DummyFileSystemInspector : public FileSystemInspector {
public:
DummyFileSystemInspector(size_t refill_bytes = 0)
: refill_bytes_(refill_bytes) {}

Status Read(size_t len, size_t* allowed) override {
assert(allowed);
if (refill_bytes_ == 0) {
*allowed = len;
} else {
*allowed = std::min(refill_bytes_, len);
}
return Status::OK();
}

Status Write(size_t len, size_t* allowed) override {
assert(allowed);
if (refill_bytes_ == 0) {
*allowed = len;
} else {
*allowed = std::min(refill_bytes_, len);
}
return Status::OK();
}

private:
size_t refill_bytes_;
};

static Env* GetInspectedEnv() {
static std::unique_ptr<Env> inspected_env(NewFileSystemInspectedEnv(
Env::Default(), std::make_shared<DummyFileSystemInspector>(1)));
return inspected_env.get();
}

} // namespace
class EnvBasicTestWithParam
: public testing::Test,
@@ -118,6 +154,9 @@ INSTANTIATE_TEST_CASE_P(EncryptedEnv, EnvMoreTestWithParam,
INSTANTIATE_TEST_CASE_P(MemEnv, EnvBasicTestWithParam,
::testing::Values(&GetMemoryEnv));

INSTANTIATE_TEST_CASE_P(InspectedEnv, EnvBasicTestWithParam,
::testing::Values(&GetInspectedEnv));

namespace {

// Returns a vector of 0 or 1 Env*, depending whether an Env is registered for
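To show how the inspected Env introduced by this commit might be wired up in practice, here is a hedged sketch based only on the interface visible in the test above: a FileSystemInspector with Read/Write byte-granting callbacks, and NewFileSystemInspectedEnv wrapping a base Env. The QuotaFileSystemInspector name, the 4 MiB per-request quota, and the DB path are illustrative assumptions; a real TiKV integration would delegate these callbacks to its IO rate limiter, and the full interface lives in the rocksdb/env_inspected.h header added elsewhere in this commit.

#include <algorithm>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/env_inspected.h"

namespace {

// Hypothetical inspector that caps how many bytes a single inspected I/O may
// consume; a TiKV integration would consult its global IO rate limiter
// instead of a fixed quota.
class QuotaFileSystemInspector : public rocksdb::FileSystemInspector {
 public:
  explicit QuotaFileSystemInspector(size_t quota_bytes)
      : quota_bytes_(quota_bytes) {}

  rocksdb::Status Read(size_t len, size_t* allowed) override {
    *allowed = std::min(quota_bytes_, len);
    return rocksdb::Status::OK();
  }

  rocksdb::Status Write(size_t len, size_t* allowed) override {
    *allowed = std::min(quota_bytes_, len);
    return rocksdb::Status::OK();
  }

 private:
  size_t quota_bytes_;
};

}  // namespace

int main() {
  // Wrap the default Env; file reads and writes issued through this Env are
  // first offered to the inspector, which decides how many bytes may proceed.
  std::unique_ptr<rocksdb::Env> inspected_env(
      rocksdb::NewFileSystemInspectedEnv(
          rocksdb::Env::Default(),
          std::make_shared<QuotaFileSystemInspector>(4 << 20)));

  rocksdb::Options options;
  options.create_if_missing = true;
  options.env = inspected_env.get();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/inspected_env_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}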
