Skip to content

Commit

Permalink
Delete duplicated key/value pairs recursively
Browse files Browse the repository at this point in the history
    This is to implement the idea: http://pad.ceph.com/p/rocksdb-wal-improvement
    Add a new flush style called kFlushStyleDedup, which users can enable by
    setting flush_style=kFlushStyleDedup. When a flush is triggered, it dedups
    the key/value pairs in the oldest memtable against the other memtables
    before flushing the oldest memtable into L0.

    This flush style benefits workloads in which data is duplicated across
    memtables: it can greatly reduce the amount of data flushed into L0.

    Signed-off-by: Xiaoyan Li <xiaoyan.li@intel.com>
  • Loading branch information
lixiaoy1 committed Mar 7, 2018
1 parent ef29d2a commit d03ed3c
Show file tree
Hide file tree
Showing 27 changed files with 249 additions and 31 deletions.
79 changes: 69 additions & 10 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ Status BuildTable(
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
InternalIterator* cmp_iter,
std::unique_ptr<InternalIterator> cmp_range_del_iter) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
uint64_t num_duplicated = 0, num_total = 0;
meta->fd.file_size = 0;
iter->SeekToFirst();
std::unique_ptr<RangeDelAggregator> range_del_agg(
Expand All @@ -91,6 +94,15 @@ Status BuildTable(
// may be non-ok if a range tombstone key is unparsable
return s;
}
std::unique_ptr<RangeDelAggregator> cmp_range_del_agg(
new RangeDelAggregator(internal_comparator, snapshots));
if (cmp_range_del_iter != nullptr) {
s = cmp_range_del_agg->AddTombstones(std::move(cmp_range_del_iter));
if (!s.ok()) {
// may be non-ok if a range tombstone key is unparsable
return s;
}
}

std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
Expand Down Expand Up @@ -139,19 +151,64 @@ Status BuildTable(
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
true /* internal key corruption is not ok */, range_del_agg.get());
c_iter.SeekToFirst();

ParsedInternalKey c_ikey, cmp_ikey;
if (cmp_iter != nullptr) {
cmp_iter->SeekToFirst();
}
const rocksdb::Comparator *comp = internal_comparator.user_comparator();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
bool skip = false;
num_total++;

// Check whether the key is duplicated in later mems
if (ParseInternalKey(key, &c_ikey)) {
if (cmp_iter != nullptr ) {
while (cmp_iter->Valid()) {
if (!ParseInternalKey(cmp_iter->key(), &cmp_ikey)) {
cmp_iter->Next();
continue;
}
int result = comp->Compare(c_ikey.user_key, cmp_ikey.user_key);
if (result == 0) {
// No delete for merge options.
if (c_ikey.type != kTypeMerge && cmp_ikey.type != kTypeMerge) {
skip = true;
}
break;
}
else if (result > 0) {
cmp_iter->Next();
}
else {
break;
}
}
}
if (!skip && cmp_range_del_agg->ShouldDelete(c_ikey)) {
skip = true;
}
}

if (skip) {
num_duplicated++;
continue;
}
else {
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
}

// nullptr for table_{min,max} so all range tombstones will be flushed
range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */,
nullptr /* upper_bound */, meta);
Expand Down Expand Up @@ -219,9 +276,11 @@ Status BuildTable(
}

// Output to event logger and fire events.
bool show_num = (cmp_iter != nullptr ||
cmp_range_del_iter != nullptr)?true:false;
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
job_id, meta->fd, tp, reason, s, show_num, num_total, num_duplicated);

return s;
}
Expand Down
5 changes: 3 additions & 2 deletions db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extern Status BuildTable(
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET);

Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
InternalIterator* cmp_iter = nullptr,
std::unique_ptr<InternalIterator> cmp_range_del_iter = nullptr);
} // namespace rocksdb
4 changes: 4 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,10 @@ void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t* opt, int n)
opt->rep.max_write_buffer_number = n;
}

// C API setter: stores the requested flush style on the wrapped options.
// `style` is converted to rocksdb::FlushStyle with a plain static_cast; no
// range validation is performed here.
void rocksdb_options_set_flush_style(rocksdb_options_t* opt, int style) {
  const auto flush_style = static_cast<rocksdb::FlushStyle>(style);
  opt->rep.flush_style = flush_style;
}

// C API setter: stores `n` as min_write_buffer_number_to_merge on the wrapped
// options. The value is written as-is.
void rocksdb_options_set_min_write_buffer_number_to_merge(
    rocksdb_options_t* opt, int n) {
  auto& wrapped_options = opt->rep;
  wrapped_options.min_write_buffer_number_to_merge = n;
}
Expand Down
2 changes: 2 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}

if (result.max_write_buffer_number_to_maintain < 0) {
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
}
Expand Down Expand Up @@ -375,6 +376,7 @@ ColumnFamilyData::ColumnFamilyData(
mem_(nullptr),
imm_(ioptions_.min_write_buffer_number_to_merge,
ioptions_.max_write_buffer_number_to_maintain),
stop(false),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
Expand Down
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ class ColumnFamilyData {
MemTable* mem() { return mem_; }
Version* current() { return current_; }
Version* dummy_versions() { return dummy_versions_; }
// Raises the stop flag on this column family; the flag can be read back with
// is_stop().
void set_stop() {
  stop = true;
}
// Returns the current value of the stop flag.
bool is_stop() {
  return stop;
}
// Returns true when this column family's options select the
// kFlushStyleDedup flush style.
bool is_flush_recursive_dedup() {
  const bool dedup_enabled = (ioptions_.flush_style == kFlushStyleDedup);
  return dedup_enabled;
}
void SetCurrent(Version* _current);
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
Expand Down Expand Up @@ -386,6 +389,7 @@ class ColumnFamilyData {

MemTable* mem_;
MemTableList imm_;
bool stop;
SuperVersion* super_version_;

// An ordinal representing the current SuperVersion. Updated by
Expand Down
2 changes: 1 addition & 1 deletion db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CompactionPickerTest : public testing::Test {
DeleteVersionStorage();
options_.num_levels = num_levels;
vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
style, nullptr, false));
kFlushStyleMerge, style, nullptr, false));
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
}

Expand Down
1 change: 1 addition & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
cfd->set_stop();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
mutex_.Lock();
Expand Down
6 changes: 5 additions & 1 deletion db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s) {
const Status& s, bool show_num,
const uint64_t total_num, const uint64_t dup_num) {
if (s.ok() && event_logger) {
JSONWriter jwriter;
AppendCurrentTime(&jwriter);
Expand Down Expand Up @@ -89,6 +90,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
<< "num_data_blocks" << table_properties.num_data_blocks
<< "num_entries" << table_properties.num_entries
<< "filter_policy_name" << table_properties.filter_policy_name;
if (show_num) {
jwriter << "total_paris" << total_num << "duplicated_pairs" << dup_num;
}

// user collected properties
for (const auto& prop : table_properties.readable_properties) {
Expand Down
3 changes: 2 additions & 1 deletion db/event_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class EventHelpers {
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s);
const Status& s, const bool show_num = false,
const uint64_t total_num = 0, const uint64_t duplicated_num = 0);
static void LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path,
Expand Down
47 changes: 40 additions & 7 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,16 @@ void FlushJob::PickMemTable() {
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(&mems_);

if (cfd_->is_flush_recursive_dedup() && !cfd_->is_stop()) {
cfd_->imm()->PickMemtablesToFlush(&mems_, &compare_mems_);
}
else {
cfd_->imm()->PickMemtablesToFlush(&mems_);
}
if (mems_.empty()) {
return;
}

ReportFlushInputSize(mems_);

// entries mems are (implicitly) sorted in ascending order by their created
Expand Down Expand Up @@ -287,6 +292,7 @@ Status FlushJob::WriteLevel0Table() {
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
Arena arena1;
uint64_t total_num_entries = 0, total_num_deletes = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems_) {
Expand All @@ -304,13 +310,30 @@ Status FlushJob::WriteLevel0Table() {
total_memory_usage += m->ApproximateMemoryUsage();
}

std::vector<InternalIterator*> cmp_memtables;
std::vector<InternalIterator*> cmp_range_del_iters;
uint64_t cmp_entries = 0;
uint64_t cmp_num_deletes = 0;
for (MemTable *m: compare_mems_) {
cmp_memtables.push_back(m->NewIterator(ro, &arena1));
auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
if (range_del_iter != nullptr) {
cmp_range_del_iters.push_back(range_del_iter);
}
cmp_entries += m->num_entries();
cmp_num_deletes += m->num_deletes();
}

event_logger_->Log()
<< "job" << job_context_->job_id << "event"
<< "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries" << total_num_entries
<< "num_deletes" << total_num_deletes << "memory_usage"
<< "num_memtables" << mems_.size() << "num_entries"<< total_num_entries
<< "num_deletes" << total_num_deletes << "memory_usage"
<< total_memory_usage << "flush_reason"
<< GetFlushReasonString(cfd_->GetFlushReason());
<< GetFlushReasonString(cfd_->GetFlushReason())
<< " compare mems count " << compare_mems_.size()
<< " compare mems total " << cmp_entries
<< " compare mems total delete " << cmp_num_deletes;

{
ScopedArenaIterator iter(
Expand All @@ -320,6 +343,15 @@ Status FlushJob::WriteLevel0Table() {
&cfd_->internal_comparator(),
range_del_iters.empty() ? nullptr : &range_del_iters[0],
static_cast<int>(range_del_iters.size())));

ScopedArenaIterator cmp_iter(
NewMergingIterator(
&cfd_->internal_comparator(), &cmp_memtables[0],
static_cast<int>(cmp_memtables.size()), &arena1));
std::unique_ptr<InternalIterator> cmp_range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(),
cmp_range_del_iters.empty() ? nullptr : &cmp_range_del_iters[0],
static_cast<int>(cmp_range_del_iters.size())));
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), job_context_->job_id,
Expand Down Expand Up @@ -353,7 +385,8 @@ Status FlushJob::WriteLevel0Table() {
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
oldest_key_time, write_hint);
oldest_key_time, write_hint,
cmp_iter.get(), std::move(cmp_range_del_iter));
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,
Expand Down
1 change: 1 addition & 0 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class FlushJob {
// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
autovector<MemTable*> compare_mems_;
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;
Expand Down
26 changes: 26 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,32 @@ bool MemTableList::IsFlushPending() const {
return false;
}

// Dedup-flush variant of PickMemtablesToFlush.
//
// Walks the immutable memtable list from its back (rbegin) towards its front
// and looks only at memtables whose flush has not started yet:
//   * if the memtable at the back of the list qualifies, it is marked
//     flush-in-progress, counted as started, and appended to *ret;
//   * every other qualifying memtable is appended to *compare_ret. Those are
//     NOT marked as flush-in-progress here.
//
// At most one memtable is ever appended to *ret. If the back memtable is
// already being flushed, *ret receives nothing while *compare_ret may still
// be populated. flush_requested_ is cleared in every case.
//
// NOTE(review): the back of the list is presumably the oldest memtable
// (assuming new memtables are added at the front) -- confirm against the list
// maintenance code.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret,
                                        autovector<MemTable*>* compare_ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  const auto list_end = memlist.rend();
  for (auto cur = memlist.rbegin(); cur != list_end; ++cur) {
    MemTable* candidate = *cur;
    if (candidate->flush_in_progress_) {
      // Already owned by another flush; neither picked nor compared against.
      continue;
    }
    assert(!candidate->flush_completed_);

    if (cur != memlist.rbegin()) {
      // Not the back entry: only used as a comparison source.
      compare_ret->push_back(candidate);
      continue;
    }

    // Back entry: this is the single memtable handed out for flushing.
    num_flush_not_started_--;
    if (num_flush_not_started_ == 0) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
    candidate->flush_in_progress_ = true;  // flushing will start very soon
    ret->push_back(candidate);
  }
  flush_requested_ = false;  // start-flush request is complete
}

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
AutoThreadOperationStageUpdater stage_updater(
Expand Down
1 change: 1 addition & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class MemTableList {

// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(autovector<MemTable*>* mems, autovector<MemTable*>* compare_mems);
void PickMemtablesToFlush(autovector<MemTable*>* mems);

// Reset status of the given memtable list back to pending state so that
Expand Down
Loading

0 comments on commit d03ed3c

Please sign in to comment.