Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#14: optimize memtable switch latency #25

Merged
merged 1 commit into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ ColumnFamilyData::ColumnFamilyData(
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc),
last_memtable_id_(0),
db_paths_registered_(false) {
db_paths_registered_(false),
terminate_switch_memtable_(false),
switch_mem_(nullptr) {
if (id_ != kDummyColumnFamilyDataId) {
// TODO(cc): RegisterDbPaths can be expensive, considering moving it
// outside of this constructor which might be called with db mutex held.
Expand Down Expand Up @@ -612,11 +614,44 @@ ColumnFamilyData::ColumnFamilyData(
}

RecalculateWriteStallConditions(mutable_cf_options_);
switch_memtable_thread_ =
std::thread(&ColumnFamilyData::PrepareSwitchMemTable, this);
}

// Background thread body: keeps one spare memtable pre-constructed in
// switch_mem_ so that a memtable switch (which runs with the DB mutex held)
// can pick it up without paying allocation latency.
// Runs until terminate_switch_memtable_ is set by ~ColumnFamilyData.
void ColumnFamilyData::PrepareSwitchMemTable() {
  for (;;) {
    {
      std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
      // Sleep while a spare memtable is already available; woken either by
      // the consumer taking it (GetSwitchMemtable) or by shutdown.
      while (switch_mem_.load(std::memory_order_acquire) != nullptr) {
        if (terminate_switch_memtable_) {
          return;
        }

        switch_memtable_thread_cv_.wait(lck);
      }
    }

    // Construct new memtable with an empty initial sequence (0); the real
    // sequence number is stamped later via MemTable::Activate().
    // NOTE(review): the store happens outside the mutex — correctness relies
    // on this release store pairing with an acquire on the consumer side.
    switch_mem_.store(ConstructNewMemtable(mutable_cf_options_, 0, true),
                      std::memory_order_release);
  }
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_.load(std::memory_order_relaxed) == 0);
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_ = true;
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();

const MemTable* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}

// remove from linked list
auto prev = prev_;
auto next = next_;
Expand Down Expand Up @@ -1059,10 +1094,30 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
return current_->GetSstFilesSize();
}

// Returns the memtable to install during a switch. Fast path: take the
// memtable pre-built by the PrepareSwitchMemTable background thread and
// activate it with sequence number `sn`. Slow path (no pre-built memtable
// available): construct one inline, as the pre-optimization code did.
MemTable* ColumnFamilyData::GetSwitchMemtable(SequenceNumber sn) {
  MemTable* switch_mem = nullptr;

  {
    std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
    // acq_rel: the acquire half pairs with the background thread's release
    // store so the pre-built memtable's contents are visible to this thread;
    // the release half publishes the nullptr hand-back. (Release-only here
    // would not synchronize the read side.)
    switch_mem = switch_mem_.exchange(nullptr, std::memory_order_acq_rel);
  }
  // Wake the background thread so it can pre-build the next memtable.
  switch_memtable_thread_cv_.notify_one();

  if (switch_mem == nullptr) {
    // No point in suspending, just construct the memtable here
    switch_mem = ConstructNewMemtable(mutable_cf_options_, sn, false);
  } else {
    // Pre-built with sequence 0: stamp the real sequence number and let its
    // allocator start charging the write buffer manager.
    switch_mem->Activate(sn);
  }

  return switch_mem;
}

// Allocates a new MemTable seeded with `earliest_seq`. When `pending` is
// true the memtable is created in a deferred state: its AllocTracker does
// not charge the write buffer manager until MemTable::Activate() is called.
// (This span previously contained both the pre- and post-diff versions of
// the signature and call; only the post-diff version is kept.)
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq,
    bool pending) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_, pending);
}

void ColumnFamilyData::CreateNewMemtable(
Expand Down
13 changes: 12 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,12 @@ class ColumnFamilyData {
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();

MemTable* GetSwitchMemtable(SequenceNumber sn);

// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
SequenceNumber earliest_seq,
bool pending = false);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);

Expand Down Expand Up @@ -538,6 +541,8 @@ class ColumnFamilyData {

std::vector<std::string> GetDbPaths() const;

void PrepareSwitchMemTable();

uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Expand Down Expand Up @@ -618,6 +623,12 @@ class ColumnFamilyData {
bool db_paths_registered_;

std::string full_history_ts_low_;

std::thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
bool terminate_switch_memtable_;
std::atomic<MemTable*> switch_mem_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
new_mem = cfd->GetSwitchMemtable(seq);
context->superversion_context.NewSuperVersion();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand Down
19 changes: 15 additions & 4 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber latest_seq, uint32_t column_family_id)
SequenceNumber latest_seq, uint32_t column_family_id,
bool pending)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
mem_tracker_(write_buffer_manager),
mem_tracker_(write_buffer_manager, pending),
arena_(moptions_.arena_block_size,
(write_buffer_manager != nullptr &&
(write_buffer_manager->enabled() ||
Expand All @@ -97,8 +98,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
Expand All @@ -112,6 +111,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
SetInitialSeq(latest_seq);
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
Expand All @@ -131,6 +131,17 @@ MemTable::~MemTable() {
assert(refs_ == 0);
}

// Records the initial sequence number for a lazily-initialized memtable.
// Both the earliest visible sequence and the creation sequence start at
// `sn`. NOTE: call once, before any other operation on the memtable.
void MemTable::SetInitialSeq(SequenceNumber sn) {
  creation_seq_ = sn;
  earliest_seqno_ = sn;
}

// Transitions a pre-constructed ("pending") memtable into active use:
// stamps its initial sequence number, then tells the arena to start
// charging the write buffer manager for memory it already holds.
// NOTE: call once, before any other operation on the memtable.
void MemTable::Activate(SequenceNumber sn) {
  SetInitialSeq(sn);
  arena_.Activate();
}

size_t MemTable::ApproximateMemoryUsage() {
autovector<size_t> usages = {
arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
Expand Down
11 changes: 10 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class MemTable {
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq, uint32_t column_family_id);
SequenceNumber earliest_seq, uint32_t column_family_id,
bool pending = false);
// No copying allowed
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
Expand Down Expand Up @@ -403,6 +404,14 @@ class MemTable {

void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

// Sets the initial sequence number for lazy initialization of the memtable
// NOTE: should only be called once before any other operation on the memtable
void SetInitialSeq(SequenceNumber sn);
// Sets the initial sequence number for lazy initialization of the memtable
// and activate mem_tracker_ if needed
// NOTE: should only be called once before any other operation on the memtable
void Activate(SequenceNumber sn);

// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
Expand Down
9 changes: 7 additions & 2 deletions memory/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ class Allocator {

class AllocTracker {
public:
explicit AllocTracker(WriteBufferManager* write_buffer_manager);
explicit AllocTracker(WriteBufferManager* write_buffer_manager, bool pending = false);
// No copying allowed
AllocTracker(const AllocTracker&) = delete;
void operator=(const AllocTracker&) = delete;

~AllocTracker();

// Will be called ONLY on a pending memtable
void Activate(size_t bytes);
void Allocate(size_t bytes);
// Call when we're finished allocating memory so we can free it from
// the write buffer's limit.
Expand All @@ -46,12 +49,14 @@ class AllocTracker {
void FreeMem();

bool is_freed() const { return write_buffer_manager_ == nullptr || freed_; }

private:
bool AllocValid();
private:
WriteBufferManager* write_buffer_manager_;
std::atomic<size_t> bytes_allocated_;
bool done_allocating_;
bool freed_;
bool pending_;
};

} // namespace ROCKSDB_NAMESPACE
6 changes: 6 additions & 0 deletions memory/arena.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ Arena::Arena(size_t block_size, AllocTracker* tracker, size_t huge_page_size)
}
}

// Charges the inline buffer to the tracker once this arena's memtable
// becomes active. No-op when no tracker is attached.
void Arena::Activate() {
  if (tracker_ == nullptr) {
    return;
  }
  tracker_->Activate(kInlineSize);
}

Arena::~Arena() {
if (tracker_ != nullptr) {
assert(tracker_->is_freed());
Expand Down
2 changes: 2 additions & 0 deletions memory/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Arena : public Allocator {
AllocTracker* tracker = nullptr, size_t huge_page_size = 0);
~Arena();

void Activate();

char* Allocate(size_t bytes) override;

// huge_page_size: if >0, will try to allocate from huge page TLB.
Expand Down
4 changes: 4 additions & 0 deletions memory/concurrent_arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class ConcurrentArena : public Allocator {
AllocTracker* tracker = nullptr,
size_t huge_page_size = 0);

// Forwards activation to the underlying arena, which owns the
// AllocTracker for this memtable.
void Activate() { arena_.Activate(); }

char* Allocate(size_t bytes) override {
return AllocateImpl(bytes, false /*force_arena*/,
[this, bytes]() { return arena_.Allocate(bytes); });
Expand Down
27 changes: 19 additions & 8 deletions memtable/alloc_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,39 @@

namespace ROCKSDB_NAMESPACE {

// Tracks memory charged to the WriteBufferManager on behalf of one
// memtable. A `pending` tracker belongs to a pre-allocated memtable and
// defers all charging until Activate() is called.
// (This span previously contained both the pre- and post-diff versions of
// the constructor; only the post-diff version is kept.)
AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager,
                           bool pending)
    : write_buffer_manager_(write_buffer_manager),
      bytes_allocated_(0),
      done_allocating_(false),
      freed_(false),
      pending_(pending) {}

AllocTracker::~AllocTracker() { FreeMem(); }

// Moves this tracker out of the pending state and retroactively charges
// `bytes` (memory that was allocated while the tracker was pending).
// Must only be called on a tracker constructed with pending == true.
void AllocTracker::Activate(size_t bytes) {
  assert(pending_);
  pending_ = false;
  Allocate(bytes);
}

// A tracker charges the write buffer manager only when the memtable is not
// still pending activation and the manager is actually metering memory
// (enabled, or costing memtable memory to the block cache).
// Callers guarantee write_buffer_manager_ is non-null before calling.
bool AllocTracker::AllocValid() {
  if (pending_) {
    return false;
  }
  return write_buffer_manager_->enabled() ||
         write_buffer_manager_->cost_to_cache();
}

// Records an allocation of `bytes` and reserves it with the write buffer
// manager. Skipped entirely while the tracker is pending or the manager
// is not metering memory (see AllocValid()).
void AllocTracker::Allocate(size_t bytes) {
  assert(write_buffer_manager_ != nullptr);
  if (!AllocValid()) {
    return;
  }
  bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
  write_buffer_manager_->ReserveMem(bytes);
}
}

void AllocTracker::DoneAllocating() {
if (write_buffer_manager_ != nullptr && !done_allocating_) {
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
if (AllocValid()) {
write_buffer_manager_->ScheduleFreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
Expand All @@ -49,8 +61,7 @@ void AllocTracker::FreeMem() {
DoneAllocating();
}
if (write_buffer_manager_ != nullptr && !freed_) {
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
if (AllocValid()) {
write_buffer_manager_->FreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
Expand Down