From d4affd7c7409d9e151d8f6b83267f198dfd3e831 Mon Sep 17 00:00:00 2001 From: Isaac Garzon Date: Wed, 19 Oct 2022 18:07:42 +0300 Subject: [PATCH] options: change the default value for compaction threads to 8 (#169) This is done through the deprecated `max_background_compactions` option, which overrides `max_background_jobs`, so in order to avoid having an adverse effect on users who set `max_background_jobs` to a high value we try to detect that case and shift back to the user's choice in case there are enough background jobs to be enough for both flushes and compactions. --- db/column_family_test.cc | 28 +++++-- db/db_compaction_test.cc | 61 +++++++++----- db/db_flush_test.cc | 3 + db/db_impl/db_impl_compaction_flush.cc | 6 +- db/db_options_test.cc | 3 + db/db_test.cc | 111 +++++++++++++++---------- db/db_wal_test.cc | 3 + db/deletefile_test.cc | 53 ++++++++---- include/rocksdb/options.h | 4 +- 9 files changed, 183 insertions(+), 89 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 3a1a3a508c..78f7470707 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -3129,8 +3129,6 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) { #ifndef ROCKSDB_LITE // TEST functions are not supported in lite TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { SpecialEnv env(Env::Default()); - // Allow both of flush and purge job to schedule. - env.SetBackgroundThreads(2, Env::HIGH); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset( @@ -3164,9 +3162,8 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"ColumnFamilyTest::IteratorCloseWALFile2:0", "DBImpl::BGWorkPurge:start"}, - {"ColumnFamilyTest::IteratorCloseWALFile2:2", + {"ColumnFamilyTest::IteratorCloseWALFile2:1", "DBImpl::BackgroundCallFlush:start"}, - {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -3178,22 +3175,37 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { ASSERT_EQ(2, env.num_open_wal_file_.load()); // Deleting the iterator will clear its super version, triggering // closing all files - it->Seek(""); + it->Seek(""); // purge (x2) ASSERT_OK(it->status()); ASSERT_EQ(2, env.num_open_wal_file_.load()); ASSERT_EQ(0, env.delete_count_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); - TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); + + // Fill the low priority pool in order to ensure that all background purges + // finished before we continue + std::vector sleeping_tasks( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& task : sleeping_tasks) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task, + Env::Priority::LOW); + task.WaitUntilSleeping(); + } + // Release and wait for all of the tasks to finish + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); + } + ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); - TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); WaitForFlush(1); ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); - delete it; + delete it; // purge ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); Reopen(); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 56a4504b04..851ebe29a3 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -2872,6 +2872,7 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) { options.max_bytes_for_level_multiplier = 2; options.compression = kNoCompression; options.max_subcompactions = max_subcompactions_; + options.max_background_compactions = 1; env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); @@ -2969,17 +2970,22 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) { ASSERT_EQ("0,1", FilesPerLevel(0)); // block compactions - test::SleepingBackgroundTask sleeping_task; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, - Env::Priority::LOW); + std::vector sleeping_tasks( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& task : sleeping_tasks) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task, + Env::Priority::LOW); + } options.max_bytes_for_level_base = 1024 * 1024; // 1 MB Reopen(options); std::unique_ptr iterator(db_->NewIterator(ReadOptions())); ASSERT_EQ("0,1", FilesPerLevel(0)); // let compactions go - sleeping_task.WakeUp(); - sleeping_task.WaitUntilDone(); + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); + } // this should execute L1->L2 (move) ASSERT_OK(dbfull()->TEST_WaitForCompact()); @@ -7144,9 +7150,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -7181,8 +7190,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { // CompactRange should return before the compaction has the chance to run compact_thread.join(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); ASSERT_EQ("0,1", FilesPerLevel(0)); } @@ -7201,9 +7212,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -7243,8 +7257,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { auto s = db_->Close(); ASSERT_OK(s); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { @@ -7261,9 +7277,12 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -7300,8 +7319,10 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { // manual compaction thread should return with Incomplete(). compact_thread.join(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBCompactionTest, diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 3ca6312077..1f6f0ff461 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -143,6 +143,7 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { options.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); Reopen(options); env_->SetBackgroundThreads(0, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); std::thread::id tid; int num_flushes = 0, num_compactions = 0; @@ -1686,6 +1687,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { options.create_if_missing = true; options.listeners.push_back(listener); // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options.max_background_flushes = options.max_background_compactions = -1; options.max_background_jobs = 8; // Allow 2 immutable memtables. options.max_write_buffer_number = 3; @@ -2706,6 +2708,7 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) { options.env = fault_injection_env.get(); // Set a larger value than default so that RocksDB can schedule concurrent // background flush threads. + options.max_background_flushes = options.max_background_compactions = -1; options.max_background_jobs = 8; options.max_write_buffer_number = 8; CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 2f803886b4..8b8a99ad81 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2495,7 +2495,11 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, int max_background_jobs, bool parallelize_compactions) { BGJobLimits res; - if (max_background_flushes == -1 && max_background_compactions == -1) { + const int flushes = std::max(1, max_background_flushes); + const int compactions = std::max(1, max_background_compactions); + + if ((max_background_flushes == -1 && max_background_compactions == -1) || + (max_background_jobs > flushes + compactions)) { // for our first stab implementing max_background_jobs, simply allocate a // quarter of the threads to flushes. res.max_flushes = std::max(1, max_background_jobs / 4); diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 46aa252112..5b0b0c9187 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -631,6 +631,9 @@ TEST_F(DBOptionsTest, SetBackgroundJobs) { Options options; options.create_if_missing = true; options.max_background_jobs = 8; + options.max_background_compactions = options.max_background_flushes = -1; + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + env_->SetBackgroundThreads(1, Env::Priority::LOW); options.env = env_; Reopen(options); diff --git a/db/db_test.cc b/db/db_test.cc index edf409fc73..55c7c7aab7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4159,9 +4159,6 @@ TEST_F(DBTest, ConcurrentMemtableNotSupported) { TEST_F(DBTest, SanitizeNumThreads) { for (int attempt = 0; attempt < 2; attempt++) { - const size_t kTotalTasks = 8; - test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; - Options options = CurrentOptions(); if (attempt == 0) { options.max_background_compactions = 3; @@ -4170,11 +4167,17 @@ TEST_F(DBTest, SanitizeNumThreads) { options.create_if_missing = true; DestroyAndReopen(options); - for (size_t i = 0; i < kTotalTasks; i++) { + const size_t low_task_count = + options.env->GetBackgroundThreads(Env::Priority::LOW) + 1; + const size_t high_task_count = + options.env->GetBackgroundThreads(Env::Priority::HIGH) + 2; + std::vector sleeping_tasks(low_task_count + + high_task_count); + for (size_t i = 0; i < sleeping_tasks.size(); ++i) { // Insert 5 tasks to low priority queue and 5 tasks to high priority queue - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_tasks[i], - (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); + env_->Schedule( + &test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], + (i < low_task_count) ? Env::Priority::LOW : Env::Priority::HIGH); } // Wait until 10s for they are scheduled. @@ -4191,9 +4194,9 @@ TEST_F(DBTest, SanitizeNumThreads) { // pool size 2, total task 4. Queue size should be 2. ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH)); - for (size_t i = 0; i < kTotalTasks; i++) { - sleeping_tasks[i].WakeUp(); - sleeping_tasks[i].WaitUntilDone(); + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); } ASSERT_OK(Put("abc", "def")); @@ -5246,10 +5249,13 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Block compaction - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } ASSERT_EQ(NumTableFilesAtLevel(0), 0); int count = 0; Random rnd(301); @@ -5259,14 +5265,18 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); count++; if (dbfull()->TEST_write_controler().IsStopped()) { - sleeping_task_low.WakeUp(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + } break; } } // Stop trigger = 8 ASSERT_EQ(count, 8); // Unblock - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WaitUntilDone(); + } // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0. // Block compaction thread again. Perform the put and memtable flushes @@ -5277,23 +5287,29 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_EQ(NumTableFilesAtLevel(0), 0); // Block compaction again - sleeping_task_low.Reset(); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.Reset(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } count = 0; while (count < 64) { ASSERT_OK(Put(Key(count), rnd.RandomString(1024), wo)); ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); count++; if (dbfull()->TEST_write_controler().IsStopped()) { - sleeping_task_low.WakeUp(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + } break; } } ASSERT_EQ(count, 6); // Unblock - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WaitUntilDone(); + } // Test disable_auto_compactions // Compaction thread is unblocked but auto compaction is disabled. Write @@ -6555,11 +6571,14 @@ TEST_F(DBTest, SoftLimit) { ASSERT_OK(Put(Key(0), "")); - test::SleepingBackgroundTask sleeping_task_low; + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); // Block compactions - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } // Create 3 L0 files, making score of L0 to be 3. for (int i = 0; i < 3; i++) { @@ -6573,9 +6592,11 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); - sleeping_task_low.Reset(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + sleeping_task.Reset(); + } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Now there is one L1 file but doesn't trigger soft_rate_limit @@ -6592,14 +6613,16 @@ TEST_F(DBTest, SoftLimit) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void* /*arg*/) { // Schedule a sleeping task. - sleeping_task_low.Reset(); + sleeping_task_low[0].Reset(); env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_low, Env::Priority::LOW); + &sleeping_task_low[0], Env::Priority::LOW); }); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } // Create 3 L0 files, making score of L0 to be 3 for (int i = 0; i < 3; i++) { ASSERT_OK(Put(Key(10 + i), std::string(5000, 'x'))); @@ -6613,8 +6636,8 @@ TEST_F(DBTest, SoftLimit) { // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction // goes through. - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB // Given level multiplier 10, estimated pending compaction is around 100KB @@ -6635,8 +6658,8 @@ TEST_F(DBTest, SoftLimit) { // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction // goes through. - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB // L2 size is 360KB, so the estimated level fanout 4, estimated pending @@ -6646,8 +6669,8 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal)); @@ -6662,10 +6685,12 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WaitUntilSleeping(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBTest, LastWriteBufferDelay) { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 69114ab7c2..2cb41e83e4 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1496,6 +1496,9 @@ TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { options.track_and_verify_wals_in_manifest = true; // The following make sure there are two bg flush threads. options.max_background_jobs = 8; + options.max_background_compactions = options.max_background_flushes = -1; + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + env_->SetBackgroundThreads(1, Env::Priority::LOW); const std::string cf1_name("cf1"); CreateAndReopenWithCF({cf1_name}, options); diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index 2e40aa355c..c2f8d6a122 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -250,9 +250,12 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice)); // 3 sst after compaction with live iterator CheckFileTypeCounts(dbname_, 0, 3, 1); - test::SleepingBackgroundTask sleeping_task_before; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_before, Env::Priority::LOW); + std::vector sleeping_task_before( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_before) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } delete itr; test::SleepingBackgroundTask sleeping_task_after; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, @@ -260,14 +263,19 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { // Make sure no purges are executed foreground CheckFileTypeCounts(dbname_, 0, 3, 1); - sleeping_task_before.WakeUp(); - sleeping_task_before.WaitUntilDone(); + sleeping_task_before[0].WakeUp(); + sleeping_task_before[0].WaitUntilDone(); // Make sure all background purges are executed sleeping_task_after.WakeUp(); sleeping_task_after.WaitUntilDone(); // 1 sst after iterator deletion CheckFileTypeCounts(dbname_, 0, 1, 1); + + for (size_t i = 1; i < sleeping_task_before.size(); ++i) { + sleeping_task_before[i].WakeUp(); + sleeping_task_before[i].WaitUntilDone(); + } } TEST_F(DeleteFileTest, PurgeDuringOpen) { @@ -332,16 +340,21 @@ TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) { CheckFileTypeCounts(dbname_, 0, 1, 1); delete cfh; - test::SleepingBackgroundTask sleeping_task_after; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_after, Env::Priority::LOW); + std::vector sleeping_task_after( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_after) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // If background purge is enabled, the file should still be there. CheckFileTypeCounts(dbname_, 0, bg_purge ? 1 : 0, 1); TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeCFDropTest:1"); // Execute background purges. - sleeping_task_after.WakeUp(); - sleeping_task_after.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_after) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } // The file should have been deleted. CheckFileTypeCounts(dbname_, 0, 0, 1); }; @@ -401,13 +414,18 @@ TEST_F(DeleteFileTest, BackgroundPurgeCopyOptions) { CheckFileTypeCounts(dbname_, 0, 3, 1); delete itr; - test::SleepingBackgroundTask sleeping_task_after; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_after, Env::Priority::LOW); + std::vector sleeping_task_after( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_after) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // Make sure all background purges are executed - sleeping_task_after.WakeUp(); - sleeping_task_after.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_after) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } // 1 sst after iterator deletion CheckFileTypeCounts(dbname_, 0, 1, 1); } @@ -447,9 +465,14 @@ TEST_F(DeleteFileTest, BackgroundPurgeTestMultipleJobs) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); delete itr1; + for (int i = 0; + i < std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)); ++i) { + env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::LOW); + } env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::HIGH); delete itr2; env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::HIGH); + env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::LOW); Close(); TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose"); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index fe5fb4a6c7..4cd2f40ede 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -689,10 +689,10 @@ struct DBOptions { // LOW priority thread pool. For more information, see // Env::SetBackgroundThreads // - // Default: -1 + // Default: 8 // // Dynamically changeable through SetDBOptions() API. - int max_background_compactions = -1; + int max_background_compactions = 8; // This value represents the maximum number of threads that will // concurrently perform a compaction job by breaking it into multiple,