diff --git a/CMakeLists.txt b/CMakeLists.txt index 47dfbce0ce..7aaad9b666 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -743,6 +743,7 @@ set(SOURCES db/db_impl/db_impl_experimental.cc db/db_impl/db_impl_readonly.cc db/db_impl/db_impl_secondary.cc + db/db_impl/compact_range_threads_mngr.cc db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc diff --git a/HISTORY.md b/HISTORY.md index e48d0e258c..6253459afa 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ## Unreleased ### New Features +* Non-Blocking Manual Compaction (CompactRange()) - Support non-blocking manual compactions by setting a new CompactRangeOptions option (async_completion_cb). When set, the CompactRange() call will return control to the caller immediately. The manual compaction iteslf will be performed in an internally created thread. The manual compaction will ALWAYS call the specified callback upon completion and provide the completion status (#597). ### Enhancements diff --git a/TARGETS b/TARGETS index 33e9189bca..cb5c78209b 100644 --- a/TARGETS +++ b/TARGETS @@ -53,6 +53,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/compaction/subcompaction_state.cc", "db/convenience.cc", "db/db_filesnapshot.cc", + "db/db_impl/compact_range_threads_mngr.cc", "db/db_impl/compacted_db_impl.cc", "db/db_impl/db_impl.cc", "db/db_impl/db_impl_compaction_flush.cc", @@ -404,6 +405,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/compaction/subcompaction_state.cc", "db/convenience.cc", "db/db_filesnapshot.cc", + "db/db_impl/compact_range_threads_mngr.cc", "db/db_impl/compacted_db_impl.cc", "db/db_impl/db_impl.cc", "db/db_impl/db_impl_compaction_flush.cc", diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 2c054009b1..e114d52e49 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -7,7 +7,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include +#include +#include +#include +#include #include +#include #include "compaction/compaction_picker_universal.h" #include "db/blob/blob_index.h" @@ -116,6 +123,251 @@ class DBCompactionTest : public DBTestBase { } }; +namespace { + +using CbFuture = std::future; +using ValidateCompletionStatusFunc = + std::function; + +void DefaultCompletionStatusValidation(Status completion_status, + bool expect_success, + Status* expected_completion_status) { + if (expect_success) { + ASSERT_OK(completion_status); + } else { + ASSERT_NOK(completion_status); + if (expected_completion_status != nullptr) { + ASSERT_EQ(completion_status, *expected_completion_status) + << "actual:" << completion_status.ToString() + << ", expected:" << expected_completion_status->ToString(); + } + } +} + +struct CompactRangeCompleteCb : public CompactRangeCompletedCbIf { + CompactRangeCompleteCb(bool expect_success, + Status* expected_completion_status, + std::atomic* num_times_cb_called) + : num_times_cb_called_(num_times_cb_called) { + if (expected_completion_status != nullptr) { + if (expect_success) { + assert(expected_completion_status->ok()); + } else { + assert(expected_completion_status->ok() == false); + } + } + + validate_completion_status_func_ = + std::bind(DefaultCompletionStatusValidation, std::placeholders::_1, + expect_success, expected_completion_status); + } + + CompactRangeCompleteCb(ValidateCompletionStatusFunc validation_func, + std::atomic* num_times_cb_called) + : validate_completion_status_func_(validation_func), + num_times_cb_called_(num_times_cb_called) { + my_promise_ = std::make_unique>(); + } + + ~CompactRangeCompleteCb() = default; + + CbFuture GetFuture() { return my_promise_->get_future(); } + + void CompletedCb(Status completion_status) override { + validate_completion_status_func_(completion_status); + ++(*num_times_cb_called_); + my_promise_->set_value(completion_status); + } + + ValidateCompletionStatusFunc validate_completion_status_func_; + std::atomic* num_times_cb_called_ = nullptr; + std::unique_ptr> my_promise_; +}; + +using CbPtr = std::shared_ptr; + +struct CompactRangeHelper { + CompactRangeHelper(bool blocking) : blocking_(blocking) {} + virtual ~CompactRangeHelper() = default; + + void TearDown() { + ASSERT_EQ(num_times_cb_called_, num_times_nb_compact_range_called_); + } + + // The following 3 MyCompactRange() overloads are compatible with the 3 + // DBTestBase::Compact() overloads + CbPtr MyCompact(int cf, const Slice& start, const Slice& limit, + uint32_t target_path_id, + bool wait_for_compact_range_to_complete = true) { + CompactRangeOptions compact_options; + compact_options.target_path_id = target_path_id; + return MyCompactRange(compact_options, GetCfHandle(cf), &start, &limit, + true /* expect_sucsess */, nullptr, + wait_for_compact_range_to_complete); + } + + CbPtr MyCompact(int cf, const Slice& start, const Slice& limit, + bool wait_for_compact_range_to_complete = true) { + return MyCompactRange(CompactRangeOptions(), GetCfHandle(cf), &start, + &limit, true /* expect_sucsess */, nullptr, + wait_for_compact_range_to_complete); + } + + CbPtr MyCompact(const Slice& start, const Slice& limit, + bool wait_for_compact_range_to_complete = true) { + return MyCompactRange(CompactRangeOptions(), nullptr /* cf_handle */, + &start, &limit, true /* expect_sucsess */, nullptr, + wait_for_compact_range_to_complete); + } + + CbPtr MyCompactRange(CompactRangeOptions compact_range_options, + const Slice* begin, const Slice* end, + bool expect_success, + Status* expected_completion_status = nullptr, + bool wait_for_compact_range_to_complete = true) { + auto cb_ptr = + MyCompactRange(compact_range_options, nullptr /* cf_handle */, begin, + end, expect_success, expected_completion_status, + wait_for_compact_range_to_complete); + if (cb_ptr != nullptr) { + assert(cb_to_future_map_.find(cb_ptr) != cb_to_future_map_.end()); + } + return cb_ptr; + } + + CbPtr MyCompactRange( + CompactRangeOptions compact_range_options, const Slice* begin, + const Slice* end, + ValidateCompletionStatusFunc validation_completion_status_func, + bool wait_for_compact_range_to_complete = true) { + auto cb_ptr = MyCompactRange(compact_range_options, nullptr /* cf_handle */, + begin, end, validation_completion_status_func, + wait_for_compact_range_to_complete); + if (cb_ptr != nullptr) { + assert(cb_to_future_map_.find(cb_ptr) != cb_to_future_map_.end()); + } + return cb_ptr; + } + + CbPtr MyCompactRange(CompactRangeOptions compact_range_options, + ColumnFamilyHandle* cf_handle, const Slice* begin, + const Slice* end, bool expect_success, + Status* expected_completion_status = nullptr, + bool wait_for_compact_range_to_complete = true) { + auto validate_completion_status_func = + std::bind(DefaultCompletionStatusValidation, std::placeholders::_1, + expect_success, expected_completion_status); + return MyCompactRange(compact_range_options, cf_handle, begin, end, + validate_completion_status_func, + wait_for_compact_range_to_complete); + } + + // Use a void helper function so we may call ASSERT_XXX gtest macros + void CompactRangeNonBlockingHelper(CbPtr completion_cb, + CompactRangeOptions& compact_range_options, + ColumnFamilyHandle* cf_handle, + const Slice* begin, const Slice* end) { + compact_range_options.async_completion_cb = completion_cb; + + Status status; + if (cf_handle == nullptr) { + status = GetDb()->CompactRange(compact_range_options, begin, end); + } else { + status = + GetDb()->CompactRange(compact_range_options, cf_handle, begin, end); + } + ASSERT_OK(status); + ++num_times_nb_compact_range_called_; + } + + CbPtr MyCompactRange( + CompactRangeOptions compact_range_options, ColumnFamilyHandle* cf_handle, + const Slice* begin, const Slice* end, + ValidateCompletionStatusFunc validate_completion_status_func, + bool wait_for_compact_range_to_complete = true) { + if (blocking_) { + CbPtr completion_cb = std::make_shared( + validate_completion_status_func, &num_times_cb_called_); + + CompactRangeNonBlockingHelper(completion_cb, compact_range_options, + cf_handle, begin, end); + + { + std::lock_guard lock(map_mutex_); + auto cb_future = + static_cast(completion_cb.get()) + ->GetFuture(); + + cb_to_future_map_[completion_cb] = std::move(cb_future); + } + + if (wait_for_compact_range_to_complete) { + WaitForCompactRangeToComplete(completion_cb); + return nullptr; + } else { + return completion_cb; + } + + } else { + // BLOCKING + Status status; + if (cf_handle == nullptr) { + status = GetDb()->CompactRange(compact_range_options, begin, end); + } else { + status = + GetDb()->CompactRange(compact_range_options, cf_handle, begin, end); + } + + validate_completion_status_func(status); + return {}; + } + } + + void WaitForCompactRangeToComplete(CbPtr cb_ptr) { + if (cb_ptr == nullptr) { + return; + } + + std::lock_guard lock(map_mutex_); + + auto cb_map_iter = cb_to_future_map_.find(cb_ptr); + ASSERT_NE(cb_map_iter, cb_to_future_map_.end()); + + auto& my_future = cb_map_iter->second; + auto future_wait_status = my_future.wait_for(std::chrono::seconds(10)); + ASSERT_EQ(future_wait_status, std::future_status::ready) + << "Future Status:" << static_cast(future_wait_status); + + cb_to_future_map_.erase(cb_ptr); + } + + virtual DBImpl* GetDb() = 0; + virtual ColumnFamilyHandle* GetCfHandle(int cf) = 0; + + bool blocking_ = false; + std::atomic num_times_nb_compact_range_called_ = 0U; + std::atomic num_times_cb_called_ = 0U; + std::mutex map_mutex_; + std::unordered_map cb_to_future_map_; +}; + +#define CR_HELPER_OVERRIDES \ + void TearDown() override { CompactRangeHelper::TearDown(); } \ + \ + DBImpl* GetDb() override { return dbfull(); }; \ + ColumnFamilyHandle* GetCfHandle(int cf) override { return handles_[cf]; }; + +} // namespace + +class DBCompactionTestWithMCC : public DBCompactionTest, + public CompactRangeHelper, + public testing::WithParamInterface { + public: + DBCompactionTestWithMCC() : CompactRangeHelper(GetParam()) {} + + CR_HELPER_OVERRIDES; +}; + class DBCompactionTestWithParam : public DBTestBase, public testing::WithParamInterface> { @@ -134,22 +386,56 @@ class DBCompactionTestWithParam bool exclusive_manual_compaction_; }; +class DBCompactionTestWithParamWithMCC + : public DBTestBase, + public CompactRangeHelper, + public testing::WithParamInterface> { + public: + DBCompactionTestWithParamWithMCC() + : DBTestBase("db_compaction_test", /*env_do_fsync=*/true), + CompactRangeHelper(std::get<2>(GetParam())) { + max_subcompactions_ = std::get<0>(GetParam()); + exclusive_manual_compaction_ = std::get<1>(GetParam()); + } + + // Required if inheriting from testing::WithParamInterface<> + static void SetUpTestCase() {} + static void TearDownTestCase() { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + } + + CR_HELPER_OVERRIDES; + + uint32_t max_subcompactions_; + bool exclusive_manual_compaction_; +}; + class DBCompactionTestWithBottommostParam : public DBTestBase, - public testing::WithParamInterface { + public CompactRangeHelper, + public testing::WithParamInterface< + std::tuple> { public: DBCompactionTestWithBottommostParam() - : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) { - bottommost_level_compaction_ = GetParam(); + : DBTestBase("db_compaction_test", /*env_do_fsync=*/true), + CompactRangeHelper(std::get<1>(GetParam())) { + bottommost_level_compaction_ = std::get<0>(GetParam()); } + CR_HELPER_OVERRIDES; + BottommostLevelCompaction bottommost_level_compaction_; }; -class DBCompactionDirectIOTest : public DBCompactionTest, - public ::testing::WithParamInterface { +class DBCompactionDirectIOTest + : public DBCompactionTest, + public CompactRangeHelper, + public ::testing::WithParamInterface> { public: - DBCompactionDirectIOTest() : DBCompactionTest() {} + DBCompactionDirectIOTest() + : DBCompactionTest(), CompactRangeHelper(std::get<1>(GetParam())) {} + + CR_HELPER_OVERRIDES; }; // Param = true : target level is non-empty @@ -157,9 +443,13 @@ class DBCompactionDirectIOTest : public DBCompactionTest, // is not empty. class ChangeLevelConflictsWithAuto : public DBCompactionTest, - public ::testing::WithParamInterface { + public CompactRangeHelper, + public ::testing::WithParamInterface> { public: - ChangeLevelConflictsWithAuto() : DBCompactionTest() {} + ChangeLevelConflictsWithAuto() + : DBCompactionTest(), CompactRangeHelper(std::get<1>(GetParam())) {} + + CR_HELPER_OVERRIDES; }; // Param = true: grab the compaction pressure token (enable @@ -437,7 +727,7 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) { SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(DBCompactionTest, TestTableReaderForCompaction) { +TEST_P(DBCompactionTestWithMCC, TestTableReaderForCompaction) { Options options = CurrentOptions(); options.env = env_; options.max_open_files = 20; @@ -517,7 +807,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { cro.change_level = true; cro.target_level = 2; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); // Only verifying compaction outputs issues one table cache lookup // for both data block and range deletion block). // May preload table cache too. @@ -598,7 +888,7 @@ TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) { } } -TEST_F(DBCompactionTest, CompactRangeBottomPri) { +TEST_P(DBCompactionTestWithMCC, CompactRangeBottomPri) { ASSERT_OK(Put(Key(50), "")); ASSERT_OK(Flush()); ASSERT_OK(Put(Key(100), "")); @@ -610,7 +900,7 @@ TEST_F(DBCompactionTest, CompactRangeBottomPri) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 2; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); } ASSERT_EQ("0,0,3", FilesPerLevel(0)); @@ -643,7 +933,7 @@ TEST_F(DBCompactionTest, CompactRangeBottomPri) { }); SyncPoint::GetInstance()->EnableProcessing(); env_->SetBackgroundThreads(1, Env::Priority::BOTTOM); - ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); ASSERT_EQ(1, low_pri_count); ASSERT_EQ(1, bottom_pri_count); ASSERT_EQ("0,0,2", FilesPerLevel(0)); @@ -651,12 +941,12 @@ TEST_F(DBCompactionTest, CompactRangeBottomPri) { // Recompact bottom most level uses bottom pool CompactRangeOptions cro; cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); ASSERT_EQ(1, low_pri_count); ASSERT_EQ(2, bottom_pri_count); env_->SetBackgroundThreads(0, Env::Priority::BOTTOM); - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); // Low pri pool is used if bottom pool has size 0. ASSERT_EQ(2, low_pri_count); ASSERT_EQ(2, bottom_pri_count); @@ -929,7 +1219,7 @@ TEST_F(DBCompactionTest, MinorCompactionsHappen) { } while (ChangeCompactOptions()); } -TEST_F(DBCompactionTest, UserKeyCrossFile1) { +TEST_P(DBCompactionTestWithMCC, UserKeyCrossFile1) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleLevel; options.level0_file_num_compaction_trigger = 3; @@ -949,7 +1239,8 @@ TEST_F(DBCompactionTest, UserKeyCrossFile1) { ASSERT_EQ("NOT_FOUND", Get("3")); // move both files down to l1 - ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); + ASSERT_EQ("NOT_FOUND", Get("3")); for (int i = 0; i < 3; i++) { @@ -962,7 +1253,7 @@ TEST_F(DBCompactionTest, UserKeyCrossFile1) { ASSERT_EQ("NOT_FOUND", Get("3")); } -TEST_F(DBCompactionTest, UserKeyCrossFile2) { +TEST_P(DBCompactionTestWithMCC, UserKeyCrossFile2) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleLevel; options.level0_file_num_compaction_trigger = 3; @@ -982,7 +1273,7 @@ TEST_F(DBCompactionTest, UserKeyCrossFile2) { ASSERT_EQ("NOT_FOUND", Get("3")); // move both files down to l1 - ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); ASSERT_EQ("NOT_FOUND", Get("3")); for (int i = 0; i < 3; i++) { @@ -995,7 +1286,7 @@ TEST_F(DBCompactionTest, UserKeyCrossFile2) { ASSERT_EQ("NOT_FOUND", Get("3")); } -TEST_F(DBCompactionTest, CompactionSstPartitioner) { +TEST_P(DBCompactionTestWithMCC, CompactionSstPartitioner) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleLevel; options.level0_file_num_compaction_trigger = 3; @@ -1016,7 +1307,7 @@ TEST_F(DBCompactionTest, CompactionSstPartitioner) { ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); // move both files down to l1 - ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); std::vector files; dbfull()->GetLiveFilesMetaData(&files); @@ -1025,7 +1316,7 @@ TEST_F(DBCompactionTest, CompactionSstPartitioner) { ASSERT_EQ("B", Get("bbbb1")); } -TEST_F(DBCompactionTest, CompactionSstPartitionWithManualCompaction) { +TEST_P(DBCompactionTestWithMCC, CompactionSstPartitionWithManualCompaction) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleLevel; options.level0_file_num_compaction_trigger = 3; @@ -1048,7 +1339,7 @@ TEST_F(DBCompactionTest, CompactionSstPartitionWithManualCompaction) { CompactRangeOptions compact_options; compact_options.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized; - ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); // Check (compacted but no partitioning yet) std::vector files; @@ -1065,7 +1356,7 @@ TEST_F(DBCompactionTest, CompactionSstPartitionWithManualCompaction) { // overlap with actual entries Slice from("000017"); Slice to("000019"); - ASSERT_OK(dbfull()->CompactRange(compact_options, &from, &to)); + MyCompactRange(compact_options, &from, &to, true); // Check (no partitioning yet) files.clear(); @@ -1079,7 +1370,7 @@ TEST_F(DBCompactionTest, CompactionSstPartitionWithManualCompaction) { // NOTE: `to` is INCLUSIVE from = Slice("000019"); to = Slice("000020"); - ASSERT_OK(dbfull()->CompactRange(compact_options, &from, &to)); + MyCompactRange(compact_options, &from, &to, true); // Check (must be partitioned) files.clear(); @@ -1229,7 +1520,7 @@ TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) { } while (ChangeOptions()); } -TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) { +TEST_P(DBCompactionTestWithParamWithMCC, TrivialMoveOneFile) { int32_t trivial_move = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:TrivialMove", @@ -1265,7 +1556,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) { cro.exclusive_manual_compaction = exclusive_manual_compaction_; // Compaction will initiate a trivial move from L0 to L1 - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); // File moved From L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0 @@ -1285,7 +1576,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { +TEST_P(DBCompactionTestWithParamWithMCC, TrivialMoveNonOverlappingFiles) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -1328,7 +1619,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { // Since data is non-overlapping we expect compaction to initiate // a trivial move - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); // We expect that all the files were trivially moved from L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files); @@ -1366,7 +1657,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { ASSERT_OK(Flush()); } - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); for (size_t i = 0; i < ranges.size(); i++) { for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { @@ -1379,7 +1670,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { +TEST_P(DBCompactionTestWithParamWithMCC, TrivialMoveTargetLevel) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -1423,7 +1714,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { compact_options.change_level = true; compact_options.target_level = 6; compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); // 2 files in L6 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); @@ -1438,7 +1729,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { } } -TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) { +TEST_P(DBCompactionTestWithParamWithMCC, PartialOverlappingL0) { class SubCompactionEventListener : public EventListener { public: void OnSubcompactionCompleted(const SubcompactionJobInfo&) override { @@ -1463,7 +1754,7 @@ TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) { ASSERT_OK(Put("key", "")); ASSERT_OK(Put("kez", "")); ASSERT_OK(Flush()); - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); // Ranges that are only briefly overlapping so that they won't be trivially // moved but subcompaction ranges would only contain a subset of files. @@ -1506,7 +1797,7 @@ TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) { } } -TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { +TEST_P(DBCompactionTestWithParamWithMCC, ManualCompactionPartial) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -1573,7 +1864,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { compact_options.target_level = 6; compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; // Trivial move the two non-overlapping files to level 6 - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); // 2 files in L6 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); @@ -1608,7 +1899,10 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { Slice begin(begin_string); Slice end(end_string); // First non-trivial compaction is triggered - ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + auto cb_handle = + MyCompactRange(compact_options, &begin, &end, true, nullptr, + false /* wait_for_compact_range_to_complete */); + WaitForCompactRangeToComplete(cb_handle); }); TEST_SYNC_POINT("DBCompaction::ManualPartial:1"); @@ -1777,7 +2071,7 @@ TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) { } } -TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) { +TEST_P(DBCompactionTestWithMCC, ManualCompactionWithUnorderedWrite) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL", "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"}, @@ -1796,7 +2090,7 @@ TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) { TEST_SYNC_POINT( "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"); - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); writer.join(); ASSERT_EQ(Get("foo"), "v2"); @@ -1808,7 +2102,7 @@ TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) { ASSERT_EQ(Get("foo"), "v2"); } -TEST_F(DBCompactionTest, DeleteFileRange) { +TEST_P(DBCompactionTestWithMCC, DeleteFileRange) { Options options = CurrentOptions(); options.write_buffer_size = 10 * 1024 * 1024; options.max_bytes_for_level_multiplier = 2; @@ -1842,7 +2136,7 @@ TEST_F(DBCompactionTest, DeleteFileRange) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 2; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); // 2 files in L2 ASSERT_EQ("0,0,2", FilesPerLevel(0)); @@ -1910,7 +2204,7 @@ TEST_F(DBCompactionTest, DeleteFileRange) { // Note that we don't delete level 0 files compact_options.change_level = true; compact_options.target_level = 1; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK( @@ -1928,7 +2222,7 @@ TEST_F(DBCompactionTest, DeleteFileRange) { ASSERT_GT(old_num_files, new_num_files); } -TEST_F(DBCompactionTest, DeleteFilesInRanges) { +TEST_P(DBCompactionTestWithMCC, DeleteFilesInRanges) { Options options = CurrentOptions(); options.write_buffer_size = 10 * 1024 * 1024; options.max_bytes_for_level_multiplier = 2; @@ -1955,7 +2249,7 @@ TEST_F(DBCompactionTest, DeleteFilesInRanges) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 2; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); ASSERT_EQ("0,0,10", FilesPerLevel(0)); // file [0 => 100), [200 => 300), ... [800, 900) @@ -2098,7 +2392,7 @@ TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) { db_->ReleaseSnapshot(snapshot); } -TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { +TEST_P(DBCompactionTestWithParamWithMCC, TrivialMoveToLastLevelWithFiles) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -2131,7 +2425,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { compact_options.change_level = true; compact_options.target_level = 3; compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 1); ASSERT_EQ(non_trivial_move, 0); @@ -2147,7 +2441,7 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { CompactRangeOptions cro; cro.exclusive_manual_compaction = exclusive_manual_compaction_; // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); ASSERT_EQ("0,0,0,2", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 4); ASSERT_EQ(non_trivial_move, 0); @@ -2524,7 +2818,7 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) { Destroy(options, true); } -TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) { +TEST_P(DBCompactionTestWithParamWithMCC, ConvertCompactionStyle) { Random rnd(301); int max_key_level_insert = 200; int max_key_universal_insert = 600; @@ -2583,8 +2877,7 @@ TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) { compact_options.bottommost_level_compaction = BottommostLevelCompaction::kForce; compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; - ASSERT_OK( - dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + MyCompactRange(compact_options, handles_[1], nullptr, nullptr, true); // Only 1 file in L0 ASSERT_EQ("1", FilesPerLevel(1)); @@ -2680,7 +2973,7 @@ TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) { } while (ChangeCompactOptions()); } -TEST_F(DBCompactionTest, ManualAutoRace) { +TEST_P(DBCompactionTestWithMCC, ManualAutoRace) { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"}, @@ -2714,7 +3007,7 @@ TEST_F(DBCompactionTest, ManualAutoRace) { // before processing so that it will be cancelled. CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - ASSERT_OK(dbfull()->CompactRange(cro, handles_[1], nullptr, nullptr)); + MyCompactRange(cro, handles_[1], nullptr, nullptr, true); ASSERT_EQ("0,1", FilesPerLevel(1)); // Eventually the cancelled compaction will be rescheduled and executed. @@ -2723,7 +3016,7 @@ TEST_F(DBCompactionTest, ManualAutoRace) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_P(DBCompactionTestWithParam, ManualCompaction) { +TEST_P(DBCompactionTestWithParamWithMCC, ManualCompaction) { Options options = CurrentOptions(); options.max_subcompactions = max_subcompactions_; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); @@ -2736,15 +3029,15 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { ASSERT_EQ("1,1,1", FilesPerLevel(1)); // Compaction range falls before files - Compact(1, "", "c"); + MyCompact(1, "", "c"); ASSERT_EQ("1,1,1", FilesPerLevel(1)); // Compaction range falls after files - Compact(1, "r", "z"); + MyCompact(1, "r", "z"); ASSERT_EQ("1,1,1", FilesPerLevel(1)); // Compaction range overlaps files - Compact(1, "p", "q"); + MyCompact(1, "p", "q"); ASSERT_EQ("0,0,1", FilesPerLevel(1)); // Populate a different range @@ -2752,7 +3045,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { ASSERT_EQ("1,1,2", FilesPerLevel(1)); // Compact just the new range - Compact(1, "b", "f"); + MyCompact(1, "b", "f"); ASSERT_EQ("0,0,2", FilesPerLevel(1)); // Compact all @@ -2763,7 +3056,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { options.statistics->getTickerCount(BLOCK_CACHE_ADD); CompactRangeOptions cro; cro.exclusive_manual_compaction = exclusive_manual_compaction_; - ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr)); + MyCompactRange(cro, handles_[1], nullptr, nullptr, true); // Verify manual compaction doesn't fill block cache ASSERT_EQ(prev_block_cache_add, options.statistics->getTickerCount(BLOCK_CACHE_ADD)); @@ -2781,7 +3074,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompaction) { } } -TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { +TEST_P(DBCompactionTestWithParamWithMCC, ManualLevelCompactionOutputPathId) { Options options = CurrentOptions(); options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760); options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760); @@ -2802,15 +3095,17 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { ASSERT_EQ(0, GetSstFileCount(dbname_)); // Compaction range falls before files - Compact(1, "", "c"); + MyCompact(1, "", "c"); ASSERT_EQ("3", FilesPerLevel(1)); // Compaction range falls after files - Compact(1, "r", "z"); + MyCompact(1, "r", "z"); ASSERT_EQ("3", FilesPerLevel(1)); + uint32_t target_path_id = 1U; + // Compaction range overlaps files - Compact(1, "p", "q", 1); + MyCompact(1, "p", "q", target_path_id); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,1", FilesPerLevel(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); @@ -2826,7 +3121,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { ASSERT_EQ("3,1", FilesPerLevel(1)); // Compact just the new range - Compact(1, "b", "f", 1); + MyCompact(1, "b", "f", target_path_id); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,2", FilesPerLevel(1)); ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); @@ -2843,8 +3138,7 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { CompactRangeOptions compact_options; compact_options.target_path_id = 1; compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; - ASSERT_OK( - db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); + MyCompactRange(compact_options, handles_[1], nullptr, nullptr, true); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ("0,1", FilesPerLevel(1)); @@ -2866,15 +3160,15 @@ TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { } } -TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) { +TEST_P(DBCompactionTestWithMCC, FilesDeletedAfterCompaction) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v2")); - Compact(1, "a", "z"); + MyCompact(1, "a", "z"); const size_t num_files = CountLiveFiles(); for (int i = 0; i < 10; i++) { ASSERT_OK(Put(1, "foo", "v2")); - Compact(1, "a", "z"); + MyCompact(1, "a", "z"); } ASSERT_EQ(CountLiveFiles(), num_files); } while (ChangeCompactOptions()); @@ -3244,7 +3538,7 @@ TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) { // TODO(aekmekji): Make sure that the reason this fails when run with // max_subcompactions > 1 is not a correctness issue but just inherent to // running parallel L0-L1 compactions -TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) { +TEST_P(DBCompactionTestWithMCC, SuggestCompactRangeNoTwoLevel0Compactions) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleLevel; options.write_buffer_size = 110 << 10; @@ -3264,7 +3558,7 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) { for (int num = 0; num < 10; num++) { GenerateNewRandomFile(&rnd); } - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, true); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"CompactionJob::Run():Start", @@ -3305,7 +3599,7 @@ static std::string ShortKey(int i) { return std::string(buf); } -TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { +TEST_P(DBCompactionTestWithParamWithMCC, ForceBottommostLevelCompaction) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -3357,7 +3651,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = 3; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 1); ASSERT_EQ(non_trivial_move, 0); @@ -3375,7 +3669,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { compact_options = CompactRangeOptions(); compact_options.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized; - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 4); ASSERT_EQ(non_trivial_move, 1); @@ -3395,7 +3689,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { BottommostLevelCompaction::kSkip; // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) // and will skip bottommost level compaction - ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + MyCompactRange(compact_options, nullptr, nullptr, true); ASSERT_EQ("0,0,0,2", FilesPerLevel(0)); ASSERT_EQ(trivial_move, 3); ASSERT_EQ(non_trivial_move, 0); @@ -3596,7 +3890,7 @@ TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) { Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); } -TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) { +TEST_P(DBCompactionTestWithParamWithMCC, CancelCompactionWaitingOnConflict) { // This test verifies cancellation of a compaction waiting to be scheduled due // to conflict with a running compaction. // @@ -3635,8 +3929,9 @@ TEST_F(DBCompactionTest, CancelCompactionWaitingOnConflict) { "DBCompactionTest::CancelCompactionWaitingOnConflict:" "PreDisableManualCompaction"}}); auto manual_compaction_thread = port::Thread([this]() { - ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr) - .IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, false, + &expected_completion_status); }); // Cancel it. Thread should be joinable, i.e., manual compaction was unblocked @@ -4918,7 +5213,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) { } } -TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { +TEST_P(DBCompactionTestWithParamWithMCC, CompactRangeDelayedByL0FileCount) { // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual // compaction only triggers flush after it's sure stall won't be triggered for // L0 file count going too high. @@ -4962,7 +5257,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { auto manual_compaction_thread = port::Thread([this]() { CompactRangeOptions cro; cro.allow_write_stall = false; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); }); manual_compaction_thread.join(); @@ -4973,7 +5268,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { } } -TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) { +TEST_P(DBCompactionTestWithMCC, CompactRangeDelayedByImmMemTableCount) { // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual // compaction only triggers flush after it's sure stall won't be triggered for // immutable memtable count going too high. @@ -5019,7 +5314,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) { auto manual_compaction_thread = port::Thread([this]() { CompactRangeOptions cro; cro.allow_write_stall = false; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); }); manual_compaction_thread.join(); @@ -5030,7 +5325,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) { } } -TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) { +TEST_P(DBCompactionTestWithMCC, CompactRangeShutdownWhileDelayed) { // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay // does not hang if CF is dropped or DB is closed const int kNumL0FilesTrigger = 4; @@ -5065,11 +5360,13 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) { CompactRangeOptions cro; cro.allow_write_stall = false; if (i == 0) { - ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) - .IsColumnFamilyDropped()); + auto expected_completion_status = Status::ColumnFamilyDropped(); + MyCompactRange(cro, handles_[1], nullptr, nullptr, false, + &expected_completion_status); } else { - ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) - .IsShutdownInProgress()); + auto expected_completion_status = Status::ShutdownInProgress(); + MyCompactRange(cro, handles_[1], nullptr, nullptr, false, + &expected_completion_status); } }); @@ -5088,7 +5385,7 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) { } } -TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) { +TEST_P(DBCompactionTestWithMCC, CompactRangeSkipFlushAfterDelay) { // Verify that, when `CompactRangeOptions::allow_write_stall == false`, // CompactRange skips its flush if the delay is long enough that the memtables // existing at the beginning of the call have already been flushed. @@ -5123,7 +5420,7 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) { auto manual_compaction_thread = port::Thread([this]() { CompactRangeOptions cro; cro.allow_write_stall = false; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); }); TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"); @@ -5144,7 +5441,7 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) { +TEST_P(DBCompactionTestWithMCC, CompactRangeFlushOverlappingMemtable) { // Verify memtable only gets flushed if it contains data overlapping the range // provided to `CompactRange`. Tests all kinds of overlap/non-overlap. const int kNumEndpointKeys = 5; @@ -5178,8 +5475,7 @@ TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) { ASSERT_OK(Put("b", "val")); ASSERT_OK(Put("d", "val")); CompactRangeOptions compact_range_opts; - ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr)); - + MyCompactRange(compact_range_opts, begin_ptr, end_ptr, true); uint64_t get_prop_tmp, num_memtable_entries = 0; ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables, &get_prop_tmp)); @@ -5224,7 +5520,7 @@ TEST_F(DBCompactionTest, CompactionStatsTest) { VerifyCompactionStats(*cfd, *collector); } -TEST_F(DBCompactionTest, SubcompactionEvent) { +TEST_P(DBCompactionTestWithMCC, SubcompactionEvent) { class SubCompactionEventListener : public EventListener { public: void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override { @@ -5307,8 +5603,7 @@ TEST_F(DBCompactionTest, SubcompactionEvent) { CompactRangeOptions comp_opts; comp_opts.max_subcompactions = 4; - Status s = dbfull()->CompactRange(comp_opts, nullptr, nullptr); - ASSERT_OK(s); + MyCompactRange(comp_opts, nullptr, nullptr, true); ASSERT_OK(dbfull()->TEST_WaitForCompact()); // make sure there's no running compaction ASSERT_EQ(listener->GetRunningCompactionCount(), 0); @@ -5402,7 +5697,7 @@ TEST_F(DBCompactionTest, CompactionHasEmptyOutput) { ASSERT_EQ(2, collector->num_ssts_creation_started()); } -TEST_F(DBCompactionTest, CompactionLimiter) { +TEST_P(DBCompactionTestWithMCC, CompactionLimiter) { const int kNumKeysPerFile = 10; const int kMaxBackgroundThreads = 64; @@ -5584,7 +5879,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) { ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test])); ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test)); - Compact(cf_test, Key(0), Key(keyIndex)); + MyCompact(cf_test, Key(0), Key(keyIndex)); ASSERT_OK(dbfull()->TEST_WaitForCompact()); } @@ -5594,12 +5889,23 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, std::make_tuple(4, true), std::make_tuple(4, false))); +INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParamWithMCC, + DBCompactionTestWithParamWithMCC, + ::testing::Values(std::make_tuple(1, true, false), + std::make_tuple(1, true, true), + std::make_tuple(1, false, false), + std::make_tuple(1, false, true), + std::make_tuple(4, true, false), + std::make_tuple(4, true, true), + std::make_tuple(4, false, false), + std::make_tuple(4, false, true))); + TEST_P(DBCompactionDirectIOTest, DirectIO) { Options options = CurrentOptions(); Destroy(options); options.create_if_missing = true; options.disable_auto_compactions = true; - options.use_direct_io_for_flush_and_compaction = GetParam(); + options.use_direct_io_for_flush_and_compaction = std::get<0>(GetParam()); options.env = MockEnv::Create(Env::Default()); Reopen(options); bool readahead = false; @@ -5617,7 +5923,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) { CreateAndReopenWithCF({"pikachu"}, options); MakeTables(3, "p", "q", 1); ASSERT_EQ("1,1,1", FilesPerLevel(1)); - Compact(1, "p", "q"); + MyCompact(1, "p", "q"); ASSERT_EQ(readahead, options.use_direct_reads); ASSERT_EQ("0,0,1", FilesPerLevel(1)); Destroy(options); @@ -5625,7 +5931,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) { } INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest, - testing::Bool()); + ::testing::Combine(testing::Bool(), ::testing::Bool())); class CompactionPriTest : public DBTestBase, public testing::WithParamInterface { @@ -6069,7 +6375,7 @@ class NoopMergeOperator : public MergeOperator { const char* Name() const override { return "Noop"; } }; -TEST_F(DBCompactionTest, PartialManualCompaction) { +TEST_P(DBCompactionTestWithMCC, PartialManualCompaction) { Options opts = CurrentOptions(); opts.num_levels = 3; opts.level0_file_num_compaction_trigger = 10; @@ -6096,10 +6402,10 @@ TEST_F(DBCompactionTest, PartialManualCompaction) { CompactRangeOptions cro; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); } -TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) { +TEST_P(DBCompactionTestWithMCC, ManualCompactionFailsInReadOnlyMode) { // Regression test for bug where manual compaction hangs forever when the DB // is in read-only mode. Verify it now at least returns, despite failing. const int kNumL0Files = 4; @@ -6132,8 +6438,8 @@ TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) { cro.exclusive_manual_compaction = false; Slice begin_key("key1"); Slice end_key("key2"); - ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key)); - ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key)); + MyCompactRange(cro, &begin_key, &end_key, false); + MyCompactRange(cro, &begin_key, &end_key, false); // Close before mock_env destruct. Close(); @@ -6142,7 +6448,7 @@ TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) { // ManualCompactionBottomLevelOptimization tests the bottom level manual // compaction optimization to skip recompacting files created by Ln-1 to Ln // compaction -TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) { +TEST_P(DBCompactionTestWithMCC, ManualCompactionBottomLevelOptimized) { Options opts = CurrentOptions(); opts.num_levels = 3; opts.level0_file_num_compaction_trigger = 5; @@ -6183,7 +6489,7 @@ TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) { CompactRangeOptions cro; cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); const std::vector& comp_stats2 = internal_stats_ptr->TEST_GetCompactionStats(); @@ -6191,7 +6497,7 @@ TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) { ASSERT_EQ(num, 0); } -TEST_F(DBCompactionTest, ManualCompactionMax) { +TEST_P(DBCompactionTestWithMCC, ManualCompactionMax) { uint64_t l1_avg_size = 0, l2_avg_size = 0; auto generate_sst_func = [&]() { Random rnd(301); @@ -6242,7 +6548,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) { generate_sst_func(); num_compactions.store(0); CompactRangeOptions cro; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); ASSERT_TRUE(num_compactions.load() == 1); // split the compaction to 5 @@ -6254,7 +6560,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) { opts.target_file_size_base = total_size / num_split; Reopen(opts); num_compactions.store(0); - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); ASSERT_TRUE(num_compactions.load() == num_split); // very small max_compaction_bytes, it should still move forward @@ -6263,7 +6569,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) { DestroyAndReopen(opts); generate_sst_func(); num_compactions.store(0); - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); ASSERT_TRUE(num_compactions.load() > 10); // dynamically set the option @@ -6278,11 +6584,11 @@ TEST_F(DBCompactionTest, ManualCompactionMax) { ASSERT_OK(s); num_compactions.store(0); - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); ASSERT_TRUE(num_compactions.load() == num_split); } -TEST_F(DBCompactionTest, CompactionDuringShutdown) { +TEST_P(DBCompactionTestWithMCC, CompactionDuringShutdown) { Options opts = CurrentOptions(); opts.level0_file_num_compaction_trigger = 2; opts.disable_auto_compactions = true; @@ -6304,11 +6610,15 @@ TEST_F(DBCompactionTest, CompactionDuringShutdown) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", - [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); }); + [&](void* /*arg*/) { dbfull_shutting_down().store(true); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); - ASSERT_TRUE(s.ok() || s.IsShutdownInProgress()); - ASSERT_OK(dbfull()->error_handler_.GetBGError()); + + ValidateCompletionStatusFunc validate_func = [](Status s) { + ASSERT_TRUE(s.ok() || s.IsShutdownInProgress()); + }; + + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, validate_func); + ASSERT_OK(dbfull_error_handler().GetBGError()); } // FixFileIngestionCompactionDeadlock tests and verifies that compaction and @@ -6383,11 +6693,16 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { class DBCompactionTestWithOngoingFileIngestionParam : public DBCompactionTest, - public testing::WithParamInterface { + public CompactRangeHelper, + public testing::WithParamInterface> { public: - DBCompactionTestWithOngoingFileIngestionParam() : DBCompactionTest() { - compaction_path_to_test_ = GetParam(); + DBCompactionTestWithOngoingFileIngestionParam() + : DBCompactionTest(), CompactRangeHelper(std::get<1>(GetParam())) { + compaction_path_to_test_ = std::get<0>(GetParam()); } + + CR_HELPER_OVERRIDES; + void SetupOptions() { options_ = CurrentOptions(); options_.create_if_missing = true; @@ -6489,8 +6804,7 @@ class DBCompactionTestWithOngoingFileIngestionParam TEST_SYNC_POINT("PreCompaction"); // Without proper range conflict check, // this would have been `Status::Corruption` about overlapping ranges - Status s = dbfull()->CompactRange(cro, &start, &end); - EXPECT_OK(s); + MyCompactRange(cro, &start, &end, true); } else if (compaction_path_to_test_ == "RefitLevelCompactRange") { CompactRangeOptions cro; cro.change_level = true; @@ -6500,15 +6814,17 @@ class DBCompactionTestWithOngoingFileIngestionParam std::string end_key = "k4"; Slice end(end_key); TEST_SYNC_POINT("PreCompaction"); - Status s = dbfull()->CompactRange(cro, &start, &end); - // Without proper range conflict check, - // this would have been `Status::Corruption` about overlapping ranges - // To see this, remove the fix AND replace - // `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with - // `DBImpl::ReFitLevel:PostRegisterCompaction` - EXPECT_TRUE(s.IsNotSupported()); - EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") != - std::string::npos); + ValidateCompletionStatusFunc validate_func = [](Status s) { + // Without proper range conflict check, + // this would have been `Status::Corruption` about overlapping ranges + // To see this, remove the fix AND replace + // `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with + // `DBImpl::ReFitLevel:PostRegisterCompaction` + EXPECT_TRUE(s.IsNotSupported()); + EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") != + std::string::npos); + }; + MyCompactRange(cro, &start, &end, validate_func); } else if (compaction_path_to_test_ == "CompactFiles") { ColumnFamilyMetaData cf_meta_data; db_->GetColumnFamilyMetaData(&cf_meta_data); @@ -6542,12 +6858,14 @@ class DBCompactionTestWithOngoingFileIngestionParam std::shared_ptr sleeping_task_; }; -INSTANTIATE_TEST_CASE_P(DBCompactionTestWithOngoingFileIngestionParam, - DBCompactionTestWithOngoingFileIngestionParam, - ::testing::Values("AutoCompaction", - "NonRefitLevelCompactRange", - "RefitLevelCompactRange", - "CompactFiles")); +INSTANTIATE_TEST_CASE_P( + DBCompactionTestWithOngoingFileIngestionParam, + DBCompactionTestWithOngoingFileIngestionParam, + ::testing::Combine(::testing::Values("AutoCompaction", + "NonRefitLevelCompactRange", + "RefitLevelCompactRange", + "CompactFiles"), + ::testing::Bool())); TEST_P(DBCompactionTestWithOngoingFileIngestionParam, RangeConflictCheck) { SetupOptions(); @@ -6946,7 +7264,18 @@ class DBCompactionTestL0FilesMisorderCorruption : public DBCompactionTest { std::shared_ptr sleeping_task_; }; -TEST_F(DBCompactionTestL0FilesMisorderCorruption, +class DBCompactionTestL0FilesMisorderCorruptionWithMCC + : public DBCompactionTestL0FilesMisorderCorruption, + public CompactRangeHelper, + public testing::WithParamInterface { + public: + DBCompactionTestL0FilesMisorderCorruptionWithMCC() + : CompactRangeHelper(GetParam()) {} + + CR_HELPER_OVERRIDES; +}; + +TEST_P(DBCompactionTestL0FilesMisorderCorruptionWithMCC, FlushAfterIntraL0LevelCompactionWithIngestedFile) { SetupOptions(CompactionStyle::kCompactionStyleLevel, ""); DestroyAndReopen(options_); @@ -6955,7 +7284,7 @@ TEST_F(DBCompactionTestL0FilesMisorderCorruption, ASSERT_OK(Put(Key(i), "")); // Prevents trivial move } ASSERT_OK(Flush()); - Compact("", Key(99)); + MyCompact("", Key(99)); ASSERT_EQ(0, NumTableFilesAtLevel(0)); // To get accurate NumTableFilesAtLevel(0) when the number reaches @@ -7205,6 +7534,17 @@ class DBCompactionTestL0FilesMisorderCorruptionWithParam : DBCompactionTestL0FilesMisorderCorruption() {} }; +class DBCompactionTestL0FilesMisorderCorruptionWithParamAndMCC + : public DBCompactionTestL0FilesMisorderCorruption, + public CompactRangeHelper, + public testing::WithParamInterface> { + public: + DBCompactionTestL0FilesMisorderCorruptionWithParamAndMCC() + : CompactRangeHelper(std::get<1>(GetParam())) {} + + CR_HELPER_OVERRIDES; +}; + // TODO: add `CompactionStyle::kCompactionStyleLevel` to testing parameter, // which requires careful unit test // design for ingesting file to L0 and CompactRange()/CompactFile() to L0 @@ -7214,6 +7554,17 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(CompactionStyle::kCompactionStyleUniversal, CompactionStyle::kCompactionStyleFIFO)); +// TODO: add `CompactionStyle::kCompactionStyleLevel` to testing parameter, +// which requires careful unit test +// design for ingesting file to L0 and CompactRange()/CompactFile() to L0 +INSTANTIATE_TEST_CASE_P( + DBCompactionTestL0FilesMisorderCorruptionWithParamAndMCC, + DBCompactionTestL0FilesMisorderCorruptionWithParamAndMCC, + ::testing::Combine( + ::testing::Values(CompactionStyle::kCompactionStyleUniversal, + CompactionStyle::kCompactionStyleFIFO), + ::testing::Bool())); + TEST_P(DBCompactionTestL0FilesMisorderCorruptionWithParam, FlushAfterIntraL0CompactFileWithIngestedFile) { SetupOptions(GetParam(), "CompactFile"); @@ -7280,9 +7631,9 @@ TEST_P(DBCompactionTestL0FilesMisorderCorruptionWithParam, Destroy(options_); } -TEST_P(DBCompactionTestL0FilesMisorderCorruptionWithParam, +TEST_P(DBCompactionTestL0FilesMisorderCorruptionWithParamAndMCC, FlushAfterIntraL0CompactRangeWithIngestedFile) { - SetupOptions(GetParam(), "CompactRange"); + SetupOptions(std::get<0>(GetParam()), "CompactRange"); DestroyAndReopen(options_); // To create below LSM tree @@ -7314,7 +7665,7 @@ TEST_P(DBCompactionTestL0FilesMisorderCorruptionWithParam, // (1) doesn't overlap with memtable therefore the memtable won't be flushed // (2) should target at compacting s0 with s1 and s2 Slice start("k3"), end("k5"); - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); + MyCompactRange(CompactRangeOptions(), &start, &end, true); // After compaction, we have LSM tree: // // memtable: m1 [ k2:new@4, k1:new@3] @@ -7368,7 +7719,7 @@ TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) { auto cro = CompactRangeOptions(); cro.bottommost_level_compaction = bottommost_level_compaction_; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); if (bottommost_level_compaction_ == BottommostLevelCompaction::kForce || bottommost_level_compaction_ == BottommostLevelCompaction::kForceOptimized) { @@ -7383,10 +7734,12 @@ TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) { INSTANTIATE_TEST_CASE_P( DBCompactionTestWithBottommostParam, DBCompactionTestWithBottommostParam, - ::testing::Values(BottommostLevelCompaction::kSkip, - BottommostLevelCompaction::kIfHaveCompactionFilter, - BottommostLevelCompaction::kForce, - BottommostLevelCompaction::kForceOptimized)); + ::testing::Combine( + ::testing::Values(BottommostLevelCompaction::kSkip, + BottommostLevelCompaction::kIfHaveCompactionFilter, + BottommostLevelCompaction::kForce, + BottommostLevelCompaction::kForceOptimized), + ::testing::Bool())); TEST_F(DBCompactionTest, UpdateLevelSubCompactionTest) { Options options = CurrentOptions(); @@ -7506,7 +7859,7 @@ TEST_P(ChangeLevelConflictsWithAuto, TestConflict) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 2; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); } ASSERT_EQ("0,0,1", FilesPerLevel(0)); @@ -7541,10 +7894,10 @@ TEST_P(ChangeLevelConflictsWithAuto, TestConflict) { { CompactRangeOptions cro; cro.change_level = true; - cro.target_level = GetParam() ? 1 : 0; + cro.target_level = std::get<0>(GetParam()) ? 1 : 0; // This should return non-OK, but it's more important for the test to // make sure that the DB is not corrupted. - ASSERT_NOK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, false); } auto_comp.join(); // Refitting didn't happen. @@ -7555,7 +7908,8 @@ TEST_P(ChangeLevelConflictsWithAuto, TestConflict) { } INSTANTIATE_TEST_CASE_P(ChangeLevelConflictsWithAuto, - ChangeLevelConflictsWithAuto, testing::Bool()); + ChangeLevelConflictsWithAuto, + ::testing::Combine(testing::Bool(), ::testing::Bool())); TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) { // A `CompactRange()` with `change_level == true` needs to execute its final @@ -7650,7 +8004,7 @@ TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) { refit_level_thread.join(); } -TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { +TEST_P(DBCompactionTestWithMCC, ChangeLevelErrorPathTest) { // This test is added to ensure that RefitLevel() error paths are clearing // internal flags and to test that subsequent valid RefitLevel() calls // succeeds @@ -7672,7 +8026,7 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 2; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); } ASSERT_EQ("0,0,2", FilesPerLevel(0)); @@ -7695,7 +8049,7 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 1; - ASSERT_OK(dbfull()->CompactRange(cro, &begin, &end)); + MyCompactRange(cro, &begin, &end, true); } ASSERT_EQ("0,3,2", FilesPerLevel(0)); @@ -7711,7 +8065,7 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 1; - ASSERT_NOK(dbfull()->CompactRange(cro, &begin, &end)); + MyCompactRange(cro, &begin, &end, false); } ASSERT_EQ("0,3,2", FilesPerLevel(0)); @@ -7720,12 +8074,12 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 1; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); } ASSERT_EQ("0,5", FilesPerLevel(0)); } -TEST_F(DBCompactionTest, CompactionWithBlob) { +TEST_P(DBCompactionTestWithMCC, CompactionWithBlob) { Options options; options.env = env_; options.disable_auto_compactions = true; @@ -7757,7 +8111,7 @@ TEST_F(DBCompactionTest, CompactionWithBlob) { constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + MyCompactRange(CompactRangeOptions(), begin, end, true); ASSERT_EQ(Get(first_key), third_value); ASSERT_EQ(Get(second_key), third_value); @@ -7808,17 +8162,24 @@ TEST_F(DBCompactionTest, CompactionWithBlob) { class DBCompactionTestBlobError : public DBCompactionTest, - public testing::WithParamInterface { + public CompactRangeHelper, + public testing::WithParamInterface> { public: - DBCompactionTestBlobError() : sync_point_(GetParam()) {} + DBCompactionTestBlobError() + : CompactRangeHelper(std::get<1>(GetParam())), + sync_point_(std::get<0>(GetParam())) {} + + CR_HELPER_OVERRIDES; std::string sync_point_; }; -INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError, - ::testing::ValuesIn(std::vector{ - "BlobFileBuilder::WriteBlobToFile:AddRecord", - "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); +INSTANTIATE_TEST_CASE_P( + DBCompactionTestBlobError, DBCompactionTestBlobError, + ::testing::Combine(::testing::ValuesIn(std::vector{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"}), + ::testing::Bool())); TEST_P(DBCompactionTestBlobError, CompactionError) { Options options; @@ -7860,7 +8221,9 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; - ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError()); + auto expected_completion_status = Status::IOError(); + MyCompactRange(CompactRangeOptions(), begin, end, false, + &expected_completion_status); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); @@ -7907,18 +8270,23 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { class DBCompactionTestBlobGC : public DBCompactionTest, - public testing::WithParamInterface> { + public CompactRangeHelper, + public testing::WithParamInterface> { public: DBCompactionTestBlobGC() - : blob_gc_age_cutoff_(std::get<0>(GetParam())), + : CompactRangeHelper(std::get<2>(GetParam())), + blob_gc_age_cutoff_(std::get<0>(GetParam())), updated_enable_blob_files_(std::get<1>(GetParam())) {} + CR_HELPER_OVERRIDES; + double blob_gc_age_cutoff_; bool updated_enable_blob_files_; }; INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC, ::testing::Combine(::testing::Values(0.0, 0.5, 1.0), + ::testing::Bool(), ::testing::Bool())); TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) { @@ -7949,7 +8317,7 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) { cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce; cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_; - ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); // Check that the GC stats are correct { @@ -8038,7 +8406,7 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + MyCompactRange(CompactRangeOptions(), begin, end, true); ASSERT_EQ(Get(first_key), first_value); ASSERT_EQ(Get(second_key), second_value); @@ -8086,7 +8454,7 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { } } -TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) { +TEST_P(DBCompactionTestWithMCC, CompactionWithBlobGCError_CorruptIndex) { Options options; options.env = env_; options.disable_auto_compactions = true; @@ -8129,14 +8497,15 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) { constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; - ASSERT_TRUE( - db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); + auto expected_completion_status = Status::Corruption(); + MyCompactRange(CompactRangeOptions(), begin, end, false, + &expected_completion_status); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } -TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) { +TEST_P(DBCompactionTestWithMCC, CompactionWithBlobGCError_InlinedTTLIndex) { constexpr uint64_t min_blob_size = 10; Options options; @@ -8185,11 +8554,13 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) { constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; - ASSERT_TRUE( - db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); + auto expected_completion_status = Status::Corruption(); + MyCompactRange(CompactRangeOptions(), begin, end, false, + &expected_completion_status); } -TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) { +TEST_P(DBCompactionTestWithMCC, + CompactionWithBlobGCError_IndexWithInvalidFileNumber) { Options options; options.env = env_; options.disable_auto_compactions = true; @@ -8235,8 +8606,9 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) { constexpr Slice* begin = nullptr; constexpr Slice* end = nullptr; - ASSERT_TRUE( - db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); + auto expected_completion_status = Status::Corruption(); + MyCompactRange(CompactRangeOptions(), begin, end, false, + &expected_completion_status); } TEST_F(DBCompactionTest, CompactionWithChecksumHandoff1) { @@ -8593,7 +8965,7 @@ TEST_F(DBCompactionTest, FIFOWarm) { Destroy(options); } -TEST_F(DBCompactionTest, DisableMultiManualCompaction) { +TEST_P(DBCompactionTestWithMCC, DisableMultiManualCompaction) { const int kNumL0Files = 10; Options options = CurrentOptions(); @@ -8633,8 +9005,8 @@ TEST_F(DBCompactionTest, DisableMultiManualCompaction) { std::string end_str = Key(3); Slice b = begin_str; Slice e = end_str; - auto s = db_->CompactRange(cro, &b, &e); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, &b, &e, false, &expected_completion_status); }); port::Thread compact_thread2([&]() { @@ -8644,8 +9016,8 @@ TEST_F(DBCompactionTest, DisableMultiManualCompaction) { std::string end_str = Key(7); Slice b = begin_str; Slice e = end_str; - auto s = db_->CompactRange(cro, &b, &e); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, &b, &e, false, &expected_completion_status); }); // Disable manual compaction should cancel both manual compactions and both @@ -8663,7 +9035,7 @@ TEST_F(DBCompactionTest, DisableMultiManualCompaction) { ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); } -TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { +TEST_P(DBCompactionTestWithMCC, DisableJustStartedManualCompaction) { const int kNumL0Files = 4; Options options = CurrentOptions(); @@ -8691,8 +9063,8 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { port::Thread compact_thread([&]() { CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - auto s = db_->CompactRange(cro, nullptr, nullptr); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, nullptr, nullptr, false, &expected_completion_status); }); TEST_SYNC_POINT( "DBCompactionTest::DisableJustStartedManualCompaction:" @@ -8702,7 +9074,7 @@ TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) { compact_thread.join(); } -TEST_F(DBCompactionTest, DisableInProgressManualCompaction) { +TEST_P(DBCompactionTestWithMCC, DisableInProgressManualCompaction) { const int kNumL0Files = 4; Options options = CurrentOptions(); @@ -8727,8 +9099,8 @@ TEST_F(DBCompactionTest, DisableInProgressManualCompaction) { port::Thread compact_thread([&]() { CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - auto s = db_->CompactRange(cro, nullptr, nullptr); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, nullptr, nullptr, false, &expected_completion_status); }); TEST_SYNC_POINT( @@ -8739,7 +9111,7 @@ TEST_F(DBCompactionTest, DisableInProgressManualCompaction) { compact_thread.join(); } -TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { +TEST_P(DBCompactionTestWithMCC, DisableManualCompactionThreadQueueFull) { const int kNumL0Files = 4; SyncPoint::GetInstance()->LoadDependency( @@ -8770,8 +9142,8 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { port::Thread compact_thread([&]() { CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - auto s = db_->CompactRange(cro, nullptr, nullptr); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, nullptr, nullptr, false, &expected_completion_status); }); TEST_SYNC_POINT( @@ -8801,7 +9173,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) { ASSERT_EQ("0,1", FilesPerLevel(0)); } -TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { +TEST_P(DBCompactionTestWithMCC, DisableManualCompactionThreadQueueFullDBClose) { const int kNumL0Files = 4; SyncPoint::GetInstance()->LoadDependency( @@ -8832,8 +9204,8 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { port::Thread compact_thread([&]() { CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - auto s = db_->CompactRange(cro, nullptr, nullptr); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, nullptr, nullptr, false, &expected_completion_status); }); TEST_SYNC_POINT( @@ -8866,7 +9238,7 @@ TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) { } } -TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { +TEST_P(DBCompactionTestWithMCC, DBCloseWithManualCompaction) { const int kNumL0Files = 4; SyncPoint::GetInstance()->LoadDependency( @@ -8897,8 +9269,8 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { port::Thread compact_thread([&]() { CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - auto s = db_->CompactRange(cro, nullptr, nullptr); - ASSERT_TRUE(s.IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, nullptr, nullptr, false, &expected_completion_status); }); TEST_SYNC_POINT( @@ -8928,7 +9300,7 @@ TEST_F(DBCompactionTest, DBCloseWithManualCompaction) { } } -TEST_F(DBCompactionTest, +TEST_P(DBCompactionTestWithMCC, DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) { // When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait // for automatic compactions to drain before starting the manual compaction. @@ -8965,13 +9337,14 @@ TEST_F(DBCompactionTest, CompactRangeOptions cro; cro.exclusive_manual_compaction = true; - ASSERT_TRUE(db_->CompactRange(cro, nullptr, nullptr).IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(cro, nullptr, nullptr, false, &expected_completion_status); ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_TRUE(callback_completed); } -TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) { +TEST_P(DBCompactionTestWithMCC, ChangeLevelConflictsWithManual) { Options options = CurrentOptions(); options.num_levels = 3; Reopen(options); @@ -8984,7 +9357,7 @@ TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 2; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); } ASSERT_EQ("0,0,1", FilesPerLevel(0)); @@ -9032,7 +9405,7 @@ TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) { CompactRangeOptions cro; cro.change_level = true; cro.target_level = 1; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); }); TEST_SYNC_POINT( @@ -9040,14 +9413,15 @@ TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) { "PreForegroundCompactRange"); ASSERT_OK(Put(Key(0), rnd.RandomString(990))); ASSERT_OK(Put(Key(1), rnd.RandomString(990))); - ASSERT_TRUE(dbfull() - ->CompactRange(CompactRangeOptions(), nullptr, nullptr) - .IsIncomplete()); + auto expected_completion_status = Status::Incomplete(); + MyCompactRange(CompactRangeOptions(), nullptr, nullptr, false, + &expected_completion_status); refit_level_thread.join(); } -TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) { +TEST_P(DBCompactionTestWithMCC, + BottomPriCompactionCountsTowardConcurrencyLimit) { // Flushes several files to trigger compaction while lock is released during // a bottom-pri compaction. Verifies it does not get scheduled to thread pool // because per-DB limit for compaction parallelism is one (default). @@ -9080,7 +9454,7 @@ TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) { CompactRangeOptions cro; cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; cro.exclusive_manual_compaction = false; - ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + MyCompactRange(cro, nullptr, nullptr, true); }); // Sleep in the low-pri thread so any newly scheduled compaction will be @@ -9142,6 +9516,9 @@ TEST_F(DBCompactionTest, BottommostFileCompactionAllowIngestBehind) { // ASSERT_OK(dbfull()->TEST_WaitForCompact(true /* wait_unscheduled */)); } +INSTANTIATE_TEST_CASE_P(DBCompactionTestWithMCC, DBCompactionTestWithMCC, + testing::Bool()); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/compact_range_threads_mngr.cc b/db/db_impl/compact_range_threads_mngr.cc new file mode 100644 index 0000000000..35dbd9b088 --- /dev/null +++ b/db/db_impl/compact_range_threads_mngr.cc @@ -0,0 +1,59 @@ +// Copyright (C) 2023 Speedb Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "compact_range_threads_mngr.h" + +#include + +namespace ROCKSDB_NAMESPACE { + +CompactRangeThreadsMngr::~CompactRangeThreadsMngr() { Shutdown(); } + +void CompactRangeThreadsMngr::Shutdown() { + std::lock_guard lock(lock_); + + std::for_each(std::begin(threads_), std::end(threads_), [](std::thread& t) { + if (t.joinable()) { + t.join(); + } + }); + threads_.clear(); +} + +void CompactRangeThreadsMngr::AddThread(std::thread&& thread) { + std::lock_guard lock(lock_); + + // Lazy removal (and destruction) of completed threads + CleanupCompletedThreads(); + threads_.push_back(std::move(thread)); +} + +void CompactRangeThreadsMngr::CleanupCompletedThreads() { + auto threads_iter = begin(threads_); + while (threads_iter != threads_.end()) { + if (threads_iter->joinable()) { + ++threads_iter; + } else { + threads_iter = threads_.erase(threads_iter); + } + } +} + +size_t CompactRangeThreadsMngr::TEST_GetNumJoinableThreads() const { + std::lock_guard lock(lock_); + return std::count_if(std::begin(threads_), std::end(threads_), + [](const std::thread& t) { return t.joinable(); }); +} + +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/db/db_impl/compact_range_threads_mngr.h b/db/db_impl/compact_range_threads_mngr.h new file mode 100644 index 0000000000..5f17a77ffb --- /dev/null +++ b/db/db_impl/compact_range_threads_mngr.h @@ -0,0 +1,51 @@ +// Copyright (C) 2023 Speedb Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { + +class CompactRangeThreadsMngr { + public: + ~CompactRangeThreadsMngr(); + + void Shutdown(); + + // In addition to adding the thread, this method lazily removes, from + // its container, threads that have already completed. + // Alternatively, this could have been done as a periodic activity in + // the periodic scheduler, but seems not to be a worthwhile periodic activity. + void AddThread(std::thread&& thread); + + public: + size_t TEST_GetNumJoinableThreads() const; + + private: + void CleanupCompletedThreads(); + + private: + mutable std::mutex lock_; + // A list should be fine as there is no random access required + // and a very small number of threads is expected + std::list threads_; +}; + +} // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0f01981559..e7f9436bda 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -622,6 +622,10 @@ Status DBImpl::CloseHelper() { cfd->UnrefAndTryDelete(); } + // Wait for all non-blocking manual compactions that may still be in progress. + // Do it only after cleaning up all compaction-related activity above. + compact_range_threads_mngr_.Shutdown(); + if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) { // we need to delete handle outside of lock because it does its own locking mutex_.Unlock(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 9f7cbe54b2..a0fcdeaa9f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -23,6 +23,7 @@ #include "db/column_family.h" #include "db/compaction/compaction_iterator.h" #include "db/compaction/compaction_job.h" +#include "db/db_impl/compact_range_threads_mngr.h" #include "db/db_impl/db_spdb_impl_write.h" #include "db/error_handler.h" #include "db/event_helpers.h" @@ -1604,6 +1605,11 @@ class DBImpl : public DB { friend class DBCompactionTest_CompactionDuringShutdown_Test; friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test; #ifndef NDEBUG + // Since all of the ut-s inherit from DBTestBase, this should be the only + // friend. Methods should be added (as applicable) to DBTestBase to allow + // access to the internals of DBImpl to ut-s + friend class DBTestBase; + friend class DBTest2_ReadCallbackTest_Test; friend class WriteCallbackPTest_WriteWithCallbackTest_Test; friend class XFTransactionWriteHandler; @@ -2344,6 +2350,17 @@ class DBImpl : public DB { bool ShouldReferenceSuperVersion(const MergeContext& merge_context); + void CompactRangeNonBlockingThread(const CompactRangeOptions options, + ColumnFamilyData* cfd, std::string begin, + std::string end, + const std::string trim_ts); + + Status CompactRangeInternalBlocking(const CompactRangeOptions& options, + ColumnFamilyData* cfd, const Slice* begin, + const Slice* end, + const std::string& trim_ts); + + private: // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; @@ -2756,6 +2773,8 @@ class DBImpl : public DB { // The number of LockWAL called without matching UnlockWAL call. // See also lock_wal_write_token_ uint32_t lock_wal_count_; + + CompactRangeThreadsMngr compact_range_threads_mngr_; }; class GetWithTimestampReadCallback : public ReadCallback { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 5ae72d8fe6..6d70ab2af5 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -8,7 +8,11 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include +#include #include +#include +#include +#include #include "db/builder.h" #include "db/db_impl/db_impl.h" @@ -893,12 +897,23 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, const Slice* begin_without_ts, const Slice* end_without_ts) { + auto HandleImmediateReturn = [&options](Status completion_status) { + if (options.async_completion_cb) { + options.async_completion_cb->CompletedCb(completion_status); + return Status::OK(); + } else { + return completion_status; + } + }; + if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) { - return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + return HandleImmediateReturn( + Status::Incomplete(Status::SubCode::kManualCompactionPaused)); } if (options.canceled && options.canceled->load(std::memory_order_acquire)) { - return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + return HandleImmediateReturn( + Status::Incomplete(Status::SubCode::kManualCompactionPaused)); } const Comparator* const ucmp = column_family->GetComparator(); @@ -987,6 +1002,28 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, return Status::OK(); } +void DBImpl::CompactRangeNonBlockingThread(const CompactRangeOptions options, + ColumnFamilyData* cfd, + std::string begin_str, + std::string end_str, + const std::string trim_ts) { + assert(options.async_completion_cb); + + if (shutdown_initiated_) { + options.async_completion_cb->CompletedCb(Status::ShutdownInProgress()); + } + + Slice begin{begin_str}; + Slice* begin_to_use = begin.empty() ? nullptr : &begin; + Slice end{end_str}; + Slice* end_to_use = end.empty() ? nullptr : &end; + + auto status = CompactRangeInternalBlocking(options, cfd, begin_to_use, + end_to_use, trim_ts); + + options.async_completion_cb->CompletedCb(status); +} + Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, @@ -994,27 +1031,60 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, auto cfh = static_cast_with_check(column_family); auto cfd = cfh->cfd(); + auto HandleImmediateReturn = [&options](Status completion_status) { + if (options.async_completion_cb) { + options.async_completion_cb->CompletedCb(completion_status); + return Status::OK(); + } else { + return completion_status; + } + }; + if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) { - return Status::InvalidArgument("Invalid target path ID"); + return HandleImmediateReturn( + Status::InvalidArgument("Invalid target path ID")); } - bool flush_needed = true; - // Update full_history_ts_low if it's set if (options.full_history_ts_low != nullptr && !options.full_history_ts_low->empty()) { std::string ts_low = options.full_history_ts_low->ToString(); if (begin != nullptr || end != nullptr) { - return Status::InvalidArgument( - "Cannot specify compaction range with full_history_ts_low"); + return HandleImmediateReturn(Status::InvalidArgument( + "Cannot specify compaction range with full_history_ts_low")); } Status s = IncreaseFullHistoryTsLowImpl(cfd, ts_low); if (!s.ok()) { LogFlush(immutable_db_options_.info_log); - return s; + return HandleImmediateReturn(s); + } + } + + if (options.async_completion_cb) { + std::string begin_str; + if (begin != nullptr) { + begin_str.assign(begin->data(), begin->size()); } + std::string end_str; + if (end != nullptr) { + end_str.assign(end->data(), end->size()); + } + std::thread compact_range_thread(&DBImpl::CompactRangeNonBlockingThread, + this, options, cfd, begin_str, end_str, + trim_ts); + compact_range_threads_mngr_.AddThread(std::move(compact_range_thread)); + return Status::OK(); + } else { + return CompactRangeInternalBlocking(options, cfd, begin, end, trim_ts); } +} +Status DBImpl::CompactRangeInternalBlocking(const CompactRangeOptions& options, + ColumnFamilyData* cfd, + const Slice* begin, + const Slice* end, + const std::string& trim_ts) { + bool flush_needed = true; Status s; if (begin != nullptr && end != nullptr) { // TODO(ajkr): We could also optimize away the flush in certain cases where diff --git a/db/db_test_util.h b/db/db_test_util.h index 40363c7567..a24433b67e 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -1097,6 +1098,9 @@ class DBTestBase : public testing::Test { DBImpl* dbfull() { return static_cast_with_check(db_); } + std::atomic& dbfull_shutting_down() { return dbfull()->shutting_down_; } + ErrorHandler& dbfull_error_handler() { return dbfull()->error_handler_; } + void CreateColumnFamilies(const std::vector& cfs, const Options& options); diff --git a/db/manual_compaction_test.cc b/db/manual_compaction_test.cc index b92cb794b9..b9421ec165 100644 --- a/db/manual_compaction_test.cc +++ b/db/manual_compaction_test.cc @@ -5,16 +5,19 @@ // // Test for issue 178: a manual compaction causes deleted data to reappear. #include +#include #include "port/port.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/slice.h" #include "rocksdb/write_batch.h" +#include "test_util/sync_point.h" #include "test_util/testharness.h" using ROCKSDB_NAMESPACE::CompactionFilter; using ROCKSDB_NAMESPACE::CompactionStyle; +using ROCKSDB_NAMESPACE::CompactRangeCompletedCbIf; using ROCKSDB_NAMESPACE::CompactRangeOptions; using ROCKSDB_NAMESPACE::CompressionType; using ROCKSDB_NAMESPACE::DB; @@ -24,9 +27,9 @@ using ROCKSDB_NAMESPACE::Iterator; using ROCKSDB_NAMESPACE::Options; using ROCKSDB_NAMESPACE::ReadOptions; using ROCKSDB_NAMESPACE::Slice; +using ROCKSDB_NAMESPACE::Status; using ROCKSDB_NAMESPACE::WriteBatch; using ROCKSDB_NAMESPACE::WriteOptions; - namespace { // Reasoning: previously the number was 1100000. Since the keys are written to @@ -44,16 +47,51 @@ std::string Key1(int i) { std::string Key2(int i) { return Key1(i) + "_xxx"; } -class ManualCompactionTest : public testing::Test { +class ManualCompactionTest : public testing::Test, + public testing::WithParamInterface { public: ManualCompactionTest() { + blocking_ = GetParam(); + // Get rid of any state from an old run. dbname_ = ROCKSDB_NAMESPACE::test::PerThreadDBPath( "rocksdb_manual_compaction_test"); EXPECT_OK(DestroyDB(dbname_, Options())); } + void TearDown() override { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + } + + struct CompactRangeCompleteCb : public CompactRangeCompletedCbIf { + void CompletedCb(Status completion_status) override { + ASSERT_OK(completion_status); + TEST_SYNC_POINT("TestCompactRangeComplete"); + } + }; + + void SetupTestPointsIfApplicable(const std::string& test_point_name) { + if (blocking_) { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"TestCompactRangeComplete", test_point_name}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + } + } + + CompactRangeOptions GetCompactRangeOptions() { + CompactRangeOptions cr_options; + if (blocking_) { + // cr_options.async_completion_cb.reset(std::bind(&ManualCompactionTest::CompactRangeCompleteCb, + // this, std::placeholders::_1)); + cr_options.async_completion_cb = + std::make_shared(); + } + + return cr_options; + } + std::string dbname_; + bool blocking_ = false; }; class DestroyAllCompactionFilter : public CompactionFilter { @@ -96,7 +134,7 @@ class LogCompactionFilter : public CompactionFilter { mutable std::map key_level_; }; -TEST_F(ManualCompactionTest, CompactTouchesAllKeys) { +TEST_P(ManualCompactionTest, CompactTouchesAllKeys) { for (int iter = 0; iter < 2; ++iter) { DB* db; Options options; @@ -117,7 +155,13 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) { ASSERT_OK(db->Put(WriteOptions(), Slice("key4"), Slice("destroy"))); Slice key4("key4"); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, &key4)); + + const std::string test_point_name = "WaitForCompactRangeComplete"; + SetupTestPointsIfApplicable(test_point_name); + + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), nullptr, &key4)); + TEST_SYNC_POINT(test_point_name); + Iterator* itr = db->NewIterator(ReadOptions()); itr->SeekToFirst(); ASSERT_TRUE(itr->Valid()); @@ -132,7 +176,7 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) { } } -TEST_F(ManualCompactionTest, Test) { +TEST_P(ManualCompactionTest, Test) { // Open database. Disable compression since it affects the creation // of layers and the code below is trying to test against a very // specific scenario. @@ -170,8 +214,12 @@ TEST_F(ManualCompactionTest, Test) { Slice least(start_key.data(), start_key.size()); Slice greatest(end_key.data(), end_key.size()); + const std::string test_point_name = "WaitForCompactRangeComplete"; + SetupTestPointsIfApplicable(test_point_name); + // commenting out the line below causes the example to work correctly - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &least, &greatest)); + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &least, &greatest)); + TEST_SYNC_POINT(test_point_name); // count the keys Iterator* iter = db->NewIterator(ReadOptions()); @@ -187,7 +235,7 @@ TEST_F(ManualCompactionTest, Test) { ASSERT_OK(DestroyDB(dbname_, Options())); } -TEST_F(ManualCompactionTest, SkipLevel) { +TEST_P(ManualCompactionTest, SkipLevel) { DB* db; Options options; options.num_levels = 3; @@ -211,67 +259,95 @@ TEST_F(ManualCompactionTest, SkipLevel) { ASSERT_OK(db->Flush(fo)); { + const std::string test_point_name1 = "WaitForCompactRangeComplete1"; + SetupTestPointsIfApplicable(test_point_name1); + // L0: 1, 2, [4, 8] // no file has keys in range [5, 7] Slice start("5"); Slice end("7"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &start, &end)); + + // commenting out the line below causes the example to work correctly + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &start, &end)); + TEST_SYNC_POINT(test_point_name1); ASSERT_EQ(0, filter->NumKeys()); } { + const std::string test_point_name2 = "WaitForCompactRangeComplete2"; + SetupTestPointsIfApplicable(test_point_name2); + // L0: 1, 2, [4, 8] // [3, 7] overlaps with 4 in L0 Slice start("3"); Slice end("7"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &start, &end)); + + // commenting out the line below causes the example to work correctly + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &start, &end)); + TEST_SYNC_POINT(test_point_name2); ASSERT_EQ(2, filter->NumKeys()); ASSERT_EQ(0, filter->KeyLevel("4")); ASSERT_EQ(0, filter->KeyLevel("8")); } { + const std::string test_point_name3 = "WaitForCompactRangeComplete3"; + SetupTestPointsIfApplicable(test_point_name3); + // L0: 1, 2 // L1: [4, 8] // no file has keys in range (-inf, 0] Slice end("0"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, &end)); + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), nullptr, &end)); + TEST_SYNC_POINT(test_point_name3); ASSERT_EQ(0, filter->NumKeys()); } { + const std::string test_point_name4 = "WaitForCompactRangeComplete4"; + SetupTestPointsIfApplicable(test_point_name4); + // L0: 1, 2 // L1: [4, 8] // no file has keys in range [9, inf) Slice start("9"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &start, nullptr)); + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &start, nullptr)); + TEST_SYNC_POINT(test_point_name4); ASSERT_EQ(0, filter->NumKeys()); } { + const std::string test_point_name5 = "WaitForCompactRangeComplete5"; + SetupTestPointsIfApplicable(test_point_name5); + // L0: 1, 2 // L1: [4, 8] // [2, 2] overlaps with 2 in L0 Slice start("2"); Slice end("2"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &start, &end)); + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &start, &end)); + TEST_SYNC_POINT(test_point_name5); ASSERT_EQ(1, filter->NumKeys()); ASSERT_EQ(0, filter->KeyLevel("2")); } { + const std::string test_point_name6 = "WaitForCompactRangeComplete6"; + SetupTestPointsIfApplicable(test_point_name6); + // L0: 1 // L1: 2, [4, 8] // [2, 5] overlaps with 2 and [4, 8) in L1, skip L0 Slice start("2"); Slice end("5"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &start, &end)); + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &start, &end)); + TEST_SYNC_POINT(test_point_name6); ASSERT_EQ(3, filter->NumKeys()); ASSERT_EQ(1, filter->KeyLevel("2")); ASSERT_EQ(1, filter->KeyLevel("4")); @@ -279,12 +355,16 @@ TEST_F(ManualCompactionTest, SkipLevel) { } { + const std::string test_point_name7 = "WaitForCompactRangeComplete7"; + SetupTestPointsIfApplicable(test_point_name7); + // L0: 1 // L1: [2, 4, 8] // [0, inf) overlaps all files Slice start("0"); filter->Reset(); - ASSERT_OK(db->CompactRange(CompactRangeOptions(), &start, nullptr)); + ASSERT_OK(db->CompactRange(GetCompactRangeOptions(), &start, nullptr)); + TEST_SYNC_POINT(test_point_name7); ASSERT_EQ(4, filter->NumKeys()); // 1 is first compacted to L1 and then further compacted into [2, 4, 8], // so finally the logged level for 1 is L1. @@ -299,6 +379,9 @@ TEST_F(ManualCompactionTest, SkipLevel) { ASSERT_OK(DestroyDB(dbname_, options)); } +INSTANTIATE_TEST_CASE_P(ManualCompactionTest, ManualCompactionTest, + testing::Bool()); + } // anonymous namespace int main(int argc, char** argv) { diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index a1062608f2..83d7b8b478 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. // +#include +#include #include #include @@ -19,6 +21,7 @@ #include "db_stress_tool/db_stress_table_properties_collector.h" #include "rocksdb/convenience.h" #include "rocksdb/filter_policy.h" +#include "rocksdb/options.h" #include "rocksdb/secondary_cache.h" #include "rocksdb/sst_file_manager.h" #include "rocksdb/table_pinning_policy.h" @@ -2228,6 +2231,24 @@ Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) { return Status::OK(); } +namespace { +using CbFuture = std::future; + +struct CompactRangeCompleteCb : public CompactRangeCompletedCbIf { + CompactRangeCompleteCb() { + my_promise_ = std::make_unique>(); + } + + CbFuture GetFuture() { return my_promise_->get_future(); } + + void CompletedCb(Status completion_status) override { + my_promise_->set_value(completion_status); + } + + std::unique_ptr> my_promise_; +}; +} // namespace + void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, const Slice& start_key, ColumnFamilyHandle* column_family) { @@ -2274,10 +2295,34 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key); } - Status status = db_->CompactRange(cro, column_family, &start_key, &end_key); + Status status; + + if (thread->rand.OneIn(2)) { + auto completion_cb = std::make_shared(); + cro.async_completion_cb = completion_cb; + status = db_->CompactRange(cro, column_family, &start_key, &end_key); + + auto completion_cb_future = completion_cb->GetFuture(); + auto future_wait_status = + completion_cb_future.wait_for(std::chrono::seconds(60)); + if (future_wait_status == std::future_status::ready) { + // Obtain the actual completion status + status = completion_cb_future.get(); + } else { + fprintf(stderr, + "Non-Blocking CompactRange() Didn't Complete Successfuly in " + "Time: %d\n", + static_cast(future_wait_status)); + // Already notified about the error, fake success for the check + + // notification below + status = Status::OK(); + } + } else { + status = db_->CompactRange(cro, column_family, &start_key, &end_key); + } if (!status.ok()) { - fprintf(stdout, "Unable to perform CompactRange(): %s\n", + fprintf(stderr, "Unable to perform CompactRange(): %s\n", status.ToString().c_str()); } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 1bbe8e6925..9af74ed190 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1350,6 +1350,22 @@ class DB { // the files. In this case, client could set options.change_level to true, to // move the files back to the minimum level capable of holding the data set // or a given level (specified by non-negative options.target_level). + // + // Non-Blocking Compactions: + // A non-blocking compaction is initiated by setting the async_completion_cb + // option in the CompactRangeOptions options parameter. By default (unless + // explicitly set by the caller), the CompactRange() will be blocking. When + // async_completion_cb is set, the CompactRange() call will return control to + // the caller immediately. The manual compaction iteslf will be performed in + // an internally created thread. The manual compaction will ALWAYS call the + // specified callback upon completion and provide the completion status. + // + // NOTES: + // 1. The callback object must be alive until the callback has been called. + // 2. The callback MAY be called in the context of the caller's thread when + // there are conditions + // that prevent manual compaction from running. Otherwise, the callback + // will be called in the context of the internally created thread. virtual Status CompactRange(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) = 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 98677b9cff..204d5883c6 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1941,6 +1941,16 @@ enum class BlobGarbageCollectionPolicy { kUseDefault, }; +// An abstract base class for non-blocking (asynchronous) manual compaction +// See async_completion_cb below and the CompactRange() API call for more +// details +struct CompactRangeCompletedCbIf { + virtual ~CompactRangeCompletedCbIf() = default; + + // Non-Blocking Manual Compaction Completion callback + virtual void CompletedCb(Status completion_status) = 0; +}; + // CompactRangeOptions is used by CompactRange() call. struct CompactRangeOptions { // If true, no other compaction will run at the same time as this @@ -1996,6 +2006,10 @@ struct CompactRangeOptions { // user-provided setting. This enables customers to selectively override the // age cutoff. double blob_garbage_collection_age_cutoff = -1; + + // An optional completion callback to allow for non-blocking (async) operation + // Default: Empty (Blocking) + std::shared_ptr async_completion_cb; }; // IngestExternalFileOptions is used by IngestExternalFile() diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 361869f436..4f6a9d0b9c 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -24,6 +24,8 @@ set(JNI_NATIVE_SOURCES rocksjni/compaction_options_fifo.cc rocksjni/compaction_options_universal.cc rocksjni/compact_range_options.cc + rocksjni/compact_range_completion_cb.cc + rocksjni/compact_range_completed_jnicallback.cc rocksjni/comparator.cc rocksjni/comparatorjnicallback.cc rocksjni/compression_options.cc diff --git a/java/Makefile b/java/Makefile index a2d4311ab3..5cc3bcdbd4 100644 --- a/java/Makefile +++ b/java/Makefile @@ -27,6 +27,8 @@ NATIVE_JAVA_CLASSES = \ org.rocksdb.CompactionOptionsFIFO\ org.rocksdb.CompactionOptionsUniversal\ org.rocksdb.CompactRangeOptions\ + org.rocksdb.AbstractCompactRangeCompletedCb\ + org.rocksdb.CompactRangeCompletedCb\ org.rocksdb.ComparatorOptions\ org.rocksdb.CompressionOptions\ org.rocksdb.ConfigOptions\ diff --git a/java/rocksjni/compact_range_completed_jnicallback.cc b/java/rocksjni/compact_range_completed_jnicallback.cc new file mode 100644 index 0000000000..1a1602a3b7 --- /dev/null +++ b/java/rocksjni/compact_range_completed_jnicallback.cc @@ -0,0 +1,71 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the callback "bridge" between Java and C++ for +// ROCKSDB_NAMESPACE::CompactRangeCbIf. + +#include "rocksjni/compact_range_completed_jnicallback.h" + +#include "rocksjni/portal.h" + +namespace ROCKSDB_NAMESPACE { +CompactRangeCompletedJniCallback::CompactRangeCompletedJniCallback( + JNIEnv* env, jobject jcompletion_cb) + : JniCallback(env, jcompletion_cb) { + InitCallbackMethodId( + m_cb_mid, env, + AbstractCompactRangeCompletedCbJni::getCompletedCbProxyMethodId); +} + +void CompactRangeCompletedJniCallback::CompletedCb(Status completion_status) { + if (m_cb_mid == nullptr) { + return; + } + + JNIEnv* env; + jboolean attached_thread; + jobject jcompletion_status = SetupCallbackInvocation( + env, attached_thread, completion_status, StatusJni::construct); + + if (jcompletion_status != nullptr) { + env->CallVoidMethod(m_jcallback_obj, m_cb_mid, jcompletion_status); + } + + CleanupCallbackInvocation(env, attached_thread, {&jcompletion_status}); +} + +void CompactRangeCompletedJniCallback::InitCallbackMethodId( + jmethodID& mid, JNIEnv* env, jmethodID (*get_id)(JNIEnv* env)) { + mid = get_id(env); +} + +template +jobject CompactRangeCompletedJniCallback::SetupCallbackInvocation( + JNIEnv*& env, jboolean& attached_thread, const T& cpp_obj, + jobject (*convert)(JNIEnv* env, const T* cpp_obj)) { + attached_thread = JNI_FALSE; + env = getJniEnv(&attached_thread); + assert(env != nullptr); + + return convert(env, &cpp_obj); +} + +void CompactRangeCompletedJniCallback::CleanupCallbackInvocation( + JNIEnv* env, jboolean attached_thread, + std::initializer_list refs) { + for (auto* ref : refs) { + if (*ref == nullptr) continue; + env->DeleteLocalRef(*ref); + } + + if (env->ExceptionCheck()) { + // exception thrown from CallVoidMethod + env->ExceptionDescribe(); // print out exception to stderr + } + + releaseJniEnv(attached_thread); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/java/rocksjni/compact_range_completed_jnicallback.h b/java/rocksjni/compact_range_completed_jnicallback.h new file mode 100644 index 0000000000..084b3187d2 --- /dev/null +++ b/java/rocksjni/compact_range_completed_jnicallback.h @@ -0,0 +1,44 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the callback "bridge" between Java and C++ for +// ROCKSDB_NAMESPACE::EventListener. + +#pragma once + +#include + +#include +#include + +#include "rocksdb/options.h" +#include "rocksjni/jnicallback.h" + +namespace ROCKSDB_NAMESPACE { + +class CompactRangeCompletedJniCallback : public JniCallback, + public CompactRangeCompletedCbIf { + public: + CompactRangeCompletedJniCallback(JNIEnv* env, jobject jcompletion_cb); + virtual ~CompactRangeCompletedJniCallback() = default; + + void CompletedCb(Status completion_status) override; + + private: + inline void InitCallbackMethodId(jmethodID& mid, JNIEnv* env, + jmethodID (*get_id)(JNIEnv* env)); + template + jobject SetupCallbackInvocation(JNIEnv*& env, jboolean& attached_thread, + const T& cpp_obj, + jobject (*convert)(JNIEnv* env, + const T* cpp_obj)); + + void CleanupCallbackInvocation(JNIEnv* env, jboolean attached_thread, + std::initializer_list refs); + + jmethodID m_cb_mid; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/java/rocksjni/compact_range_completion_cb.cc b/java/rocksjni/compact_range_completion_cb.cc new file mode 100644 index 0000000000..beccb7bfe6 --- /dev/null +++ b/java/rocksjni/compact_range_completion_cb.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the "bridge" between Java and C++ for +// ROCKSDB_NAMESPACE::EventListener. + +#include + +#include + +#include "include/org_rocksdb_AbstractCompactRangeCompletedCb.h" +#include "rocksdb/options.h" +#include "rocksjni/compact_range_completed_jnicallback.h" +#include "rocksjni/cplusplus_to_java_convert.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_AbstractEventListener + * Method: createNewEventListener + * Signature: (J)J + */ +jlong Java_org_rocksdb_AbstractCompactRangeCompletedCb_createNewCompactRangeCompletedCb( + JNIEnv* env, jobject jobj) { + auto* sptr_completion_cb = + new std::shared_ptr( + new ROCKSDB_NAMESPACE::CompactRangeCompletedJniCallback(env, jobj)); + return GET_CPLUSPLUS_POINTER(sptr_completion_cb); +} + +/* + * Class: org_rocksdb_AbstractEventListener + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_AbstractCompactRangeCompletedCb_disposeInternal( + JNIEnv*, jobject, jlong jhandle) { + delete reinterpret_cast< + std::shared_ptr*>( + jhandle); +} diff --git a/java/rocksjni/compact_range_options.cc b/java/rocksjni/compact_range_options.cc index 77fbb8890e..fa5001b217 100644 --- a/java/rocksjni/compact_range_options.cc +++ b/java/rocksjni/compact_range_options.cc @@ -11,6 +11,7 @@ #include "include/org_rocksdb_CompactRangeOptions.h" #include "rocksdb/options.h" #include "rocksjni/cplusplus_to_java_convert.h" +#include "rocksjni/jnicallback.h" #include "rocksjni/portal.h" /* @@ -208,6 +209,16 @@ void Java_org_rocksdb_CompactRangeOptions_setMaxSubcompactions( options->max_subcompactions = static_cast(max_subcompactions); } +void Java_org_rocksdb_CompactRangeOptions_setAsyncCompletionCb( + JNIEnv* /*env*/, jobject /*jobj*/, jlong jhandle, + jlong completion_cb_handle) { + auto* options = + reinterpret_cast(jhandle); + options->async_completion_cb = *reinterpret_cast< + std::shared_ptr*>( + completion_cb_handle); +} + /* * Class: org_rocksdb_CompactRangeOptions * Method: disposeInternal @@ -219,4 +230,4 @@ void Java_org_rocksdb_CompactRangeOptions_disposeInternal(JNIEnv* /*env*/, auto* options = reinterpret_cast(jhandle); delete options; -} +} \ No newline at end of file diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 7953f2ea76..305c3e9dab 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -34,6 +34,7 @@ #include "rocksdb/utilities/memory_util.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "rocksjni/compact_range_completed_jnicallback.h" #include "rocksjni/compaction_filter_factory_jnicallback.h" #include "rocksjni/comparatorjnicallback.h" #include "rocksjni/cplusplus_to_java_convert.h" @@ -8326,6 +8327,44 @@ class AbstractEventListenerJni } }; +// The portal class for org.rocksdb.AbstractCompactRangeCompletedCb +class AbstractCompactRangeCompletedCbJni + : public RocksDBNativeClass< + const ROCKSDB_NAMESPACE::CompactRangeCompletedJniCallback*, + AbstractCompactRangeCompletedCbJni> { + public: + /** + * Get the Java Class org.rocksdb.AbstractCompactRangeCompletedCb + * + * @param env A pointer to the Java environment + * + * @return The Java Class or nullptr if one of the + * ClassFormatError, ClassCircularityError, NoClassDefFoundError, + * OutOfMemoryError or ExceptionInInitializerError exceptions is thrown + */ + static jclass getJClass(JNIEnv* env) { + return RocksDBNativeClass::getJClass( + env, "org/rocksdb/AbstractCompactRangeCompletedCb"); + } + + /** + * Get the Java Method: + * AbstractCompactRangeCompletedCb#compactRangeCompletedCbProxy + * + * @param env A pointer to the Java environment + * + * @return The Java Method ID + */ + static jmethodID getCompletedCbProxyMethodId(JNIEnv* env) { + jclass jclazz = getJClass(env); + assert(jclazz != nullptr); + static jmethodID mid = env->GetMethodID( + jclazz, "compactRangeCompletedCbProxy", "(Lorg/rocksdb/Status;)V"); + assert(mid != nullptr); + return mid; + } +}; + class FlushJobInfoJni : public JavaClass { public: /** diff --git a/java/samples/src/main/java/RocksDBSample.java b/java/samples/src/main/java/RocksDBSample.java index 21eb495584..db995c279d 100644 --- a/java/samples/src/main/java/RocksDBSample.java +++ b/java/samples/src/main/java/RocksDBSample.java @@ -4,11 +4,11 @@ // (found in the LICENSE.Apache file in the root directory). import java.lang.IllegalArgumentException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.ArrayList; - +import java.util.concurrent.atomic.AtomicBoolean; import org.rocksdb.*; import org.rocksdb.util.SizeUnit; @@ -17,6 +17,36 @@ public class RocksDBSample { RocksDB.loadLibrary(); } + private static class MyCompactRangeCompletedCb extends AbstractCompactRangeCompletedCb { + public MyCompactRangeCompletedCb() { + completedCbCalled = new AtomicBoolean(); + } + + @Override + public void CompactRangeCompleted(final Status completionStatus) { + assert (completionStatus.getCode() == Status.Code.Ok); + System.out.println( + "Non-Blocking Compact Range Completed with Status:" + completionStatus.getCodeString()); + completedCbCalled.set(true); + } + + public AtomicBoolean completedCbCalled; + } + + private static MyCompactRangeCompletedCb InitiateNonBlockingCompactRange(final RocksDB db) { + final MyCompactRangeCompletedCb cb = new MyCompactRangeCompletedCb(); + final CompactRangeOptions cro = new CompactRangeOptions().setAsyncCompletionCb(cb); + + cb.completedCbCalled.set(false); + try { + db.compactRange(null, null, null, cro); + } catch (RocksDBException e) { + assert (false); + } + + return cb; + } + public static void main(final String[] args) { if (args.length < 1) { System.out.println("usage: RocksDBSample db_path"); @@ -138,6 +168,9 @@ public static void main(final String[] args) { System.out.println(""); } + // Initiate Non-Blocking Compact Range and continue operations + MyCompactRangeCompletedCb completionCb = InitiateNonBlockingCompactRange(db); + // write batch test try (final WriteOptions writeOpt = new WriteOptions()) { for (int i = 10; i <= 19; ++i) { @@ -290,6 +323,21 @@ public static void main(final String[] args) { for (final byte[] value1 : values) { assert (value1 != null); } + + // Now just verify that the non-blocking CompactRange() has completed asynchronously + try { + int totalWaitTimeMs = 0; + while ((completionCb.completedCbCalled.get() == false) && (totalWaitTimeMs < 5000)) { + Thread.sleep(100); + totalWaitTimeMs += 100; + } + if (completionCb.completedCbCalled.get() == false) { + assert (false); + } + } catch (InterruptedException e) { + assert (false); + } + } catch (final RocksDBException e) { System.err.println(e); } diff --git a/java/src/main/java/org/rocksdb/AbstractCompactRangeCompletedCb.java b/java/src/main/java/org/rocksdb/AbstractCompactRangeCompletedCb.java new file mode 100644 index 0000000000..8c9d54b9e0 --- /dev/null +++ b/java/src/main/java/org/rocksdb/AbstractCompactRangeCompletedCb.java @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + */ +public abstract class AbstractCompactRangeCompletedCb + extends RocksCallbackObject implements CompactRangeCompletedCb { + @Override + public void CompactRangeCompleted(final Status completionStatus) { + // no-op + } + + /** + * Called from JNI, proxy for + * {@link #CompactRangeCompleted(Status)}. + * + * @param completion_status the completion status + */ + private void compactRangeCompletedCbProxy(final Status completion_status) { + CompactRangeCompleted(completion_status); + } + + @Override + protected long initializeNative(final long... nativeParameterHandles) { + return createNewCompactRangeCompletedCb(); + } + + /** + * Deletes underlying C++ native callback object pointer + */ + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + private native long createNewCompactRangeCompletedCb(); + private native void disposeInternal(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/CompactRangeCompletedCb.java b/java/src/main/java/org/rocksdb/CompactRangeCompletedCb.java new file mode 100644 index 0000000000..92a351be57 --- /dev/null +++ b/java/src/main/java/org/rocksdb/CompactRangeCompletedCb.java @@ -0,0 +1,14 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + */ +public interface CompactRangeCompletedCb { + /** + */ + void CompactRangeCompleted(final Status completionStatus); +} diff --git a/java/src/main/java/org/rocksdb/CompactRangeOptions.java b/java/src/main/java/org/rocksdb/CompactRangeOptions.java index cf5708601c..20969b94eb 100644 --- a/java/src/main/java/org/rocksdb/CompactRangeOptions.java +++ b/java/src/main/java/org/rocksdb/CompactRangeOptions.java @@ -219,6 +219,15 @@ public CompactRangeOptions setMaxSubcompactions(final int maxSubcompactions) { return this; } + /** + */ + public CompactRangeOptions setAsyncCompletionCb( + final AbstractCompactRangeCompletedCb completionCb) { + assert (isOwningHandle()); + setAsyncCompletionCb(nativeHandle_, completionCb.nativeHandle_); + return this; + } + private native static long newCompactRangeOptions(); @Override protected final native void disposeInternal(final long handle); @@ -243,4 +252,6 @@ private native void setAllowWriteStall(final long handle, private native void setMaxSubcompactions(final long handle, final int maxSubcompactions); private native int maxSubcompactions(final long handle); + + private native void setAsyncCompletionCb(final long nativeHandle_, final long completeCbHandle); } diff --git a/java/src/test/java/org/rocksdb/CompactRangeCompletedCbTest.java b/java/src/test/java/org/rocksdb/CompactRangeCompletedCbTest.java new file mode 100644 index 0000000000..6fd5980c93 --- /dev/null +++ b/java/src/test/java/org/rocksdb/CompactRangeCompletedCbTest.java @@ -0,0 +1,29 @@ +package org.rocksdb; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +public class CompactRangeCompletedCbTest { + @ClassRule + public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = + new RocksNativeLibraryResource(); + + @Test + public void dummyTest() { + System.err.println("IN dummyTest !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1"); + } + + @Test + public void onFlushCompleted() throws RocksDBException { + final AbstractCompactRangeCompletedCb compactRangeCompletedCb = + new AbstractCompactRangeCompletedCb() { + @Override + public void CompactRangeCompleted(final Status completionStatus) { + System.err.println("IN CompactRangeCompletedCbTest::CompactRangeCompleted !!!!!"); + } + }; + } +} \ No newline at end of file diff --git a/java/src/test/java/org/rocksdb/CompactRangeOptionsTest.java b/java/src/test/java/org/rocksdb/CompactRangeOptionsTest.java index 57bf22b57f..a0d8b353d0 100644 --- a/java/src/test/java/org/rocksdb/CompactRangeOptionsTest.java +++ b/java/src/test/java/org/rocksdb/CompactRangeOptionsTest.java @@ -98,4 +98,20 @@ public void maxSubcompactions() { opt.setMaxSubcompactions(value); assertThat(opt.maxSubcompactions()).isEqualTo(value); } + + @Test + public void asyncCompletionCb() { + CompactRangeOptions opt = new CompactRangeOptions(); + + try (final AbstractCompactRangeCompletedCb completeCb = new TestCompactRangeCompletedCb()) { + opt.setAsyncCompletionCb(completeCb); + } + } + + private static class TestCompactRangeCompletedCb extends AbstractCompactRangeCompletedCb { + @Override + public void CompactRangeCompleted(final Status completionStatus) { + System.err.println("In TestCompactRangeCompletedCb::CompactRangeCompleted"); + } + } } diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 488dbafe80..2fccc52d05 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -4,17 +4,17 @@ // (found in the LICENSE.Apache file in the root directory). package org.rocksdb; -import org.junit.*; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; - -import java.nio.ByteBuffer; -import java.util.*; - import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.*; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + public class RocksDBTest { @ClassRule @@ -759,6 +759,75 @@ public void compactRangeWithKeysColumnFamily() } } + private static class TestCompactRangeCompletedCb extends AbstractCompactRangeCompletedCb { + public TestCompactRangeCompletedCb() { + completedCbCalled = new AtomicBoolean(); + } + + @Override + public void CompactRangeCompleted(final Status completionStatus) { + completedCbCalled.set(true); + } + + public AtomicBoolean completedCbCalled; + } + + @Test + public void fullCompactRangeColumnFamilyNonBlocking() throws RocksDBException { + try (final DBOptions opt = + new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final ColumnFamilyOptions new_cf_opts = new ColumnFamilyOptions() + .setDisableAutoCompactions(true) + .setCompactionStyle(CompactionStyle.LEVEL) + .setNumLevels(4) + .setWriteBufferSize(100 << 10) + .setLevelZeroFileNumCompactionTrigger(3) + .setTargetFileSizeBase(200 << 10) + .setTargetFileSizeMultiplier(1) + .setMaxBytesForLevelBase(500 << 10) + .setMaxBytesForLevelMultiplier(1) + .setDisableAutoCompactions(false); + final TestCompactRangeCompletedCb cb = new TestCompactRangeCompletedCb(); + final CompactRangeOptions cro = new CompactRangeOptions().setAsyncCompletionCb(cb)) { + final List columnFamilyDescriptors = + Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), + new ColumnFamilyDescriptor("new_cf".getBytes(), new_cf_opts)); + + // open database + final List columnFamilyHandles = new ArrayList<>(); + try (final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles)) { + try { + // fill database with key/value pairs + byte[] b = new byte[10000]; + for (int i = 0; i < 200; i++) { + rand.nextBytes(b); + db.put(columnFamilyHandles.get(1), String.valueOf(i).getBytes(), b); + } + cb.completedCbCalled.set(false); + db.compactRange(null, null, null, cro); + try { + int totalWaitTimeMs = 0; + while ((cb.completedCbCalled.get() == false) && (totalWaitTimeMs < 5000)) { + Thread.sleep(100); + totalWaitTimeMs += 100; + } + if (cb.completedCbCalled.get() == false) { + fail("Callback wasn't called"); + } + } catch (InterruptedException e) { + fail("InterruptedException"); + } + + } finally { + for (final ColumnFamilyHandle handle : columnFamilyHandles) { + handle.close(); + } + } + } + } + } + @Test public void compactRangeWithKeysReduceColumnFamily() throws RocksDBException { diff --git a/src.mk b/src.mk index 10b9f8487c..3580d0c543 100644 --- a/src.mk +++ b/src.mk @@ -55,6 +55,7 @@ LIB_SOURCES = \ db/db_impl/db_impl_secondary.cc \ db/db_impl/db_impl_write.cc \ db/db_impl/db_spdb_impl_write.cc \ + db/db_impl/compact_range_threads_mngr.cc \ db/db_info_dumper.cc \ db/db_iter.cc \ db/dbformat.cc \ @@ -644,6 +645,8 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/cache.cc \ java/rocksjni/columnfamilyhandle.cc \ java/rocksjni/compact_range_options.cc \ + java/rocksjni/compact_range_completion_cb.cc \ + java/rocksjni/compact_range_completed_jnicallback.cc \ java/rocksjni/compaction_filter.cc \ java/rocksjni/compaction_filter_factory.cc \ java/rocksjni/compaction_filter_factory_jnicallback.cc \