From e2f6ec7e27195082a336180e9f68448d1a8dbc5c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com>
Date: Thu, 7 Sep 2023 12:43:48 +0800
Subject: [PATCH] compaction: send file size and range segments to compaction
 partitioner context (#341)

compaction: send file size and range segments to compaction partitioner
context

Signed-off-by: hillium
---
 db/compaction/compaction.cc       |  50 +++++++++++++++
 db/compaction/compaction.h        |   3 +
 db/db_compaction_test.cc          | 103 ++++++++++++++++++++++++++++++
 include/rocksdb/sst_partitioner.h |  41 ++++++++++++
 4 files changed, 197 insertions(+)

diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index 181df406d2c..5629e9ae549 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -9,6 +9,7 @@
 #include "db/compaction/compaction.h"
 
+#include <algorithm>
 #include <cinttypes>
 #include <vector>
 
@@ -566,6 +567,52 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter(
       context);
 }
 
+std::pair<std::vector<Slice>, std::vector<uint64_t>>
+Compaction::CreateSegmentsForLevel(int level) const {
+  // The files below should be sorted and mutually non-overlapping.
+  // For now this is only used to build the info of the level next to the
+  // output level, so it is fine not to support L0.
+  assert(level != 0);
+
+  // Some test cases may not initialize the version...
+  if (input_version_ == nullptr) {
+    return std::make_pair(std::vector<Slice>(), std::vector<uint64_t>());
+  }
+
+  const auto vsi = input_version_->storage_info();
+  if (level >= vsi->num_non_empty_levels()) {
+    // The level must be empty.
+    return std::make_pair(std::vector<Slice>(), std::vector<uint64_t>());
+  }
+  const auto& files = vsi->LevelFilesBrief(level);
+  // The file metadata holds internal keys, while the compaction is bounded
+  // by user keys.
+  const auto user_cmp = immutable_options()->user_comparator;
+  const auto end = files.files + files.num_files;
+  const auto start = std::lower_bound(
+      files.files, end, smallest_user_key_,
+      [user_cmp](const FdWithKeyRange& fd, const Slice& slice) {
+        return user_cmp->Compare(ExtractUserKey(fd.largest_key), slice) < 0;
+      });
+
+  if (start == end) {
+    // There is no overlap with the next level.
+    return std::make_pair(std::vector<Slice>(), std::vector<uint64_t>());
+  }
+  std::vector<Slice> ranges;
+  std::vector<uint64_t> sizes;
+  ranges.push_back(ExtractUserKey(start->smallest_key));
+  for (const FdWithKeyRange* iter = start; iter < end; iter++) {
+    if (user_cmp->Compare(ExtractUserKey(iter->smallest_key),
+                          largest_user_key_) > 0) {
+      break;
+    }
+    ranges.push_back(ExtractUserKey(iter->largest_key));
+    sizes.push_back(iter->fd.GetFileSize());
+  }
+  return std::make_pair(ranges, sizes);
+}
+
 std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
   if (!immutable_options_.sst_partitioner_factory) {
     return nullptr;
@@ -577,6 +624,9 @@ std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
   context.output_level = output_level_;
   context.smallest_user_key = smallest_user_key_;
   context.largest_user_key = largest_user_key_;
+  std::tie(context.output_next_level_boundaries,
+           context.output_next_level_size) =
+      CreateSegmentsForLevel(output_level_ + 1);
   return immutable_options_.sst_partitioner_factory->CreatePartitioner(
       context);
 }

diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h
index cd31778f83c..758175f033a 100644
--- a/db/compaction/compaction.h
+++ b/db/compaction/compaction.h
@@ -346,6 +346,9 @@ class Compaction {
   static bool IsFullCompaction(VersionStorageInfo* vstorage,
                                const std::vector<CompactionInputFiles>& inputs);
 
+  std::pair<std::vector<Slice>, std::vector<uint64_t>> CreateSegmentsForLevel(
+      int in_level) const;
+
   VersionStorageInfo* input_vstorage_;
 
   const int start_level_;  // the lowest level to be compacted

diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index fb90eba90be..33d047d97d0 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -84,6 +84,34 @@ class ChangeLevelConflictsWithAuto
 };
 
 namespace {
+class SplitAllPartitioner : public SstPartitioner {
+ public:
+  const char* Name() const override { return "SplitAllPartitioner"; }
+
+  PartitionerResult ShouldPartition(
+      const PartitionerRequest& /*request*/) override {
+    return PartitionerResult::kRequired;
+  }
+
+  bool CanDoTrivialMove(const Slice&, const Slice&) override { return true; }
+};
+
+class SplitAllPartitionerFactory : public SstPartitionerFactory {
+ public:
+  std::function<void(const SstPartitioner::Context&)> on_create_;
+
+  explicit SplitAllPartitionerFactory(
+      std::function<void(const SstPartitioner::Context&)> on_create)
+      : on_create_(on_create) {}
+
+  std::unique_ptr<SstPartitioner> CreatePartitioner(
+      const SstPartitioner::Context& context) const override {
+    on_create_(context);
+    return std::unique_ptr<SstPartitioner>(new SplitAllPartitioner());
+  }
+
+  const char* Name() const override { return "SplitAllPartitionerFactory"; }
+};
 
 class FlushedFileCollector : public EventListener {
  public:
@@ -1030,6 +1058,81 @@ TEST_F(DBCompactionTest, CompactionSstPartitionerNonTrivial) {
   ASSERT_EQ("B", Get("bbbb1"));
 }
 
+TEST_F(DBCompactionTest, CompactionSstPartitionerNextLevel) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleLevel;
+  options.level0_file_num_compaction_trigger = 1;
+  options.max_bytes_for_level_base = 10;
+  options.max_bytes_for_level_multiplier = 2;
+  options.sst_partitioner_factory = std::unique_ptr<SstPartitionerFactory>(
+      new SplitAllPartitionerFactory([this](const SstPartitioner::Context& cx) {
+        if (!cx.output_next_level_boundaries.empty()) {
+          std::vector<LiveFileMetaData> files;
+          // The DB mutex is held in this context, so unlock it around the
+          // GetLiveFilesMetaData() call. Perhaps we'd better add a
+          // `TEST_GetVersion` helper for fetching this instead.
+          dbfull()->TEST_UnlockMutex();
+          dbfull()->GetLiveFilesMetaData(&files);
+          dbfull()->TEST_LockMutex();
+          std::vector<LiveFileMetaData> overlapped_files;
+          std::copy_if(
+              files.begin(), files.end(), std::back_inserter(overlapped_files),
+              [&](const LiveFileMetaData& ld) {
+                return Slice(ld.smallestkey).compare(cx.largest_user_key) < 0 &&
+                       Slice(ld.largestkey).compare(cx.smallest_user_key) > 0 &&
+                       ld.level == cx.output_level + 1;
+              });
+          std::sort(overlapped_files.begin(), overlapped_files.end(),
+                    [](const LiveFileMetaData& x, const LiveFileMetaData& y) {
+                      return x.largestkey < y.largestkey;
+                    });
+          auto next_level_overlap_files = overlapped_files.size();
+          ASSERT_EQ(next_level_overlap_files + 1,
+                    cx.output_next_level_boundaries.size());
+          ASSERT_EQ(next_level_overlap_files,
+                    cx.output_next_level_size.size());
+          ASSERT_EQ(next_level_overlap_files,
+                    cx.OutputNextLevelSegmentCount());
+          for (size_t i = 0; i < overlapped_files.size(); i++) {
+            Slice next_level_lower, next_level_upper;
+            uint64_t next_level_size;
+            cx.OutputNextLevelSegment(i, &next_level_lower, &next_level_upper,
+                                      &next_level_size);
+
+            if (i == 0) {
+              ASSERT_EQ(overlapped_files[i].smallestkey, next_level_lower);
+            }
+            ASSERT_EQ(overlapped_files[i].largestkey, next_level_upper);
+            ASSERT_EQ(overlapped_files[i].size, next_level_size);
+          }
+        }
+      }));
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("A", "there are more than 10 bytes."));
+  ASSERT_OK(Put("B", "yet another key."));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_OK(Put("A1", "the new challenger..."));
+  ASSERT_OK(Put("B1", "and his buddy."));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_OK(Put("A1P", "the new challenger... Changed."));
+  ASSERT_OK(Put("B1P", "and his buddy. Changed too."));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_OK(Put(InternalKey("A", 0, ValueType::kTypeDeletion).Encode(),
+                "And a trickster: it pretends to be A, but is not A."));
+  ASSERT_OK(Put(InternalKey("B", 0, ValueType::kTypeDeletion).Encode(),
+                "Yeah, another trickster."));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+
+  std::vector<LiveFileMetaData> files;
+  dbfull()->GetLiveFilesMetaData(&files);
+  ASSERT_EQ(8, files.size());
+}
+
 TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
   Options options = CurrentOptions();
   options.compaction_style = kCompactionStyleLevel;

diff --git a/include/rocksdb/sst_partitioner.h b/include/rocksdb/sst_partitioner.h
index 3af8e949297..9e3c218da4a 100644
--- a/include/rocksdb/sst_partitioner.h
+++ b/include/rocksdb/sst_partitioner.h
@@ -62,6 +62,13 @@ class SstPartitioner {
   virtual bool CanDoTrivialMove(const Slice& smallest_user_key,
                                 const Slice& largest_user_key) = 0;
 
+  struct Segment {
+    Segment(uint64_t size_diff, Slice until)
+        : size_in_this_segment(size_diff), segment_until_user_key(until) {}
+    uint64_t size_in_this_segment;
+    Slice segment_until_user_key;
+  };
+
   // Context information of a compaction run
   struct Context {
     // Does this compaction run include all data files
@@ -75,6 +82,40 @@ class SstPartitioner {
     Slice smallest_user_key;
     // Largest key for compaction
     Slice largest_user_key;
+
+    // The segments that make up the level next to the output level.
+    // This is useful when deciding whether to partition output files into
+    // finer parts to avoid possibly huge compactions in the future.
+
+    // The boundaries of the level next to the output level. For example,
+    // when the next level contains files with the ranges ("001", "002") and
+    // ("003", "004"), the boundaries will be ["001", "002", "004"].
+    std::vector<Slice> output_next_level_boundaries;
+    // The size of each segment. For example, when
+    // `output_next_level_boundaries` is ["001", "002", "004"], this might be
+    // [42, 96], which means the range ["001", "002") contains 42 bytes of
+    // data and ["002", "004") contains 96 bytes of data.
+    std::vector<uint64_t> output_next_level_size;
+
+    // Helper function to fetch the number of next-level segments.
+    size_t OutputNextLevelSegmentCount() const {
+      return output_next_level_size.size();
+    }
+
+    // Helper function to fetch the n-th segment of the level next to the
+    // output level. `index` shall be less than
+    // `OutputNextLevelSegmentCount()`.
+    void OutputNextLevelSegment(size_t index, Slice* smallest_key,
+                                Slice* largest_key, uint64_t* size) const {
+      if (smallest_key != nullptr) {
+        *smallest_key = output_next_level_boundaries[index];
+      }
+      if (largest_key != nullptr) {
+        *largest_key = output_next_level_boundaries[index + 1];
+      }
+      if (size != nullptr) {
+        *size = output_next_level_size[index];
+      }
+    }
   };
 };
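---

For illustration, below is a minimal sketch, not part of this patch, of how a
custom partitioner might consume the new context fields. The
`SegmentAwarePartitioner` name and the `max_overlap_bytes` threshold are
hypothetical; the `SstPartitioner`, `PartitionerRequest`, and
`PartitionerResult` types are the existing ones from
`include/rocksdb/sst_partitioner.h`, as amended above.

// Hypothetical usage sketch (not part of this patch): force a file cut
// whenever the output crosses into a next-level segment that already holds
// more than `max_overlap_bytes`, so any future compaction that pulls in that
// segment stays bounded in size.
#include <cstdint>
#include <string>
#include <vector>

#include "rocksdb/sst_partitioner.h"

namespace ROCKSDB_NAMESPACE {

class SegmentAwarePartitioner : public SstPartitioner {
 public:
  SegmentAwarePartitioner(const Context& context, uint64_t max_overlap_bytes) {
    // Copy the boundary keys out: the Slices in the context point into
    // memory owned by the compaction.
    for (size_t i = 0; i < context.OutputNextLevelSegmentCount(); i++) {
      Slice lower, upper;
      uint64_t size = 0;
      context.OutputNextLevelSegment(i, &lower, &upper, &size);
      if (size > max_overlap_bytes) {
        heavy_boundaries_.push_back(upper.ToString());
      }
    }
  }

  const char* Name() const override { return "SegmentAwarePartitioner"; }

  PartitionerResult ShouldPartition(
      const PartitionerRequest& request) override {
    // Cut the output file when it steps over the upper boundary of a
    // "heavy" next-level segment.
    for (const std::string& boundary : heavy_boundaries_) {
      if (request.prev_user_key->compare(boundary) < 0 &&
          request.current_user_key->compare(boundary) >= 0) {
        return PartitionerResult::kRequired;
      }
    }
    return PartitionerResult::kNotRequired;
  }

  bool CanDoTrivialMove(const Slice&, const Slice&) override { return false; }

 private:
  std::vector<std::string> heavy_boundaries_;
};

}  // namespace ROCKSDB_NAMESPACE

A factory's CreatePartitioner() would construct this from the context it
receives, which is where `output_next_level_boundaries` and
`output_next_level_size` are now populated by CreateSstPartitioner().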