Skip to content

Commit

Permalink
add range for compaction filter context (tikv#192)
Browse files Browse the repository at this point in the history
* add range for compaction filter context

Signed-off-by: qupeng <qupeng@pingcap.com>
Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
hicqu authored and v01dstar committed Feb 17, 2024
1 parent 98a1475 commit fcc6875
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 3 deletions.
81 changes: 81 additions & 0 deletions db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
Expand Down Expand Up @@ -60,6 +61,86 @@ class FlushedFileCollector : public EventListener {
std::mutex mutex_;
};

class TestFilterFactory : public CompactionFilterFactory {
public:
std::shared_ptr<CompactionFilter::Context> context_;
std::shared_ptr<int> compaction_count_;

TestFilterFactory(std::shared_ptr<CompactionFilter::Context> context,
std::shared_ptr<int> compaction_count) {
this->context_ = context;
this->compaction_count_ = compaction_count;
}

~TestFilterFactory() {}

const char* Name() const { return "TestFilterFactory"; }

std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
context_->start_key = context.start_key;
context_->end_key = context.end_key;
context_->is_end_key_inclusive = context.is_end_key_inclusive;
context_->file_numbers.clear();
context_->table_properties.clear();
for (size_t i = 0; i < context.file_numbers.size(); ++i) {
context_->file_numbers.push_back(context.file_numbers[i]);
context_->table_properties.push_back(context.table_properties[i]);
}
*compaction_count_.get() += 1;
return nullptr;
}
};

// Verifies that the compaction filter context carries the compaction's key
// range, inclusiveness flag, and involved SST file numbers, and that flushes
// alone do not create a compaction filter.
TEST_F(CompactFilesTest, FilterContext) {
  Options options;
  // to trigger compaction more easily
  const int kWriteBufferSize = 10000;
  const int kLevel0Trigger = 10;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  // Small slowdown and stop trigger for experimental purpose.
  options.level0_slowdown_writes_trigger = 20;
  options.level0_stop_writes_trigger = 20;  // was assigned twice; duplicate removed
  options.write_buffer_size = kWriteBufferSize;
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  options.compression = kNoCompression;

  std::shared_ptr<CompactionFilter::Context> expected_context(
      new CompactionFilter::Context);
  std::shared_ptr<int> compaction_count(new int(0));
  CompactionFilterFactory* factory =
      new TestFilterFactory(expected_context, compaction_count);
  options.compaction_filter_factory =
      std::shared_ptr<CompactionFilterFactory>(factory);

  DB* db = nullptr;
  DestroyDB(db_name_, options);
  Status s = DB::Open(options, db_name_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  // `Flush` is different from `Compaction`: flushes must not invoke the
  // compaction filter factory.
  db->Put(WriteOptions(), ToString(1), "");
  db->Put(WriteOptions(), ToString(51), "");
  db->Flush(FlushOptions());
  db->Put(WriteOptions(), ToString(50), "");
  db->Put(WriteOptions(), ToString(99), "");
  db->Flush(FlushOptions());
  ASSERT_EQ(*compaction_count, 0);

  db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // NOTE(review): fixed sleep to wait for compaction start is inherently
  // flaky; a sync point would be deterministic — TODO confirm feasibility.
  usleep(10000);  // Wait for compaction start.
  // Full-range compaction: smallest/largest user keys, inclusive end.
  ASSERT_EQ(expected_context->start_key, Slice("1"));
  ASSERT_EQ(expected_context->end_key, Slice("99"));
  ASSERT_EQ(expected_context->is_end_key_inclusive, 1);
  ASSERT_EQ(expected_context->file_numbers[0], 11);
  ASSERT_EQ(*compaction_count, 1);

  delete db;
}

TEST_F(CompactFilesTest, L0ConflictsFiles) {
Options options;
// to trigger compaction more easily
Expand Down
17 changes: 16 additions & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,8 @@ uint64_t Compaction::OutputFilePreallocationSize() const {
preallocation_size + (preallocation_size / 10));
}

std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter(
const Slice* start, const Slice* end) const {
if (!cfd_->ioptions()->compaction_filter_factory) {
return nullptr;
}
Expand All @@ -828,6 +829,20 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
context.is_manual_compaction = is_manual_compaction_;
context.input_start_level = start_level_;
context.is_bottommost_level = bottommost_level_;
context.start_key =
(start == nullptr) ? GetSmallestUserKey() : ExtractUserKey(*start);
context.end_key =
(end == nullptr) ? GetLargestUserKey() : ExtractUserKey(*end);
context.is_end_key_inclusive = (end == nullptr);
for (auto l = inputs_.begin(); l != inputs_.end(); ++l) {
for (auto f = l->files.begin(); f != l->files.end(); ++f) {
std::shared_ptr<const TableProperties> tp;
Status s = input_version_->GetTableProperties(&tp, *f);
assert(s.ok());
context.file_numbers.push_back((*f)->fd.GetNumber());
context.table_properties.push_back(tp);
}
}
context.column_family_id = cfd_->GetID();
context.reason = TableFileCreationReason::kCompaction;
context.input_table_properties = GetInputTableProperties();
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ class Compaction {
void ResetNextCompactionIndex();

// Create a CompactionFilter from compaction_filter_factory
std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const Slice* start, const Slice* end) const;

// Create a SstPartitioner from sst_partitioner_factory
std::unique_ptr<SstPartitioner> CreateSstPartitioner() const;
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
compaction_filter_from_factory =
sub_compact->compaction->CreateCompactionFilter();
sub_compact->compaction->CreateCompactionFilter(sub_compact->start,
sub_compact->end);
compaction_filter = compaction_filter_from_factory.get();
}
if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
Expand Down
13 changes: 13 additions & 0 deletions include/rocksdb/compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "rocksdb/customizable.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
#include "rocksdb/wide_columns.h"
Expand Down Expand Up @@ -166,6 +167,18 @@ class CompactionFilter : public Customizable {
int input_start_level = kUnknownStartLevel;
// Whether output files are in bottommost level or not.
bool is_bottommost_level;

// The range of the compaction.
Slice start_key;
Slice end_key;
bool is_end_key_inclusive;

// File numbers of all involved SST files.
std::vector<uint64_t> file_numbers;

// Properties of all involved SST files.
std::vector<std::shared_ptr<const TableProperties>> table_properties;

// The column family that will contain the created table file.
uint32_t column_family_id;
// Reason this table file is being created.
Expand Down

0 comments on commit fcc6875

Please sign in to comment.