add subcompaction listener #218

Merged 3 commits on Dec 21, 2020
Changes from 2 commits
24 changes: 24 additions & 0 deletions db/compaction/compaction_job.cc
@@ -43,6 +43,7 @@
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
@@ -233,6 +234,8 @@ struct CompactionJob::SubcompactionState {

return false;
}

bool IsPartialCompaction() { return start || end; }
};

// Maintains state for the entire compaction
@@ -812,6 +815,19 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

#ifndef ROCKSDB_LITE
SubcompactionJobInfo info;
if (sub_compact->IsPartialCompaction()) {
info.cf_name = cfd->GetName();
info.thread_id = env_->GetThreadID();
info.base_input_level = sub_compact->compaction->start_level();
info.output_level = sub_compact->compaction->output_level();
for (auto listener : db_options_.listeners) {
listener->OnSubcompactionBegin(info);
}
}
#endif // !ROCKSDB_LITE

// Create compaction filter and fail the compaction if
// IgnoreSnapshots() = false because it is not supported anymore
const CompactionFilter* compaction_filter =
@@ -1070,6 +1086,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->c_iter.reset();
input.reset();
sub_compact->status = status;
#ifndef ROCKSDB_LITE
info.status = status;
if (sub_compact->IsPartialCompaction()) {
for (auto listener : db_options_.listeners) {
listener->OnSubcompactionCompleted(info);
}
}
#endif // !ROCKSDB_LITE
}

void CompactionJob::RecordDroppedKeys(
53 changes: 53 additions & 0 deletions db/listener_test.cc
@@ -154,6 +154,59 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
}
}

class TestSubcompactionListener : public EventListener {
public:
TestSubcompactionListener() : compacted_(0) {}
void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) override {
compacted_.fetch_add(1);
}

std::atomic<int> compacted_;
};

TEST_F(EventListenerTest, OnSingleDBSubcompactionTest) {
const int kNumL0Files = 8;

Options options;
options.env = CurrentOptions().env;
options.create_if_missing = true;
options.max_subcompactions = 2;
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = k110KB * 2;
options.target_file_size_base = options.write_buffer_size;
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());

TestSubcompactionListener* listener = new TestSubcompactionListener();
options.listeners.emplace_back(listener);
Reopen(options);
ASSERT_OK(Put("k1", std::string(90000, 'k')));
ASSERT_OK(Put("k4", std::string(90000, 'k')));
ASSERT_OK(dbfull()->CompactRange(
CompactRangeOptions(), db_->DefaultColumnFamily(), nullptr, nullptr));
// Three large files overlapping with L1 will trigger at least two
// subcompactions.
ASSERT_OK(Put("k1", std::string(90000, 'k')));
ASSERT_OK(Put("k2", std::string(90000, 'k')));
ASSERT_OK(Flush());
ASSERT_OK(Put("k2", std::string(90000, 'k')));
ASSERT_OK(Put("k3", std::string(90000, 'k')));
ASSERT_OK(Flush());
ASSERT_OK(Put("k3", std::string(90000, 'k')));
ASSERT_OK(Put("k4", std::string(90000, 'k')));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->RunManualCompaction(
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd(),
0 /* input_level */, 1 /* output_level */, CompactRangeOptions(),
nullptr /* begin */, nullptr /* end */, true /* exclusive */,
true /* disallow_trivial_move */,
port::kMaxUint64 /* max_file_num_to_ignore */));
ASSERT_EQ(listener->compacted_.load(), 2);
}

// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
public:
27 changes: 27 additions & 0 deletions include/rocksdb/listener.h
@@ -233,6 +233,23 @@ struct CompactionJobInfo {
CompactionJobStats stats;
};

struct SubcompactionJobInfo {
SubcompactionJobInfo() = default;

// the id of the column family where the compaction happened.
uint32_t cf_id;
// the name of the column family where the compaction happened.
std::string cf_name;
// the status indicating whether the compaction was successful or not.
Status status;
// the id of the thread that completed this compaction job.
uint64_t thread_id;
// the smallest input level of the compaction.
Member: Add job_id so we can use it to match the CompactionJobInfo later?

Member Author: It seems job id is only unique within each background thread, that's why I removed it.

// the job id, which is unique in the same thread.
  int job_id;

Member Author (@tabokie, Dec 21, 2020): We can put more thoughts on how to match subcompaction with main compaction later. Right now this PR covers our use case.
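As a rough illustration of how begin/completed events could be paired until a proper job identifier is exposed, the sketch below (not part of this PR; the class name and map are made up) keys on SubcompactionJobInfo::thread_id, relying on both callbacks for a given subcompaction running on the thread that executes it:

// Illustrative sketch only, not part of this PR: pairs OnSubcompactionBegin
// with OnSubcompactionCompleted via thread_id, since both callbacks fire on
// the thread running that subcompaction.
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include "rocksdb/listener.h"

class SubcompactionTracker : public rocksdb::EventListener {
 public:
  void OnSubcompactionBegin(const rocksdb::SubcompactionJobInfo& si) override {
    std::lock_guard<std::mutex> guard(mu_);
    in_flight_[si.thread_id] = si.cf_name;  // subcompaction started on this thread
  }
  void OnSubcompactionCompleted(
      const rocksdb::SubcompactionJobInfo& si) override {
    std::lock_guard<std::mutex> guard(mu_);
    in_flight_.erase(si.thread_id);  // same thread id as the matching Begin
  }

 private:
  std::mutex mu_;
  std::map<uint64_t, std::string> in_flight_;  // thread_id -> cf_name
};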

int base_input_level;
// the output level of the compaction.
int output_level;
};

struct MemTableInfo {
// the name of the column family to which memtable belongs
std::string cf_name;
@@ -345,6 +362,16 @@ class EventListener {
virtual void OnCompactionCompleted(DB* /*db*/,
const CompactionJobInfo& /*ci*/) {}

// A callback function for RocksDB which will be called each time a
// compaction is split into multiple subcompactions. It is invoked at the
// start of each subcompaction, on the thread running that subcompaction.
virtual void OnSubcompactionBegin(const SubcompactionJobInfo& /*si*/) {}

// A callback function for RocksDB which will be called each time a
// compaction is split into multiple subcompactions. It is invoked when each
// subcompaction finishes, on the thread running that subcompaction.
virtual void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) {}

// A callback function for RocksDB which will be called whenever
// a SST file is created. Different from OnCompactionCompleted and
// OnFlushCompleted, this callback is designed for external logging
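For completeness, a minimal usage sketch (the listener class, counters, and database path are illustrative and not part of this PR; Options, DB::Open, and Options::listeners are existing RocksDB APIs) showing how an application could register for the new callbacks:

#include <atomic>
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

// Hypothetical listener that counts subcompaction begin/complete events.
class CountingSubcompactionListener : public rocksdb::EventListener {
 public:
  void OnSubcompactionBegin(const rocksdb::SubcompactionJobInfo&) override {
    started_.fetch_add(1);
  }
  void OnSubcompactionCompleted(
      const rocksdb::SubcompactionJobInfo& si) override {
    if (si.status.ok()) {
      finished_.fetch_add(1);
    }
  }
  std::atomic<int> started_{0};
  std::atomic<int> finished_{0};
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.max_subcompactions = 4;  // allow a compaction to be split
  options.listeners.emplace_back(
      std::make_shared<CountingSubcompactionListener>());
  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/subcompaction_demo", &db);
  if (!s.ok()) {
    return 1;
  }
  // ... write data and trigger compactions; per this PR, the callbacks fire
  // only for subcompactions with a bounded key range (IsPartialCompaction()).
  delete db;
  return 0;
}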