Skip to content

Commit

Permalink
Add support for TiKV flow control
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
  • Loading branch information
v01dstar committed Sep 27, 2024
1 parent d308a00 commit d60ab01
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 8 deletions.
21 changes: 14 additions & 7 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
bool needed_delay = write_controller->NeedsDelay();

if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kMemtableLimit) {
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
Expand All @@ -965,7 +966,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number);
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) {
Expand All @@ -977,7 +979,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
Expand All @@ -987,7 +990,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kMemtableLimit) {
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, was_stopped,
Expand All @@ -1002,7 +1006,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
// L0 is the last two files from stopping.
bool near_stop = vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger - 2;
Expand All @@ -1022,7 +1027,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
// If the distance to hard limit is less than 1/4 of the gap between soft
// and
// hard bytes limit, we think it is near stop and speed up the slowdown.
Expand All @@ -1048,7 +1054,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else {
assert(write_stall_condition == WriteStallCondition::kNormal);
assert(write_stall_condition == WriteStallCondition::kNormal ||
mutable_cf_options.disable_write_stall);
if (vstorage->l0_delay_trigger_count() >=
GetL0FileCountForCompactionSpeedup(
mutable_cf_options.level0_file_num_compaction_trigger,
Expand Down
82 changes: 81 additions & 1 deletion db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,87 @@ TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
}
}

TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) {
const std::string kValue(1024, 'v');
Options options;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.disable_write_stall = true;
options.write_buffer_size = 1024 * 1024 * 10;
options.compression = CompressionType::kNoCompression;
options.level0_file_num_compaction_trigger = 1;
options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
options.hard_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.soft_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.env = env_;

DestroyAndReopen(options);
int i = 0;
for (; i < 1024; i++) {
Put(Key(i), kValue);
}
Flush();
for (; i < 1024 * 2; i++) {
Put(Key(i), kValue);
}
Flush();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(2, NumTableFilesAtLevel(0));
uint64_t l0_size = SizeAtLevel(0);

options.hard_pending_compaction_bytes_limit = l0_size;
options.soft_pending_compaction_bytes_limit = l0_size;

Reopen(options);
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());

SyncPoint::GetInstance()->LoadDependency(
{{"DBOptionsTest::EnableAutoCompactionButDisableStall:1",
"BackgroundCallCompaction:0"},
{"DBImpl::BackgroundCompaction():BeforePickCompaction",
"DBOptionsTest::EnableAutoCompactionButDisableStall:2"},
{"DBOptionsTest::EnableAutoCompactionButDisableStall:3",
"DBImpl::BackgroundCompaction():AfterPickCompaction"}});
// Block background compaction.
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1");
// Wait for stall condition recalculate.
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2");

ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());

TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3");

// Background compaction executed.
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());
}

TEST_F(DBOptionsTest, SetOptionsDisableWriteStall) {
Options options;
options.create_if_missing = true;
options.disable_write_stall = false;
options.env = env_;
Reopen(options);
ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall);

ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "true"}}));
ASSERT_EQ(true, dbfull()->GetOptions().disable_write_stall);
ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "false"}}));
ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall);
}

TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) {
Options options;
options.level_compaction_dynamic_level_bytes = false;
Expand Down Expand Up @@ -1430,7 +1511,6 @@ TEST_F(DBOptionsTest, ChangeCompression) {
SyncPoint::GetInstance()->DisableProcessing();
}


TEST_F(DBOptionsTest, BottommostCompressionOptsWithFallbackType) {
// Verify the bottommost compression options still take effect even when the
// bottommost compression type is left at its default value. Verify for both
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ struct ExternalFileIngestionInfo {
SequenceNumber global_seqno;
// Table properties of the table being flushed
TableProperties table_properties;
// Level inside the DB we picked for the external file.
int picked_level;
};

// Result of auto background error recovery
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
bool disable_auto_compactions = false;

// Disable write stall mechanism.
//
// Dynamically changeable through SetOptions() API
bool disable_write_stall = false;

// This is a factory that provides TableFactory objects.
// Default: a block-based table factory that provides a default
// implementation of TableBuilder and TableReader with default
Expand Down
6 changes: 6 additions & 0 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct MutableCFOptions, disable_auto_compactions),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"disable_write_stall",
{offsetof(struct MutableCFOptions, disable_write_stall),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
Expand Down Expand Up @@ -1069,6 +1073,8 @@ void MutableCFOptions::Dump(Logger* log) const {
: prefix_extractor->GetId().c_str());
ROCKS_LOG_INFO(log, " disable_auto_compactions: %d",
disable_auto_compactions);
ROCKS_LOG_INFO(log, " disable_write_stall: %d",
disable_write_stall);
ROCKS_LOG_INFO(log, " soft_pending_compaction_bytes_limit: %" PRIu64,
soft_pending_compaction_bytes_limit);
ROCKS_LOG_INFO(log, " hard_pending_compaction_bytes_limit: %" PRIu64,
Expand Down
3 changes: 3 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct MutableCFOptions {
experimental_mempurge_threshold(
options.experimental_mempurge_threshold),
disable_auto_compactions(options.disable_auto_compactions),
disable_write_stall(options.disable_write_stall),
soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit),
hard_pending_compaction_bytes_limit(
Expand Down Expand Up @@ -196,6 +197,7 @@ struct MutableCFOptions {
prefix_extractor(nullptr),
experimental_mempurge_threshold(0.0),
disable_auto_compactions(false),
disable_write_stall(false),
soft_pending_compaction_bytes_limit(0),
hard_pending_compaction_bytes_limit(0),
level0_file_num_compaction_trigger(0),
Expand Down Expand Up @@ -280,6 +282,7 @@ struct MutableCFOptions {

// Compaction related options
bool disable_auto_compactions;
bool disable_write_stall;
uint64_t soft_pending_compaction_bytes_limit;
uint64_t hard_pending_compaction_bytes_limit;
int level0_file_num_compaction_trigger;
Expand Down
2 changes: 2 additions & 0 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
hard_pending_compaction_bytes_limit);
ROCKS_LOG_HEADER(log, " Options.disable_auto_compactions: %d",
disable_auto_compactions);
ROCKS_LOG_HEADER(log, " Options.disable_write_stall: %d",
disable_write_stall);

const auto& it_compaction_style =
compaction_style_to_string.find(compaction_style);
Expand Down
1 change: 1 addition & 0 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,

// Compaction related options
cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;
cf_opts->disable_write_stall = moptions.disable_write_stall;
cf_opts->soft_pending_compaction_bytes_limit =
moptions.soft_pending_compaction_bytes_limit;
cf_opts->hard_pending_compaction_bytes_limit =
Expand Down
1 change: 1 addition & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"compaction_pri=kMinOverlappingRatio;"
"hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;"
"disable_write_stall=false;"
"report_bg_io_stats=true;"
"ttl=60;"
"periodic_compaction_seconds=3600;"
Expand Down

0 comments on commit d60ab01

Please sign in to comment.