Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Delete duplicated key/value pairs recursively #19

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level) {
TableProperties* table_properties, int level,
InternalIterator* compare_iter) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
uint64_t num_duplicated = 0, num_total = 0;
meta->fd.file_size = 0;
iter->SeekToFirst();
std::unique_ptr<RangeDelAggregator> range_del_agg(
Expand Down Expand Up @@ -138,17 +140,56 @@ Status BuildTable(
&snapshots, earliest_write_conflict_snapshot, env,
true /* internal key corruption is not ok */, range_del_agg.get());
c_iter.SeekToFirst();

ParsedInternalKey c_ikey, comp_ikey;
if (compare_iter != nullptr) {
compare_iter->SeekToFirst(); //find the first one
}
const rocksdb::Comparator *comp = internal_comparator.user_comparator();
for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
bool skip = false;
num_total++;

// Check whether the key is duplicated in later mems
c_ikey = c_iter.ikey();
if (compare_iter != nullptr ) {
while (compare_iter->Valid()) {
if (!ParseInternalKey(compare_iter->key(), &comp_ikey)) {
compare_iter->Next();
continue;
}
int result = comp->Compare(c_ikey.user_key, comp_ikey.user_key);
if (result == 0) {
// No delete for merge options.
if (c_ikey.type != kTypeMerge && comp_ikey.type != kTypeMerge) {
skip = true;
}
break;
}
else if (result > 0) {
compare_iter->Next();
}
else {
break;
}
}
}
if (skip) {
num_duplicated++;
continue;
}
else {
builder->Add(key, value);
meta->UpdateBoundaries(key, c_iter.ikey().sequence);

// TODO(noetzli): Update stats after flush, too.
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
}
// nullptr for table_{min,max} so all range tombstones will be flushed
Expand Down Expand Up @@ -218,9 +259,10 @@ Status BuildTable(
}

// Output to event logger and fire events.
bool show_num = (compare_iter != nullptr)?true:false;
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
job_id, meta->fd, tp, reason, s, show_num, num_total, num_duplicated);

return s;
}
Expand Down
3 changes: 2 additions & 1 deletion db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ extern Status BuildTable(
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1);
TableProperties* table_properties = nullptr, int level = -1,
InternalIterator* compare_iter = nullptr);

} // namespace rocksdb
8 changes: 8 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,14 @@ void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t* opt, int n)
opt->rep.max_write_buffer_number = n;
}

// C API setter: stores `style`, converted to rocksdb::FlushStyle, in the
// flush_style field of the options object wrapped by `opt`.
void rocksdb_options_set_flush_style(rocksdb_options_t* opt, int style) {
  const auto flush_style = static_cast<rocksdb::FlushStyle>(style);
  opt->rep.flush_style = flush_style;
}

// C API setter: copies `n` into the write_buffer_number_to_flush field of
// the options object wrapped by `opt`.
void rocksdb_options_set_write_buffer_number_to_flush(rocksdb_options_t* opt,
                                                      int n) {
  auto& options = opt->rep;
  options.write_buffer_number_to_flush = n;
}

// C API setter: copies `n` into the min_write_buffer_number_to_merge field
// of the options object wrapped by `opt`.
void rocksdb_options_set_min_write_buffer_number_to_merge(
    rocksdb_options_t* opt, int n) {
  auto& options = opt->rep;
  options.min_write_buffer_number_to_merge = n;
}
Expand Down
13 changes: 12 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.min_write_buffer_number_to_merge = 1;
}

if (result.flush_style == kFlushStyleDedup) {
if (result.write_buffer_number_to_flush < 1 ||
(result.write_buffer_number_to_flush >
result.min_write_buffer_number_to_merge)) {
result.write_buffer_number_to_flush = 1;
}
}

if (result.num_levels < 1) {
result.num_levels = 1;
}
Expand All @@ -181,6 +189,7 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
if (result.max_write_buffer_number < 2) {
result.max_write_buffer_number = 2;
}

if (result.max_write_buffer_number_to_maintain < 0) {
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
}
Expand Down Expand Up @@ -359,7 +368,9 @@ ColumnFamilyData::ColumnFamilyData(
write_buffer_manager_(write_buffer_manager),
mem_(nullptr),
imm_(ioptions_.min_write_buffer_number_to_merge,
ioptions_.max_write_buffer_number_to_maintain),
ioptions_.max_write_buffer_number_to_maintain,
ioptions_.write_buffer_number_to_flush),
stop(false),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
Expand Down
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ class ColumnFamilyData {
MemTable* mem() { return mem_; }
Version* current() { return current_; }
Version* dummy_versions() { return dummy_versions_; }
// Raises the `stop` flag on this column family.
void set_stop() {
  stop = true;
}
// Reports the current value of the `stop` flag.
bool is_stop() {
  return stop;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the stop flag needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used to decide whether all immutable memtables should be chosen for flushing when MemTableList::PickMemtablesToFlush is called. In the new flush style, if the db process is about to shut down, it needs to flush all existing immutable memtables. https://github.com/ceph/rocksdb/pull/19/files#diff-f8dc8f7b1ea2cb77b0eb9b156bc434f2R133

// True when this column family's options select kFlushStyleDedup.
bool is_flush_recursive_dedup() {
  const bool dedup_enabled = (ioptions_.flush_style == kFlushStyleDedup);
  return dedup_enabled;
}
void SetCurrent(Version* _current);
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
Expand Down Expand Up @@ -371,6 +374,7 @@ class ColumnFamilyData {

MemTable* mem_;
MemTableList imm_;
bool stop;
SuperVersion* super_version_;

// An ordinal representing the current SuperVersion. Updated by
Expand Down
1 change: 1 addition & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ TEST_F(ColumnFamilyTest, DifferentWriteBufferSizes) {
default_cf.arena_block_size = 4 * 4096;
default_cf.max_write_buffer_number = 10;
default_cf.min_write_buffer_number_to_merge = 1;
default_cf.write_buffer_number_to_flush = 0;
default_cf.max_write_buffer_number_to_maintain = 0;
one.write_buffer_size = 200000;
one.arena_block_size = 4 * 4096;
Expand Down
2 changes: 1 addition & 1 deletion db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class CompactionPickerTest : public testing::Test {
DeleteVersionStorage();
options_.num_levels = num_levels;
vstorage_.reset(new VersionStorageInfo(&icmp_, ucmp_, options_.num_levels,
style, nullptr, false));
kFlushStyleMerge, style, nullptr, false));
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
}

Expand Down
1 change: 1 addition & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
cfd->set_stop();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions());
mutex_.Lock();
Expand Down
5 changes: 4 additions & 1 deletion db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s) {
const Status& s, bool show_num, int total_num, int dup_num) {
if (s.ok() && event_logger) {
JSONWriter jwriter;
AppendCurrentTime(&jwriter);
Expand Down Expand Up @@ -73,6 +73,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
<< "num_data_blocks" << table_properties.num_data_blocks
<< "num_entries" << table_properties.num_entries
<< "filter_policy_name" << table_properties.filter_policy_name;
// Emit the pair counters only when the caller requested them via show_num.
// The key was previously misspelled "total_paris"; it is written to the
// JSON event log, so the spelling matters to anything parsing that log.
if (show_num) {
  jwriter << "total_pairs" << total_num << "duplicated_pairs" << dup_num;
}

// user collected properties
for (const auto& prop : table_properties.readable_properties) {
Expand Down
2 changes: 1 addition & 1 deletion db/event_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class EventHelpers {
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s);
const Status& s, bool show_num = false, int total_num = 0, int duplicated_num = 0);
static void LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path,
Expand Down
33 changes: 29 additions & 4 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ void FlushJob::PickMemTable() {
assert(!pick_memtable_called);
pick_memtable_called = true;
// Save the contents of the earliest memtable as a new Table
cfd_->imm()->PickMemtablesToFlush(&mems_);

if (cfd_->is_flush_recursive_dedup() && !cfd_->is_stop()) {
cfd_->imm()->PickMemtablesToFlush(&mems_, &compare_mems_);
}
else {
cfd_->imm()->PickMemtablesToFlush(&mems_);
}
if (mems_.empty()) {
return;
}

ReportFlushInputSize(mems_);

// entries mems are (implicitly) sorted in ascending order by their created
Expand Down Expand Up @@ -257,6 +262,7 @@ Status FlushJob::WriteLevel0Table() {
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
Arena arena1;
uint64_t total_num_entries = 0, total_num_deletes = 0;
size_t total_memory_usage = 0;
for (MemTable* m : mems_) {
Expand All @@ -274,17 +280,36 @@ Status FlushJob::WriteLevel0Table() {
total_memory_usage += m->ApproximateMemoryUsage();
}

std::vector<InternalIterator*> compare_memtables;
int comp_entries = 0;
for (MemTable *m: compare_mems_) {
compare_memtables.push_back(m->NewIterator(ro, &arena1));
comp_entries += m->num_entries();
}

event_logger_->Log() << "job" << job_context_->job_id << "event"
<< "flush_started"
<< "num_memtables" << mems_.size() << "num_entries"
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
<< total_memory_usage << "mems table count "
<< mems_.size() << " compare mems count "
<< compare_mems_.size() << " compare mems total "
<< comp_entries;

{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
InternalIterator *compare_iter = nullptr;
ScopedArenaIterator compare_scope_iter;
if (compare_mems_.size() > 0) {
compare_scope_iter.set(
NewMergingIterator(&cfd_->internal_comparator(), &compare_memtables[0],
static_cast<int>(compare_memtables.size()), &arena1));
compare_iter = compare_scope_iter.get();
}

std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(),
range_del_iters.empty() ? nullptr : &range_del_iters[0],
Expand All @@ -308,7 +333,7 @@ Status FlushJob::WriteLevel0Table() {
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */);
Env::IO_HIGH, &table_properties_, 0, compare_iter);
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,
Expand Down
1 change: 1 addition & 0 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class FlushJob {
// Variables below are set by PickMemTable():
FileMetaData meta_;
autovector<MemTable*> mems_;
autovector<MemTable*> compare_mems_;
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;
Expand Down
27 changes: 27 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,33 @@ bool MemTableList::IsFlushPending() const {
return false;
}

// Partitions the memtables whose flush has not started yet, walking the
// current list from rbegin() to rend():
//   - the first write_buffer_number_to_flush_ of them are marked
//     flush-in-progress and appended to *ret (the ones to flush now);
//   - every remaining one is appended to *compare_ret and left unmarked.
// Memtables that already have a flush in progress are skipped entirely.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret,
                                        autovector<MemTable*>* compare_ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  assert(write_buffer_number_to_flush_ > 0);
  const auto& memlist = current_->memlist_;
  for (auto rit = memlist.rbegin(); rit != memlist.rend(); ++rit) {
    MemTable* const table = *rit;
    if (table->flush_in_progress_) {
      continue;  // a flush for this memtable has already been started
    }
    assert(!table->flush_completed_);

    if (ret->size() >= write_buffer_number_to_flush_) {
      // Flush quota reached: hand the memtable out for comparison only.
      compare_ret->push_back(table);
      continue;
    }

    num_flush_not_started_--;
    if (num_flush_not_started_ == 0) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
    table->flush_in_progress_ = true;  // flushing will start very soon
    ret->push_back(table);
  }
  flush_requested_ = false;  // start-flush request is complete
}

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
AutoThreadOperationStageUpdater stage_updater(
Expand Down
7 changes: 6 additions & 1 deletion db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,11 @@ class MemTableList {
public:
// A list of memtables.
explicit MemTableList(int min_write_buffer_number_to_merge,
int max_write_buffer_number_to_maintain)
int max_write_buffer_number_to_maintain,
int write_buffer_number_to_flush = 1)
: imm_flush_needed(false),
min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
write_buffer_number_to_flush_(write_buffer_number_to_flush),
current_(new MemTableListVersion(&current_memory_usage_,
max_write_buffer_number_to_maintain)),
num_flush_not_started_(0),
Expand Down Expand Up @@ -194,6 +196,7 @@ class MemTableList {

// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(autovector<MemTable*>* mems, autovector<MemTable*>* compare_mems);
void PickMemtablesToFlush(autovector<MemTable*>* mems);

// Reset status of the given memtable list back to pending state so that
Expand Down Expand Up @@ -242,6 +245,8 @@ class MemTableList {

const int min_write_buffer_number_to_merge_;

const unsigned int write_buffer_number_to_flush_;

MemTableListVersion* current_;

// the number of elements that still need flushing
Expand Down
Loading