diff --git a/db/column_family_test.cc b/db/column_family_test.cc index c0574ee550..a38d1a9bd1 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -3061,8 +3061,6 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) { TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { SpecialEnv env(Env::Default()); - // Allow both of flush and purge job to schedule. - env.SetBackgroundThreads(2, Env::HIGH); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset( @@ -3096,9 +3094,8 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"ColumnFamilyTest::IteratorCloseWALFile2:0", "DBImpl::BGWorkPurge:start"}, - {"ColumnFamilyTest::IteratorCloseWALFile2:2", + {"ColumnFamilyTest::IteratorCloseWALFile2:1", "DBImpl::BackgroundCallFlush:start"}, - {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -3110,22 +3107,37 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { ASSERT_EQ(2, env.num_open_wal_file_.load()); // Deleting the iterator will clear its super version, triggering // closing all files - it->Seek(""); + it->Seek(""); // purge (x2) ASSERT_OK(it->status()); ASSERT_EQ(2, env.num_open_wal_file_.load()); ASSERT_EQ(0, env.delete_count_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); - TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); + + // Fill the low priority pool in order to ensure that all background purges + // finished before we continue + std::vector sleeping_tasks( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& task : sleeping_tasks) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task, + Env::Priority::LOW); + task.WaitUntilSleeping(); + } + // Release and wait for all of the tasks to finish + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); + } + ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); - TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); WaitForFlush(1); ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); - delete it; + delete it; // purge ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); Reopen(); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 85255f0242..c41128179e 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3005,6 +3005,7 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) { options.max_bytes_for_level_multiplier = 2; options.compression = kNoCompression; options.max_subcompactions = max_subcompactions_; + options.max_background_compactions = 1; env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); @@ -3101,17 +3102,22 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) { ASSERT_EQ("0,1", FilesPerLevel(0)); // block compactions - test::SleepingBackgroundTask sleeping_task; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, - Env::Priority::LOW); + std::vector sleeping_tasks( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& task : sleeping_tasks) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &task, + Env::Priority::LOW); + } options.max_bytes_for_level_base = 1024 * 1024; // 1 MB Reopen(options); std::unique_ptr iterator(db_->NewIterator(ReadOptions())); ASSERT_EQ("0,1", FilesPerLevel(0)); // let compactions go - sleeping_task.WakeUp(); - sleeping_task.WaitUntilDone(); + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); + } // this should execute L1->L2 (move) ASSERT_OK(dbfull()->TEST_WaitForCompact()); @@ -9215,9 +9221,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -9252,8 +9261,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { // CompactRange should return before the compaction has the chance to run compact_thread.join(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,1", FilesPerLevel(0)); } @@ -9272,9 +9283,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -9314,8 +9328,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { auto s = db_->Close(); ASSERT_OK(s); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { @@ -9332,9 +9348,12 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -9371,8 +9390,10 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { // manual compaction thread should return with Incomplete(). compact_thread.join(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBCompactionTest, diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 0cf9e7cf92..67867440ba 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -139,6 +139,7 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { options.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); Reopen(options); env_->SetBackgroundThreads(0, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); std::thread::id tid; int num_flushes = 0, num_compactions = 0; @@ -2049,6 +2050,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { options.create_if_missing = true; options.listeners.push_back(listener); // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options.max_background_flushes = options.max_background_compactions = -1; options.max_background_jobs = 8; // Allow 2 immutable memtables. options.max_write_buffer_number = 3; @@ -3065,6 +3067,7 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) { options.env = fault_injection_env.get(); // Set a larger value than default so that RocksDB can schedule concurrent // background flush threads. + options.max_background_flushes = options.max_background_compactions = -1; options.max_background_jobs = 8; options.max_write_buffer_number = 8; CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 5be2cafda8..c84d55b8c3 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2838,7 +2838,11 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, int max_background_jobs, bool parallelize_compactions) { BGJobLimits res; - if (max_background_flushes == -1 && max_background_compactions == -1) { + const int flushes = std::max(1, max_background_flushes); + const int compactions = std::max(1, max_background_compactions); + + if ((max_background_flushes == -1 && max_background_compactions == -1) || + (max_background_jobs > flushes + compactions)) { // for our first stab implementing max_background_jobs, simply allocate a // quarter of the threads to flushes. res.max_flushes = std::max(1, max_background_jobs / 4); diff --git a/db/db_options_test.cc b/db/db_options_test.cc index c3910a9787..65b0f34dfe 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -630,6 +630,9 @@ TEST_F(DBOptionsTest, SetBackgroundJobs) { Options options; options.create_if_missing = true; options.max_background_jobs = 8; + options.max_background_compactions = options.max_background_flushes = -1; + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + env_->SetBackgroundThreads(1, Env::Priority::LOW); options.env = env_; Reopen(options); diff --git a/db/db_test.cc b/db/db_test.cc index 3b6012617b..b3e347e228 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4326,9 +4326,6 @@ TEST_F(DBTest, ConcurrentMemtableNotSupported) { TEST_F(DBTest, SanitizeNumThreads) { for (int attempt = 0; attempt < 2; attempt++) { - const size_t kTotalTasks = 8; - test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; - Options options = CurrentOptions(); if (attempt == 0) { options.max_background_compactions = 3; @@ -4337,11 +4334,17 @@ TEST_F(DBTest, SanitizeNumThreads) { options.create_if_missing = true; DestroyAndReopen(options); - for (size_t i = 0; i < kTotalTasks; i++) { + const size_t low_task_count = + options.env->GetBackgroundThreads(Env::Priority::LOW) + 1; + const size_t high_task_count = + options.env->GetBackgroundThreads(Env::Priority::HIGH) + 2; + std::vector sleeping_tasks(low_task_count + + high_task_count); + for (size_t i = 0; i < sleeping_tasks.size(); ++i) { // Insert 5 tasks to low priority queue and 5 tasks to high priority queue - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_tasks[i], - (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); + env_->Schedule( + &test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], + (i < low_task_count) ? Env::Priority::LOW : Env::Priority::HIGH); } // Wait until 10s for they are scheduled. @@ -4358,9 +4361,9 @@ TEST_F(DBTest, SanitizeNumThreads) { // pool size 2, total task 4. Queue size should be 2. ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH)); - for (size_t i = 0; i < kTotalTasks; i++) { - sleeping_tasks[i].WakeUp(); - sleeping_tasks[i].WaitUntilDone(); + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); } ASSERT_OK(Put("abc", "def")); @@ -5413,10 +5416,13 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Block compaction - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } ASSERT_EQ(NumTableFilesAtLevel(0), 0); int count = 0; Random rnd(301); @@ -5426,14 +5432,18 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); count++; if (dbfull()->TEST_write_controler().IsStopped()) { - sleeping_task_low.WakeUp(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + } break; } } // Stop trigger = 8 ASSERT_EQ(count, 8); // Unblock - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WaitUntilDone(); + } // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0. // Block compaction thread again. Perform the put and memtable flushes @@ -5444,23 +5454,29 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_EQ(NumTableFilesAtLevel(0), 0); // Block compaction again - sleeping_task_low.Reset(); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.Reset(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } count = 0; while (count < 64) { ASSERT_OK(Put(Key(count), rnd.RandomString(1024), wo)); ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); count++; if (dbfull()->TEST_write_controler().IsStopped()) { - sleeping_task_low.WakeUp(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + } break; } } ASSERT_EQ(count, 6); // Unblock - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WaitUntilDone(); + } // Test disable_auto_compactions // Compaction thread is unblocked but auto compaction is disabled. Write @@ -6742,11 +6758,14 @@ TEST_F(DBTest, SoftLimit) { ASSERT_OK(Put(Key(0), "")); - test::SleepingBackgroundTask sleeping_task_low; + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); // Block compactions - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } // Create 3 L0 files, making score of L0 to be 3. for (int i = 0; i < 3; i++) { @@ -6760,9 +6779,11 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); - sleeping_task_low.Reset(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + sleeping_task.Reset(); + } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Now there is one L1 file but doesn't trigger soft_rate_limit @@ -6779,14 +6800,16 @@ TEST_F(DBTest, SoftLimit) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void* /*arg*/) { // Schedule a sleeping task. - sleeping_task_low.Reset(); + sleeping_task_low[0].Reset(); env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_low, Env::Priority::LOW); + &sleeping_task_low[0], Env::Priority::LOW); }); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } // Create 3 L0 files, making score of L0 to be 3 for (int i = 0; i < 3; i++) { ASSERT_OK(Put(Key(10 + i), std::string(5000, 'x'))); @@ -6800,8 +6823,8 @@ TEST_F(DBTest, SoftLimit) { // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction // goes through. - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB // Given level multiplier 10, estimated pending compaction is around 100KB @@ -6822,8 +6845,8 @@ TEST_F(DBTest, SoftLimit) { // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction // goes through. - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB // L2 size is 360KB, so the estimated level fanout 4, estimated pending @@ -6833,8 +6856,8 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal)); @@ -6849,10 +6872,12 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WaitUntilSleeping(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBTest, LastWriteBufferDelay) { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 0d2af51fc0..36176826c3 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1761,6 +1761,9 @@ TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { options.track_and_verify_wals_in_manifest = true; // The following make sure there are two bg flush threads. options.max_background_jobs = 8; + options.max_background_compactions = options.max_background_flushes = -1; + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + env_->SetBackgroundThreads(1, Env::Priority::LOW); DestroyAndReopen(options); diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index fe3065aa6c..967d2942ed 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -248,9 +248,12 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice)); // 3 sst after compaction with live iterator CheckFileTypeCounts(dbname_, 0, 3, 1); - test::SleepingBackgroundTask sleeping_task_before; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_before, Env::Priority::LOW); + std::vector sleeping_task_before( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_before) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } delete itr; test::SleepingBackgroundTask sleeping_task_after; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, @@ -258,14 +261,19 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { // Make sure no purges are executed foreground CheckFileTypeCounts(dbname_, 0, 3, 1); - sleeping_task_before.WakeUp(); - sleeping_task_before.WaitUntilDone(); + sleeping_task_before[0].WakeUp(); + sleeping_task_before[0].WaitUntilDone(); // Make sure all background purges are executed sleeping_task_after.WakeUp(); sleeping_task_after.WaitUntilDone(); // 1 sst after iterator deletion CheckFileTypeCounts(dbname_, 0, 1, 1); + + for (size_t i = 1; i < sleeping_task_before.size(); ++i) { + sleeping_task_before[i].WakeUp(); + sleeping_task_before[i].WaitUntilDone(); + } } TEST_F(DeleteFileTest, PurgeDuringOpen) { @@ -330,16 +338,21 @@ TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) { CheckFileTypeCounts(dbname_, 0, 1, 1); delete cfh; - test::SleepingBackgroundTask sleeping_task_after; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_after, Env::Priority::LOW); + std::vector sleeping_task_after( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_after) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // If background purge is enabled, the file should still be there. CheckFileTypeCounts(dbname_, 0, bg_purge ? 1 : 0, 1); TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeCFDropTest:1"); // Execute background purges. - sleeping_task_after.WakeUp(); - sleeping_task_after.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_after) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } // The file should have been deleted. CheckFileTypeCounts(dbname_, 0, 0, 1); }; @@ -399,13 +412,18 @@ TEST_F(DeleteFileTest, BackgroundPurgeCopyOptions) { CheckFileTypeCounts(dbname_, 0, 3, 1); delete itr; - test::SleepingBackgroundTask sleeping_task_after; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_after, Env::Priority::LOW); + std::vector sleeping_task_after( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_after) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + } // Make sure all background purges are executed - sleeping_task_after.WakeUp(); - sleeping_task_after.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_after) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } // 1 sst after iterator deletion CheckFileTypeCounts(dbname_, 0, 1, 1); } @@ -445,9 +463,14 @@ TEST_F(DeleteFileTest, BackgroundPurgeTestMultipleJobs) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); delete itr1; + for (int i = 0; + i < std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)); ++i) { + env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::LOW); + } env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::HIGH); delete itr2; env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::HIGH); + env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::LOW); Close(); TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose"); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index d11ccc62f5..90154d038d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -740,10 +740,10 @@ struct DBOptions { // LOW priority thread pool. For more information, see // Env::SetBackgroundThreads // - // Default: -1 + // Default: 8 // // Dynamically changeable through SetDBOptions() API. - int max_background_compactions = -1; + int max_background_compactions = 8; // This value represents the maximum number of threads that will // concurrently perform a compaction job by breaking it into multiple,