Skip to content

Commit

Permalink
options: change the default value for compaction threads to 8 (#169)
Browse files Browse the repository at this point in the history
This is done through the deprecated `max_background_compactions` option,
which overrides `max_background_jobs`, so in order to avoid having an
adverse effect on users who set `max_background_jobs` to a high value
we try to detect that case and shift back to the user's choice in case
there are enough background jobs to be enough for both flushes and
compactions.
  • Loading branch information
isaac-io committed Oct 24, 2022
1 parent 7566679 commit 62c6129
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 95 deletions.
28 changes: 20 additions & 8 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3129,8 +3129,6 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) {
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
SpecialEnv env(Env::Default());
// Allow both of flush and purge job to schedule.
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(
Expand Down Expand Up @@ -3164,9 +3162,8 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
"DBImpl::BGWorkPurge:start"},
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
{"ColumnFamilyTest::IteratorCloseWALFile2:1",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

Expand All @@ -3178,22 +3175,37 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// closing all files
it->Seek("");
it->Seek(""); // purge (x2)
ASSERT_OK(it->status());

ASSERT_EQ(2, env.num_open_wal_file_.load());
ASSERT_EQ(0, env.delete_count_.load());

TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");

// Fill the low priority pool in order to ensure that all background purges
// finished before we continue
std::vector<test::SleepingBackgroundTask> sleeping_tasks(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& task : sleeping_tasks) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task,
Env::Priority::LOW);
task.WaitUntilSleeping();
}
// Release and wait for all of the tasks to finish
for (auto& task : sleeping_tasks) {
task.WakeUp();
task.WaitUntilDone();
}

ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());

delete it;
delete it; // purge
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

Reopen();
Expand Down
79 changes: 53 additions & 26 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2874,11 +2874,14 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
options.max_subcompactions = max_subcompactions_;

env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);
// stop the compaction thread until we simulate the file creation failure.
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
sleeping_task.WaitUntilSleeping();
}

options.env = env_;

Expand Down Expand Up @@ -2908,8 +2911,8 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {

// Fail the first file creation.
env_->non_writable_count_ = 1;
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
sleeping_task_low[0].WakeUp();
sleeping_task_low[0].WaitUntilDone();

// Expect compaction to fail here as one file will fail its
// creation.
Expand All @@ -2927,6 +2930,10 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
}

env_->non_writable_count_ = 0;
for (size_t i = 1; i < sleeping_task_low.size(); ++i) {
sleeping_task_low[i].WakeUp();
sleeping_task_low[i].WaitUntilDone();
}

// Make sure RocksDB will not get into corrupted state.
Reopen(options);
Expand Down Expand Up @@ -2969,17 +2976,22 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
ASSERT_EQ("0,1", FilesPerLevel(0));

// block compactions
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_tasks(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& task : sleeping_tasks) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task,
Env::Priority::LOW);
}

options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
Reopen(options);
std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
ASSERT_EQ("0,1", FilesPerLevel(0));
// let compactions go
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
for (auto& task : sleeping_tasks) {
task.WakeUp();
task.WaitUntilDone();
}

// this should execute L1->L2 (move)
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Expand Down Expand Up @@ -7144,9 +7156,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
}

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
Expand Down Expand Up @@ -7181,8 +7196,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
// CompactRange should return before the compaction has the chance to run
compact_thread.join();

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
for (auto& sleeping_task : sleeping_task_low) {
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
ASSERT_EQ("0,1", FilesPerLevel(0));
}
Expand All @@ -7201,9 +7218,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
}

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
Expand Down Expand Up @@ -7243,8 +7263,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
auto s = db_->Close();
ASSERT_OK(s);

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
for (auto& sleeping_task : sleeping_task_low) {
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
}

TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
Expand All @@ -7261,9 +7283,12 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
Reopen(options);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
std::vector<test::SleepingBackgroundTask> sleeping_task_low(
std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)));
for (auto& sleeping_task : sleeping_task_low) {
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
}

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
Expand Down Expand Up @@ -7300,8 +7325,10 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
// manual compaction thread should return with Incomplete().
compact_thread.join();

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
for (auto& sleeping_task : sleeping_task_low) {
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
}
}

TEST_F(DBCompactionTest,
Expand Down
3 changes: 3 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
Reopen(options);
env_->SetBackgroundThreads(0, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW);

std::thread::id tid;
int num_flushes = 0, num_compactions = 0;
Expand Down Expand Up @@ -1686,6 +1687,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
options.create_if_missing = true;
options.listeners.push_back(listener);
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options.max_background_flushes = options.max_background_compactions = -1;
options.max_background_jobs = 8;
// Allow 2 immutable memtables.
options.max_write_buffer_number = 3;
Expand Down Expand Up @@ -2706,6 +2708,7 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
options.env = fault_injection_env.get();
// Set a larger value than default so that RocksDB can schedule concurrent
// background flush threads.
options.max_background_flushes = options.max_background_compactions = -1;
options.max_background_jobs = 8;
options.max_write_buffer_number = 8;
CreateAndReopenWithCF({"pikachu"}, options);
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2495,7 +2495,11 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
int max_background_jobs,
bool parallelize_compactions) {
BGJobLimits res;
if (max_background_flushes == -1 && max_background_compactions == -1) {
const int flushes = std::max(1, max_background_flushes);
const int compactions = std::max(1, max_background_compactions);

if ((max_background_flushes == -1 && max_background_compactions == -1) ||
(max_background_jobs > flushes + compactions)) {
// for our first stab implementing max_background_jobs, simply allocate a
// quarter of the threads to flushes.
res.max_flushes = std::max(1, max_background_jobs / 4);
Expand Down
3 changes: 3 additions & 0 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,9 @@ TEST_F(DBOptionsTest, SetBackgroundJobs) {
Options options;
options.create_if_missing = true;
options.max_background_jobs = 8;
options.max_background_compactions = options.max_background_flushes = -1;
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
env_->SetBackgroundThreads(1, Env::Priority::LOW);
options.env = env_;
Reopen(options);

Expand Down
2 changes: 2 additions & 0 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ TEST_F(DBRangeDelTest, TableEvictedDuringScan) {
bbto.cache_index_and_filter_blocks = true;
bbto.block_cache = NewLRUCache(8 << 20);
opts.table_factory.reset(NewBlockBasedTableFactory(bbto));
opts.max_background_compactions = 1;
env_->SetBackgroundThreads(1, Env::Priority::LOW);
DestroyAndReopen(opts);

// Hold a snapshot so range deletions can't become obsolete during compaction
Expand Down
Loading

0 comments on commit 62c6129

Please sign in to comment.