Skip to content

Commit

Permalink
CompactFiles, EventListener and GetDatabaseMetaData
Browse files Browse the repository at this point in the history
Summary:
This diff adds three sets of APIs to RocksDB.

= GetColumnFamilyMetaData =
* This APIs allow users to obtain the current state of a RocksDB instance on one column family.
* See GetColumnFamilyMetaData in include/rocksdb/db.h

= EventListener =
* A virtual class that allows users to implement a set of
  call-back functions which will be called when specific
  events of a RocksDB instance happens.
* To register EventListener, simply insert an EventListener to ColumnFamilyOptions::listeners

= CompactFiles =
* CompactFiles API inputs a set of file numbers and an output level, and RocksDB
  will try to compact those files into the specified level.

= Example =
* Example code can be found in example/compact_files_example.cc, which implements
  a simple external compactor using EventListener, GetColumnFamilyMetaData, and
  CompactFiles API.

Test Plan:
listener_test
compactor_test
example/compact_files_example
export ROCKSDB_TESTS=CompactFiles
db_test
export ROCKSDB_TESTS=MetaData
db_test

Reviewers: ljin, igor, rven, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D24705
  • Loading branch information
yhchiang committed Nov 7, 2014
1 parent 5c93090 commit 28c82ff
Show file tree
Hide file tree
Showing 30 changed files with 1,908 additions and 97 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ TESTS = \
cuckoo_table_db_test \
write_batch_with_index_test \
flush_job_test \
wal_manager_test
wal_manager_test \
listener_test \
write_batch_with_index_test

TOOLS = \
sst_dump \
Expand Down Expand Up @@ -502,6 +504,12 @@ cuckoo_table_reader_test: table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTH
cuckoo_table_db_test: db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

listener_test: db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

Expand Down
34 changes: 32 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
return cfd()->GetName();
}

const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator();
}
Expand Down Expand Up @@ -255,10 +259,23 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
} else if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
} else {
assert(ioptions_.compaction_style == kCompactionStyleFIFO);
} else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
compaction_picker_.reset(
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
} else if (ioptions_.compaction_style == kCompactionStyleNone) {
compaction_picker_.reset(new NullCompactionPicker(
ioptions_, &internal_comparator_));
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"Column family %s does not use any background compaction. "
"Compactions can only be done via CompactFiles\n",
GetName().c_str());
} else {
Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
"Unable to recognize the specified compaction style %d. "
"Column family %s will use kCompactionStyleLevel.\n",
ioptions_.compaction_style, GetName().c_str());
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
}

Log(InfoLogLevel::INFO_LEVEL,
Expand Down Expand Up @@ -503,6 +520,19 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
return false;
}

void ColumnFamilyData::NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,
bool triggered_flush_stop) {
auto listeners = ioptions()->listeners;
for (auto listener : listeners) {
listener->OnFlushCompleted(
db, GetName(), file_path,
// Use path 0 as fulled memtables are first flushed into path 0.
triggered_flush_slowdown, triggered_flush_stop);
}
}

SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) {
db_mutex->AssertHeld();
Expand Down
6 changes: 6 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual const Comparator* user_comparator() const;

virtual uint32_t GetID() const;
virtual const std::string& GetName() const override;

private:
ColumnFamilyData* cfd_;
Expand Down Expand Up @@ -250,6 +251,11 @@ class ColumnFamilyData {

void ResetThreadLocalSuperVersions();

void NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,
bool triggered_flush_stop);

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down
32 changes: 32 additions & 0 deletions db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,38 @@ Compaction::Compaction(int number_levels, int start_level, int out_level,
}
}

Compaction::Compaction(VersionStorageInfo* vstorage,
const autovector<CompactionInputFiles>& inputs,
int start_level, int output_level,
uint64_t max_grandparent_overlap_bytes,
const CompactionOptions& options,
bool deletion_compaction)
: start_level_(start_level),
output_level_(output_level),
max_output_file_size_(options.output_file_size_limit),
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(nullptr), // TODO(yhchiang): set it later
number_levels_(vstorage->NumberLevels()),
cfd_(nullptr),
output_compression_(options.compression),
seek_compaction_(false),
deletion_compaction_(deletion_compaction),
inputs_(inputs),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
base_index_(-1),
parent_index_(-1),
score_(0),
bottommost_level_(false),
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
}

Compaction::~Compaction() {
delete edit_;
if (input_version_ != nullptr) {
Expand Down
29 changes: 19 additions & 10 deletions db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class VersionStorageInfo;
// A Compaction encapsulates information about a compaction.
class Compaction {
public:
Compaction(VersionStorageInfo* input_version,
const autovector<CompactionInputFiles>& inputs,
int start_level, int output_level,
uint64_t max_grandparent_overlap_bytes,
const CompactionOptions& options,
bool deletion_compaction);

// No copying allowed
Compaction(const Compaction&) = delete;
void operator=(const Compaction&) = delete;
Expand Down Expand Up @@ -153,6 +160,8 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; }

void SetOutputPathId(uint32_t path_id) { output_path_id_ = path_id; }

// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
Expand All @@ -164,6 +173,16 @@ class Compaction {

void SetInputVersion(Version* input_version);

// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);

// Initialize whether the compaction is producing files at the
// bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);

private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
Expand Down Expand Up @@ -226,16 +245,6 @@ class Compaction {
// records indices for all levels beyond "output_level_".
std::vector<size_t> level_ptrs_;

// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);

// Initialize whether the compaction is producing files at the
// bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);

// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();
Expand Down
Loading

0 comments on commit 28c82ff

Please sign in to comment.