Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for TiKV flow control #384

Merged
merged 1 commit into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
bool needed_delay = write_controller->NeedsDelay();

if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kMemtableLimit) {
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
Expand All @@ -965,7 +966,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number);
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) {
Expand All @@ -977,7 +979,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
Expand All @@ -987,7 +990,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kMemtableLimit) {
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, was_stopped,
Expand All @@ -1002,7 +1006,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
// L0 is the last two files from stopping.
bool near_stop = vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger - 2;
Expand All @@ -1022,7 +1027,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
// If the distance to the hard limit is less than 1/4 of the gap between the
// soft and hard bytes limits, we consider it near stop and speed up the
// slowdown.
Expand All @@ -1048,7 +1054,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else {
assert(write_stall_condition == WriteStallCondition::kNormal);
assert(write_stall_condition == WriteStallCondition::kNormal ||
mutable_cf_options.disable_write_stall);
if (vstorage->l0_delay_trigger_count() >=
GetL0FileCountForCompactionSpeedup(
mutable_cf_options.level0_file_num_compaction_trigger,
Expand Down
82 changes: 81 additions & 1 deletion db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,87 @@ TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
}
}

// With disable_write_stall set, re-enabling auto compaction on a DB whose
// pending-compaction-bytes limits have been lowered to the current L0 size
// must neither stop nor delay writes, while compaction speedup is still
// requested until the background compaction has run.
TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) {
  const std::string kValue(1024, 'v');
  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.disable_write_stall = true;
  options.write_buffer_size = 1024 * 1024 * 10;
  options.compression = CompressionType::kNoCompression;
  options.level0_file_num_compaction_trigger = 1;
  // Start with every stall threshold effectively unreachable so that loading
  // the data below cannot trigger a stall on its own.
  options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
  options.hard_pending_compaction_bytes_limit =
      std::numeric_limits<uint64_t>::max();
  options.soft_pending_compaction_bytes_limit =
      std::numeric_limits<uint64_t>::max();
  options.env = env_;

  DestroyAndReopen(options);

  // Build two L0 files. Every write/flush status is checked so that a failed
  // setup step aborts the test instead of being silently ignored.
  int i = 0;
  for (; i < 1024; i++) {
    ASSERT_OK(Put(Key(i), kValue));
  }
  ASSERT_OK(Flush());
  for (; i < 1024 * 2; i++) {
    ASSERT_OK(Put(Key(i), kValue));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(2, NumTableFilesAtLevel(0));
  uint64_t l0_size = SizeAtLevel(0);

  // Lower both pending-compaction-bytes limits to the size of the data
  // currently sitting in L0 before reopening.
  options.hard_pending_compaction_bytes_limit = l0_size;
  options.soft_pending_compaction_bytes_limit = l0_size;

  Reopen(options);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
  ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());

  SyncPoint::GetInstance()->LoadDependency(
      {{"DBOptionsTest::EnableAutoCompactionButDisableStall:1",
        "BackgroundCallCompaction:0"},
       {"DBImpl::BackgroundCompaction():BeforePickCompaction",
        "DBOptionsTest::EnableAutoCompactionButDisableStall:2"},
       {"DBOptionsTest::EnableAutoCompactionButDisableStall:3",
        "DBImpl::BackgroundCompaction():AfterPickCompaction"}});
  // Block background compaction.
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1");
  // Wait for stall condition recalculate.
  TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2");

  // Compaction is held before picking: writes must not be stalled, but the
  // write controller should still ask for faster compaction.
  ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
  ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());

  TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3");

  // Background compaction executed.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
  ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
  ASSERT_FALSE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());

  // Turn sync-point processing back off now that the dependencies loaded
  // above are no longer needed.
  SyncPoint::GetInstance()->DisableProcessing();
}

// Checks that disable_write_stall can be toggled at runtime through
// SetOptions() and that each change is reflected by GetOptions().
TEST_F(DBOptionsTest, SetOptionsDisableWriteStall) {
  Options opts;
  opts.env = env_;
  opts.create_if_missing = true;
  opts.disable_write_stall = false;
  Reopen(opts);

  // Reads the option back from the live DB on every call.
  const auto stall_disabled = [this]() {
    return dbfull()->GetOptions().disable_write_stall;
  };
  ASSERT_FALSE(stall_disabled());

  ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "true"}}));
  ASSERT_TRUE(stall_disabled());

  ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "false"}}));
  ASSERT_FALSE(stall_disabled());
}

TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) {
Options options;
options.level_compaction_dynamic_level_bytes = false;
Expand Down Expand Up @@ -1430,7 +1511,6 @@ TEST_F(DBOptionsTest, ChangeCompression) {
SyncPoint::GetInstance()->DisableProcessing();
}


TEST_F(DBOptionsTest, BottommostCompressionOptsWithFallbackType) {
// Verify the bottommost compression options still take effect even when the
// bottommost compression type is left at its default value. Verify for both
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ struct ExternalFileIngestionInfo {
SequenceNumber global_seqno;
// Table properties of the external file being ingested
TableProperties table_properties;
// Level inside the DB we picked for the external file.
int picked_level;
};

// Result of auto background error recovery
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
bool disable_auto_compactions = false;

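// If true, the write stall mechanism is disabled: writes are neither stopped
// nor delayed when the memtable, level-0 file count, or pending compaction
// bytes limits are reached. Compaction speedup may still be requested.
//
// Default: false
//
// Dynamically changeable through SetOptions() API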
bool disable_write_stall = false;

// This is a factory that provides TableFactory objects.
// Default: a block-based table factory that provides a default
// implementation of TableBuilder and TableReader with default
Expand Down
6 changes: 6 additions & 0 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct MutableCFOptions, disable_auto_compactions),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"disable_write_stall",
{offsetof(struct MutableCFOptions, disable_write_stall),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
Expand Down Expand Up @@ -1069,6 +1073,8 @@ void MutableCFOptions::Dump(Logger* log) const {
: prefix_extractor->GetId().c_str());
ROCKS_LOG_INFO(log, " disable_auto_compactions: %d",
disable_auto_compactions);
ROCKS_LOG_INFO(log, " disable_write_stall: %d",
disable_write_stall);
ROCKS_LOG_INFO(log, " soft_pending_compaction_bytes_limit: %" PRIu64,
soft_pending_compaction_bytes_limit);
ROCKS_LOG_INFO(log, " hard_pending_compaction_bytes_limit: %" PRIu64,
Expand Down
3 changes: 3 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct MutableCFOptions {
experimental_mempurge_threshold(
options.experimental_mempurge_threshold),
disable_auto_compactions(options.disable_auto_compactions),
disable_write_stall(options.disable_write_stall),
soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit),
hard_pending_compaction_bytes_limit(
Expand Down Expand Up @@ -196,6 +197,7 @@ struct MutableCFOptions {
prefix_extractor(nullptr),
experimental_mempurge_threshold(0.0),
disable_auto_compactions(false),
disable_write_stall(false),
soft_pending_compaction_bytes_limit(0),
hard_pending_compaction_bytes_limit(0),
level0_file_num_compaction_trigger(0),
Expand Down Expand Up @@ -280,6 +282,7 @@ struct MutableCFOptions {

// Compaction related options
bool disable_auto_compactions;
bool disable_write_stall;
uint64_t soft_pending_compaction_bytes_limit;
uint64_t hard_pending_compaction_bytes_limit;
int level0_file_num_compaction_trigger;
Expand Down
2 changes: 2 additions & 0 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
hard_pending_compaction_bytes_limit);
ROCKS_LOG_HEADER(log, " Options.disable_auto_compactions: %d",
disable_auto_compactions);
ROCKS_LOG_HEADER(log, " Options.disable_write_stall: %d",
disable_write_stall);

const auto& it_compaction_style =
compaction_style_to_string.find(compaction_style);
Expand Down
1 change: 1 addition & 0 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,

// Compaction related options
cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;
cf_opts->disable_write_stall = moptions.disable_write_stall;
cf_opts->soft_pending_compaction_bytes_limit =
moptions.soft_pending_compaction_bytes_limit;
cf_opts->hard_pending_compaction_bytes_limit =
Expand Down
1 change: 1 addition & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"compaction_pri=kMinOverlappingRatio;"
"hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;"
"disable_write_stall=false;"
"report_bg_io_stats=true;"
"ttl=60;"
"periodic_compaction_seconds=3600;"
Expand Down
Loading