Skip to content

Commit

Permalink
optimize memtable switch latency
Browse files Browse the repository at this point in the history
With the new speedb memtable, each memtable switch needs to wait
for the allocation and initialization of ~8MiB, which creates
spikes in latency when switching memtables. Add a thread per CF
to allocate new memtables in the background so that they'll be
ready to be used when a switch happens.
In case memtable is not ready when switch occor the memtable will be created
immediately
  • Loading branch information
ayulas committed Jun 28, 2022
1 parent 336cab6 commit d9915cd
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 4 deletions.
56 changes: 55 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ ColumnFamilyData::ColumnFamilyData(
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc),
last_memtable_id_(0),
db_paths_registered_(false) {
db_paths_registered_(false),
terminate_switch_memtable_(false),
switch_mem_(nullptr) {
if (id_ != kDummyColumnFamilyDataId) {
// TODO(cc): RegisterDbPaths can be expensive, considering moving it
// outside of this constructor which might be called with db mutex held.
Expand Down Expand Up @@ -612,11 +614,44 @@ ColumnFamilyData::ColumnFamilyData(
}

RecalculateWriteStallConditions(mutable_cf_options_);
switch_memtable_thread_ =
std::thread(&ColumnFamilyData::PrepareSwitchMemTable, this);
}

void ColumnFamilyData::PrepareSwitchMemTable() {
for (;;) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
while (switch_mem_.load(std::memory_order_acquire) != nullptr) {
if (terminate_switch_memtable_) {
return;
}

switch_memtable_thread_cv_.wait(lck);
}
}

// Construct new memtable with an empty initial sequence
switch_mem_.store(ConstructNewMemtable(mutable_cf_options_, 0),
std::memory_order_release);
}
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_.load(std::memory_order_relaxed) == 0);
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_ = true;
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();

const MemTable* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}

// remove from linked list
auto prev = prev_;
auto next = next_;
Expand Down Expand Up @@ -1059,6 +1094,25 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::GetSwitchMemtable(SequenceNumber sn) {
MemTable* switch_mem = nullptr;

{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release);
}
switch_memtable_thread_cv_.notify_one();

if (switch_mem == nullptr) {
// No point in suspending, just construct the memtable here
switch_mem = ConstructNewMemtable(mutable_cf_options_, sn);
} else {
switch_mem->SetInitialSeq(sn);
}

return switch_mem;
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
Expand Down
10 changes: 10 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ class ColumnFamilyData {
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();

MemTable* GetSwitchMemtable(SequenceNumber sn);

// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
Expand Down Expand Up @@ -538,6 +540,8 @@ class ColumnFamilyData {

std::vector<std::string> GetDbPaths() const;

void PrepareSwitchMemTable();

uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Expand Down Expand Up @@ -618,6 +622,12 @@ class ColumnFamilyData {
bool db_paths_registered_;

std::string full_history_ts_low_;

std::thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
bool terminate_switch_memtable_;
std::atomic<MemTable*> switch_mem_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
new_mem = cfd->GetSwitchMemtable(seq);
context->superversion_context.NewSuperVersion();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand Down
8 changes: 6 additions & 2 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
Expand All @@ -112,6 +110,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
SetInitialSeq(latest_seq);
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
Expand All @@ -131,6 +130,11 @@ MemTable::~MemTable() {
assert(refs_ == 0);
}

void MemTable::SetInitialSeq(SequenceNumber sn) {
earliest_seqno_ = sn;
creation_seq_ = sn;
}

size_t MemTable::ApproximateMemoryUsage() {
autovector<size_t> usages = {
arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
Expand Down
4 changes: 4 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ class MemTable {

void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

// Sets the initial sequence number for lazy initialization of the memtable
// NOTE: should only be called once before any other operation on the memtable
void SetInitialSeq(SequenceNumber sn);

// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
Expand Down

0 comments on commit d9915cd

Please sign in to comment.