diff --git a/db/column_family_test.cc b/db/column_family_test.cc index c55eb12905..9a2fc25b8c 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -3128,8 +3128,6 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) { #ifndef ROCKSDB_LITE // TEST functions are not supported in lite TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { SpecialEnv env(Env::Default()); - // Allow both of flush and purge job to schedule. - env.SetBackgroundThreads(2, Env::HIGH); db_options_.env = &env; db_options_.max_background_flushes = 1; column_family_options_.memtable_factory.reset( @@ -3163,9 +3161,8 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"ColumnFamilyTest::IteratorCloseWALFile2:0", "DBImpl::BGWorkPurge:start"}, - {"ColumnFamilyTest::IteratorCloseWALFile2:2", + {"ColumnFamilyTest::IteratorCloseWALFile2:1", "DBImpl::BackgroundCallFlush:start"}, - {"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -3175,24 +3172,41 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) { env.delete_count_.store(0); ASSERT_EQ(2, env.num_open_wal_file_.load()); + // Deleting the iterator will clear its super version, triggering // closing all files - it->Seek(""); + it->Seek(""); // purge (x2) ASSERT_OK(it->status()); ASSERT_EQ(2, env.num_open_wal_file_.load()); ASSERT_EQ(0, env.delete_count_.load()); TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0"); - TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); + + // Fill the low priority pool in order to ensure that all background purges + // finished before we continue + std::vector sleeping_tasks( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& task : sleeping_tasks) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &task, Env::Priority::LOW); + task.WaitUntilSleeping(); + } + // Release and wait for all of the tasks to finish + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); + } + ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); - TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2"); + + TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1"); WaitForFlush(1); ASSERT_EQ(1, env.num_open_wal_file_.load()); ASSERT_EQ(1, env.delete_count_.load()); - delete it; + delete it; // purge ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); Reopen(); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index bc1d060d8d..6af9c016c7 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -2848,6 +2848,8 @@ TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) { options.max_bytes_for_level_multiplier = 2; options.compression = kNoCompression; options.max_subcompactions = max_subcompactions_; + options.max_background_flushes = 1; + options.max_background_compactions = 1; env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); @@ -2945,17 +2947,22 @@ TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) { ASSERT_EQ("0,1", FilesPerLevel(0)); // block compactions - test::SleepingBackgroundTask sleeping_task; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, - Env::Priority::LOW); + std::vector sleeping_tasks( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& task : sleeping_tasks) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &task, Env::Priority::LOW); + } options.max_bytes_for_level_base = 1024 * 1024; // 1 MB Reopen(options); std::unique_ptr iterator(db_->NewIterator(ReadOptions())); ASSERT_EQ("0,1", FilesPerLevel(0)); // let compactions go - sleeping_task.WakeUp(); - sleeping_task.WaitUntilDone(); + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); + } // this should execute L1->L2 (move) ASSERT_OK(dbfull()->TEST_WaitForCompact()); @@ -7119,9 +7126,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -7156,8 +7166,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { // CompactRange should return before the compaction has the chance to run compact_thread.join(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); ASSERT_EQ("0,1", FilesPerLevel(0)); } @@ -7176,9 +7188,12 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -7218,8 +7233,10 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { auto s = db_->Close(); ASSERT_OK(s); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { @@ -7236,9 +7253,12 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { Reopen(options); // Block compaction queue - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + } // generate files, but avoid trigger auto compaction for (int i = 0; i < kNumL0Files / 2; i++) { @@ -7275,8 +7295,10 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { // manual compaction thread should return with Incomplete(). compact_thread.join(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBCompactionTest, diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index a53ce21b24..19480d6c7e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -143,6 +143,7 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) { options.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); Reopen(options); env_->SetBackgroundThreads(0, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); std::thread::id tid; int num_flushes = 0, num_compactions = 0; @@ -1686,6 +1687,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { options.create_if_missing = true; options.listeners.push_back(listener); // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options.max_background_flushes = options.max_background_compactions = -1; options.max_background_jobs = 8; // Allow 2 immutable memtables. options.max_write_buffer_number = 3; @@ -2706,6 +2708,7 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) { options.env = fault_injection_env.get(); // Set a larger value than default so that RocksDB can schedule concurrent // background flush threads. + options.max_background_flushes = options.max_background_compactions = -1; options.max_background_jobs = 8; options.max_write_buffer_number = 8; CreateAndReopenWithCF({"pikachu"}, options); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index fe43d3c629..02f7d7d951 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2492,7 +2492,11 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, int max_background_jobs, bool parallelize_compactions) { BGJobLimits res; - if (max_background_flushes == -1 && max_background_compactions == -1) { + const int flushes = std::max(1, max_background_flushes); + const int compactions = std::max(1, max_background_compactions); + + if ((max_background_flushes == -1 && max_background_compactions == -1) || + (max_background_jobs > flushes + compactions)) { // for our first stab implementing max_background_jobs, simply allocate a // quarter of the threads to flushes. res.max_flushes = std::max(1, max_background_jobs / 4); diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 360a3c561e..d25cffb184 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -615,6 +615,9 @@ TEST_F(DBOptionsTest, SetBackgroundJobs) { Options options; options.create_if_missing = true; options.max_background_jobs = 8; + options.max_background_compactions = options.max_background_flushes = -1; + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + env_->SetBackgroundThreads(1, Env::Priority::LOW); options.env = env_; Reopen(options); diff --git a/db/db_test.cc b/db/db_test.cc index 1f4e64ae59..7a9613d090 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4158,9 +4158,6 @@ TEST_F(DBTest, ConcurrentMemtableNotSupported) { TEST_F(DBTest, SanitizeNumThreads) { for (int attempt = 0; attempt < 2; attempt++) { - const size_t kTotalTasks = 8; - test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; - Options options = CurrentOptions(); if (attempt == 0) { options.max_background_compactions = 3; @@ -4169,11 +4166,14 @@ TEST_F(DBTest, SanitizeNumThreads) { options.create_if_missing = true; DestroyAndReopen(options); - for (size_t i = 0; i < kTotalTasks; i++) { + const size_t low_task_count = options.env->GetBackgroundThreads(Env::Priority::LOW) + 1; + const size_t high_task_count = options.env->GetBackgroundThreads(Env::Priority::HIGH) + 2; + std::vector sleeping_tasks(low_task_count + high_task_count); + for (size_t i = 0; i < sleeping_tasks.size(); ++i) { // Insert 5 tasks to low priority queue and 5 tasks to high priority queue env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], - (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); + (i < low_task_count) ? Env::Priority::LOW : Env::Priority::HIGH); } // Wait until 10s for they are scheduled. @@ -4190,9 +4190,9 @@ TEST_F(DBTest, SanitizeNumThreads) { // pool size 2, total task 4. Queue size should be 2. ASSERT_EQ(2U, options.env->GetThreadPoolQueueLen(Env::Priority::HIGH)); - for (size_t i = 0; i < kTotalTasks; i++) { - sleeping_tasks[i].WakeUp(); - sleeping_tasks[i].WaitUntilDone(); + for (auto& task : sleeping_tasks) { + task.WakeUp(); + task.WaitUntilDone(); } ASSERT_OK(Put("abc", "def")); @@ -5244,10 +5244,13 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Block compaction - test::SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } ASSERT_EQ(NumTableFilesAtLevel(0), 0); int count = 0; Random rnd(301); @@ -5257,14 +5260,18 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); count++; if (dbfull()->TEST_write_controler().IsStopped()) { - sleeping_task_low.WakeUp(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + } break; } } // Stop trigger = 8 ASSERT_EQ(count, 8); // Unblock - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WaitUntilDone(); + } // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0. // Block compaction thread again. Perform the put and memtable flushes @@ -5275,23 +5282,29 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_EQ(NumTableFilesAtLevel(0), 0); // Block compaction again - sleeping_task_low.Reset(); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.Reset(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } count = 0; while (count < 64) { ASSERT_OK(Put(Key(count), rnd.RandomString(1024), wo)); ASSERT_OK(dbfull()->TEST_FlushMemTable(true, true)); count++; if (dbfull()->TEST_write_controler().IsStopped()) { - sleeping_task_low.WakeUp(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + } break; } } ASSERT_EQ(count, 6); // Unblock - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WaitUntilDone(); + } // Test disable_auto_compactions // Compaction thread is unblocked but auto compaction is disabled. Write @@ -6553,11 +6566,14 @@ TEST_F(DBTest, SoftLimit) { ASSERT_OK(Put(Key(0), "")); - test::SleepingBackgroundTask sleeping_task_low; + std::vector sleeping_task_low( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); // Block compactions - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } // Create 3 L0 files, making score of L0 to be 3. for (int i = 0; i < 3; i++) { @@ -6571,9 +6587,11 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); - sleeping_task_low.Reset(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + sleeping_task.Reset(); + } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Now there is one L1 file but doesn't trigger soft_rate_limit @@ -6590,14 +6608,16 @@ TEST_F(DBTest, SoftLimit) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "BackgroundCallCompaction:0", [&](void* /*arg*/) { // Schedule a sleeping task. - sleeping_task_low.Reset(); + sleeping_task_low[0].Reset(); env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_low, Env::Priority::LOW); + &sleeping_task_low[0], Env::Priority::LOW); }); - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - sleeping_task_low.WaitUntilSleeping(); + for (auto& sleeping_task : sleeping_task_low) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + sleeping_task.WaitUntilSleeping(); + } // Create 3 L0 files, making score of L0 to be 3 for (int i = 0; i < 3; i++) { ASSERT_OK(Put(Key(10 + i), std::string(5000, 'x'))); @@ -6611,8 +6631,8 @@ TEST_F(DBTest, SoftLimit) { // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction // goes through. - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); // Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB // Given level multiplier 10, estimated pending compaction is around 100KB @@ -6633,8 +6653,8 @@ TEST_F(DBTest, SoftLimit) { // Wake up sleep task to enable compaction to run and waits // for it to go to sleep state again to make sure one compaction // goes through. - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); // Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB // L2 size is 360KB, so the estimated level fanout 4, estimated pending @@ -6644,8 +6664,8 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WakeUp(); + sleeping_task_low[0].WaitUntilSleeping(); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal)); @@ -6660,10 +6680,12 @@ TEST_F(DBTest, SoftLimit) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); - sleeping_task_low.WaitUntilSleeping(); + sleeping_task_low[0].WaitUntilSleeping(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_low) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } } TEST_F(DBTest, LastWriteBufferDelay) { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f681ec3be6..cd7ce9e7d6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1496,6 +1496,9 @@ TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { options.track_and_verify_wals_in_manifest = true; // The following make sure there are two bg flush threads. options.max_background_jobs = 8; + options.max_background_compactions = options.max_background_flushes = -1; + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + env_->SetBackgroundThreads(1, Env::Priority::LOW); const std::string cf1_name("cf1"); CreateAndReopenWithCF({cf1_name}, options); diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index 0214cc6324..62dc319740 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -250,9 +250,12 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { ASSERT_OK(db_->CompactRange(compact_options, &first_slice, &last_slice)); // 3 sst after compaction with live iterator CheckFileTypeCounts(dbname_, 0, 3, 1); - test::SleepingBackgroundTask sleeping_task_before; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_before, Env::Priority::LOW); + std::vector sleeping_task_before( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_before) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + } delete itr; test::SleepingBackgroundTask sleeping_task_after; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, @@ -260,14 +263,19 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) { // Make sure no purges are executed foreground CheckFileTypeCounts(dbname_, 0, 3, 1); - sleeping_task_before.WakeUp(); - sleeping_task_before.WaitUntilDone(); + sleeping_task_before[0].WakeUp(); + sleeping_task_before[0].WaitUntilDone(); // Make sure all background purges are executed sleeping_task_after.WakeUp(); sleeping_task_after.WaitUntilDone(); // 1 sst after iterator deletion CheckFileTypeCounts(dbname_, 0, 1, 1); + + for (size_t i = 1; i < sleeping_task_before.size(); ++i) { + sleeping_task_before[i].WakeUp(); + sleeping_task_before[i].WaitUntilDone(); + } } TEST_F(DeleteFileTest, PurgeDuringOpen) { @@ -332,16 +340,21 @@ TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) { CheckFileTypeCounts(dbname_, 0, 1, 1); delete cfh; - test::SleepingBackgroundTask sleeping_task_after; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_after, Env::Priority::LOW); + std::vector sleeping_task_after( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_after) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + } // If background purge is enabled, the file should still be there. CheckFileTypeCounts(dbname_, 0, bg_purge ? 1 : 0, 1); TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeCFDropTest:1"); // Execute background purges. - sleeping_task_after.WakeUp(); - sleeping_task_after.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_after) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } // The file should have been deleted. CheckFileTypeCounts(dbname_, 0, 0, 1); }; @@ -401,13 +414,18 @@ TEST_F(DeleteFileTest, BackgroundPurgeCopyOptions) { CheckFileTypeCounts(dbname_, 0, 3, 1); delete itr; - test::SleepingBackgroundTask sleeping_task_after; - env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, - &sleeping_task_after, Env::Priority::LOW); + std::vector sleeping_task_after( + std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW))); + for (auto& sleeping_task : sleeping_task_after) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task, Env::Priority::LOW); + } // Make sure all background purges are executed - sleeping_task_after.WakeUp(); - sleeping_task_after.WaitUntilDone(); + for (auto& sleeping_task : sleeping_task_after) { + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + } // 1 sst after iterator deletion CheckFileTypeCounts(dbname_, 0, 1, 1); } @@ -447,9 +465,13 @@ TEST_F(DeleteFileTest, BackgroundPurgeTestMultipleJobs) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); delete itr1; + for (int i = 0; i < std::max(1, env_->GetBackgroundThreads(Env::Priority::LOW)); ++i) { + env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::LOW); + } env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::HIGH); delete itr2; env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::HIGH); + env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::LOW); Close(); TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose"); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b9bc15753d..2b0ade3808 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -672,10 +672,10 @@ struct DBOptions { // LOW priority thread pool. For more information, see // Env::SetBackgroundThreads // - // Default: -1 + // Default: 8 // // Dynamically changeable through SetDBOptions() API. - int max_background_compactions = -1; + int max_background_compactions = 8; // This value represents the maximum number of threads that will // concurrently perform a compaction job by breaking it into multiple,