Skip to content

Commit

Permalink
Merge pull request #25 from speedb-io/14-import-shorten-memtable-switch-latency
Browse files Browse the repository at this point in the history

#14: optimize memtable switch latency
  • Loading branch information
assaf-speedb authored Jul 25, 2022
2 parents ee70d7a + 7115af7 commit bdd716f
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 20 deletions.
61 changes: 58 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ ColumnFamilyData::ColumnFamilyData(
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc),
last_memtable_id_(0),
db_paths_registered_(false) {
db_paths_registered_(false),
terminate_switch_memtable_(false),
switch_mem_(nullptr) {
if (id_ != kDummyColumnFamilyDataId) {
// TODO(cc): RegisterDbPaths can be expensive, considering moving it
// outside of this constructor which might be called with db mutex held.
Expand Down Expand Up @@ -612,11 +614,44 @@ ColumnFamilyData::ColumnFamilyData(
}

RecalculateWriteStallConditions(mutable_cf_options_);
switch_memtable_thread_ =
std::thread(&ColumnFamilyData::PrepareSwitchMemTable, this);
}

// Background-thread loop: keeps one pre-built memtable parked in
// switch_mem_ so that GetSwitchMemtable() does not pay the construction
// cost on the write path during a memtable switch.
void ColumnFamilyData::PrepareSwitchMemTable() {
  for (;;) {
    {
      std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
      // Sleep until either the parked memtable has been consumed or we are
      // asked to terminate. Checking the terminate flag unconditionally
      // (not only when switch_mem_ is non-null, as before) avoids
      // constructing one final, unneeded memtable during shutdown.
      while (!terminate_switch_memtable_ &&
             switch_mem_.load(std::memory_order_acquire) != nullptr) {
        switch_memtable_thread_cv_.wait(lck);
      }
      if (terminate_switch_memtable_) {
        return;
      }
    }

    // Construct a new memtable with an empty initial sequence number, in
    // the pending state. Done outside the mutex so the write path is never
    // blocked behind the (potentially expensive) construction; the release
    // store publishes the fully-constructed memtable to the consumer.
    switch_mem_.store(ConstructNewMemtable(mutable_cf_options_, 0, true),
                      std::memory_order_release);
  }
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_.load(std::memory_order_relaxed) == 0);
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_ = true;
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();

const MemTable* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}

// remove from linked list
auto prev = prev_;
auto next = next_;
Expand Down Expand Up @@ -1059,10 +1094,30 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
return current_->GetSstFilesSize();
}

// Returns a memtable ready to become the new active memtable at sequence
// number `sn`. Prefers the memtable pre-built by the background thread;
// falls back to constructing one inline if none is parked yet.
MemTable* ColumnFamilyData::GetSwitchMemtable(SequenceNumber sn) {
  MemTable* switch_mem = nullptr;

  {
    std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
    // Acquire semantics are required on the exchange: the background
    // thread publishes the fully-constructed memtable with a release
    // store *outside* this mutex, so the previous memory_order_release
    // left the load half of the RMW relaxed and did not guarantee we
    // observe the memtable's initialization.
    switch_mem = switch_mem_.exchange(nullptr, std::memory_order_acq_rel);
  }
  // Wake the background thread so it starts preparing the next memtable.
  switch_memtable_thread_cv_.notify_one();

  if (switch_mem == nullptr) {
    // No pre-built memtable available; no point in suspending, just
    // construct the memtable here (non-pending, already at sequence sn).
    switch_mem = ConstructNewMemtable(mutable_cf_options_, sn, false);
  } else {
    // Lazily finish initialization: set the real sequence number and
    // charge the already-allocated memory to the write buffer manager.
    switch_mem->Activate(sn);
  }

  return switch_mem;
}

// Allocates a fresh MemTable for this column family. When `pending` is
// true, the memtable's memory is not charged to the write buffer manager
// until MemTable::Activate() is called (see AllocTracker).
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq,
    bool pending) {
  MemTable* const new_mem =
      new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                   write_buffer_manager_, earliest_seq, id_, pending);
  return new_mem;
}

void ColumnFamilyData::CreateNewMemtable(
Expand Down
13 changes: 12 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,12 @@ class ColumnFamilyData {
// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();

MemTable* GetSwitchMemtable(SequenceNumber sn);

// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);
SequenceNumber earliest_seq,
bool pending = false);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);

Expand Down Expand Up @@ -538,6 +541,8 @@ class ColumnFamilyData {

std::vector<std::string> GetDbPaths() const;

void PrepareSwitchMemTable();

uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Expand Down Expand Up @@ -618,6 +623,12 @@ class ColumnFamilyData {
bool db_paths_registered_;

std::string full_history_ts_low_;

std::thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
bool terminate_switch_memtable_;
std::atomic<MemTable*> switch_mem_;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
}
if (s.ok()) {
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
new_mem = cfd->GetSwitchMemtable(seq);
context->superversion_context.NewSuperVersion();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand Down
19 changes: 15 additions & 4 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber latest_seq, uint32_t column_family_id)
SequenceNumber latest_seq, uint32_t column_family_id,
bool pending)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
mem_tracker_(write_buffer_manager),
mem_tracker_(write_buffer_manager, pending),
arena_(moptions_.arena_block_size,
(write_buffer_manager != nullptr &&
(write_buffer_manager->enabled() ||
Expand All @@ -97,8 +98,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
Expand All @@ -112,6 +111,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
SetInitialSeq(latest_seq);
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
Expand All @@ -131,6 +131,17 @@ MemTable::~MemTable() {
assert(refs_ == 0);
}

// Seeds both sequence-number fields that a freshly-constructed memtable
// derives from the latest sequence number. Must run before any other
// operation on the memtable (see declaration comment).
void MemTable::SetInitialSeq(SequenceNumber sn) {
  creation_seq_ = sn;
  earliest_seqno_ = sn;
}

// Completes lazy initialization of a pre-built ("pending") memtable:
// assigns the real initial sequence number, then tells the arena to
// charge the memory it has already allocated to the tracker.
void MemTable::Activate(SequenceNumber sn) {
  SetInitialSeq(sn);
  arena_.Activate();
}

size_t MemTable::ApproximateMemoryUsage() {
autovector<size_t> usages = {
arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
Expand Down
11 changes: 10 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class MemTable {
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq, uint32_t column_family_id);
SequenceNumber earliest_seq, uint32_t column_family_id,
bool pending = false);
// No copying allowed
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
Expand Down Expand Up @@ -403,6 +404,14 @@ class MemTable {

void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

// Sets the initial sequence number for lazy initialization of the memtable
// NOTE: should only be called once before any other operation on the memtable
void SetInitialSeq(SequenceNumber sn);
// Sets the initial sequence number for lazy initialization of the memtable
// and activate mem_tracker_ if needed
// NOTE: should only be called once before any other operation on the memtable
void Activate(SequenceNumber sn);

// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
Expand Down
9 changes: 7 additions & 2 deletions memory/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ class Allocator {

class AllocTracker {
public:
explicit AllocTracker(WriteBufferManager* write_buffer_manager);
explicit AllocTracker(WriteBufferManager* write_buffer_manager, bool pending = false);
// No copying allowed
AllocTracker(const AllocTracker&) = delete;
void operator=(const AllocTracker&) = delete;

~AllocTracker();

// Will be called ONLY on a pending memtable
void Activate(size_t bytes);
void Allocate(size_t bytes);
// Call when we're finished allocating memory so we can free it from
// the write buffer's limit.
Expand All @@ -46,12 +49,14 @@ class AllocTracker {
void FreeMem();

bool is_freed() const { return write_buffer_manager_ == nullptr || freed_; }

private:
bool AllocValid();
private:
WriteBufferManager* write_buffer_manager_;
std::atomic<size_t> bytes_allocated_;
bool done_allocating_;
bool freed_;
bool pending_;
};

} // namespace ROCKSDB_NAMESPACE
6 changes: 6 additions & 0 deletions memory/arena.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ Arena::Arena(size_t block_size, AllocTracker* tracker, size_t huge_page_size)
}
}

// Forwards activation to the allocation tracker (if any) so the inline
// block this arena already owns gets accounted for.
void Arena::Activate() {
  if (tracker_ == nullptr) {
    return;
  }
  tracker_->Activate(kInlineSize);
}

Arena::~Arena() {
if (tracker_ != nullptr) {
assert(tracker_->is_freed());
Expand Down
2 changes: 2 additions & 0 deletions memory/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Arena : public Allocator {
AllocTracker* tracker = nullptr, size_t huge_page_size = 0);
~Arena();

void Activate();

char* Allocate(size_t bytes) override;

// huge_page_size: if >0, will try to allocate from huge page TLB.
Expand Down
4 changes: 4 additions & 0 deletions memory/concurrent_arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class ConcurrentArena : public Allocator {
AllocTracker* tracker = nullptr,
size_t huge_page_size = 0);

// Activate memory accounting for a memtable that was pre-built in the
// "pending" state; simply forwards to the underlying core Arena.
void Activate() { arena_.Activate(); }

char* Allocate(size_t bytes) override {
return AllocateImpl(bytes, false /*force_arena*/,
[this, bytes]() { return arena_.Allocate(bytes); });
Expand Down
27 changes: 19 additions & 8 deletions memtable/alloc_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,39 @@

namespace ROCKSDB_NAMESPACE {

AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager)
// `pending` == true defers all write-buffer-manager accounting until
// Activate() is called (used for memtables built ahead of time by the
// memtable-switch background thread).
AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager,
                           bool pending)
    : write_buffer_manager_(write_buffer_manager),
      bytes_allocated_(0),
      done_allocating_(false),
      freed_(false),
      pending_(pending) {}

AllocTracker::~AllocTracker() { FreeMem(); }

// Transition this tracker out of the pending state and charge `bytes`
// (the memory already allocated while pending) to the write buffer
// manager. REQUIRES: constructed with pending == true; call once.
void AllocTracker::Activate(size_t bytes) {
assert(pending_);
// pending_ must be cleared BEFORE calling Allocate(): AllocValid()
// rejects any allocation while the tracker is still pending.
pending_ = false;
Allocate(bytes);
}

// Returns true if allocations should be charged to the write buffer
// manager: a manager must be present and enabled (or costing memtable
// memory to the block cache), and the tracker must not be pending.
bool AllocTracker::AllocValid() {
  if (write_buffer_manager_ == nullptr) {
    // Defensive: DoneAllocating()/FreeMem() check for a null manager
    // before calling, but Allocate() only asserts — without this check a
    // release build with a null manager would dereference null here.
    return false;
  }
  return (write_buffer_manager_->enabled() ||
          write_buffer_manager_->cost_to_cache()) &&
         !pending_;
}

// Charge `bytes` to the write buffer manager. No-op while the tracker is
// pending or when the manager is disabled (per AllocValid()).
void AllocTracker::Allocate(size_t bytes) {
  assert(write_buffer_manager_ != nullptr);
  if (!AllocValid()) {
    return;
  }
  bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
  write_buffer_manager_->ReserveMem(bytes);
}

void AllocTracker::DoneAllocating() {
if (write_buffer_manager_ != nullptr && !done_allocating_) {
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
if (AllocValid()) {
write_buffer_manager_->ScheduleFreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
Expand All @@ -49,8 +61,7 @@ void AllocTracker::FreeMem() {
DoneAllocating();
}
if (write_buffer_manager_ != nullptr && !freed_) {
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
if (AllocValid()) {
write_buffer_manager_->FreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
Expand Down

0 comments on commit bdd716f

Please sign in to comment.