diff --git a/db/column_family.cc b/db/column_family.cc index 7558d2f45b..f1b5e83dba 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -542,7 +542,9 @@ ColumnFamilyData::ColumnFamilyData( prev_compaction_needed_bytes_(0), allow_2pc_(db_options.allow_2pc), last_memtable_id_(0), - db_paths_registered_(false) { + db_paths_registered_(false), + terminate_switch_memtable_(false), + switch_mem_(nullptr) { if (id_ != kDummyColumnFamilyDataId) { // TODO(cc): RegisterDbPaths can be expensive, considering moving it // outside of this constructor which might be called with db mutex held. @@ -612,11 +614,44 @@ ColumnFamilyData::ColumnFamilyData( } RecalculateWriteStallConditions(mutable_cf_options_); + switch_memtable_thread_ = + std::thread(&ColumnFamilyData::PrepareSwitchMemTable, this); +} + +void ColumnFamilyData::PrepareSwitchMemTable() { + for (;;) { + { + std::unique_lock lck(switch_memtable_thread_mutex_); + while (switch_mem_.load(std::memory_order_acquire) != nullptr) { + if (terminate_switch_memtable_) { + return; + } + + switch_memtable_thread_cv_.wait(lck); + } + } + + // Construct new memtable with an empty initial sequence + switch_mem_.store(ConstructNewMemtable(mutable_cf_options_, 0, true), + std::memory_order_release); + } } // DB mutex held ColumnFamilyData::~ColumnFamilyData() { assert(refs_.load(std::memory_order_relaxed) == 0); + { + std::unique_lock lck(switch_memtable_thread_mutex_); + terminate_switch_memtable_ = true; + } + switch_memtable_thread_cv_.notify_one(); + switch_memtable_thread_.join(); + + const MemTable* memtable = switch_mem_.exchange(nullptr); + if (memtable != nullptr) { + delete memtable; + } + // remove from linked list auto prev = prev_; auto next = next_; @@ -1059,10 +1094,30 @@ uint64_t ColumnFamilyData::GetLiveSstFilesSize() const { return current_->GetSstFilesSize(); } +MemTable* ColumnFamilyData::GetSwitchMemtable(SequenceNumber sn) { + MemTable* switch_mem = nullptr; + + { + std::unique_lock lck(switch_memtable_thread_mutex_); + switch_mem = switch_mem_.exchange(nullptr, std::memory_order_release); + } + switch_memtable_thread_cv_.notify_one(); + + if (switch_mem == nullptr) { + // No point in suspending, just construct the memtable here + switch_mem = ConstructNewMemtable(mutable_cf_options_, sn, false); + } else { + switch_mem->Activate(sn); + } + + return switch_mem; +} + MemTable* ColumnFamilyData::ConstructNewMemtable( - const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq, + bool pending) { return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, - write_buffer_manager_, earliest_seq, id_); + write_buffer_manager_, earliest_seq, id_, pending); } void ColumnFamilyData::CreateNewMemtable( diff --git a/db/column_family.h b/db/column_family.h index c374303662..4b0b178983 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -368,9 +368,12 @@ class ColumnFamilyData { // calculate the oldest log needed for the durability of this column family uint64_t OldestLogToKeep(); + MemTable* GetSwitchMemtable(SequenceNumber sn); + // See Memtable constructor for explanation of earliest_seq param. MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options, - SequenceNumber earliest_seq); + SequenceNumber earliest_seq, + bool pending = false); void CreateNewMemtable(const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq); @@ -538,6 +541,8 @@ class ColumnFamilyData { std::vector GetDbPaths() const; + void PrepareSwitchMemTable(); + uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. @@ -618,6 +623,12 @@ class ColumnFamilyData { bool db_paths_registered_; std::string full_history_ts_low_; + + std::thread switch_memtable_thread_; + std::mutex switch_memtable_thread_mutex_; + std::condition_variable switch_memtable_thread_cv_; + bool terminate_switch_memtable_; + std::atomic switch_mem_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 39657d4623..18bde1ddf2 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1952,7 +1952,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } if (s.ok()) { SequenceNumber seq = versions_->LastSequence(); - new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + new_mem = cfd->GetSwitchMemtable(seq); context->superversion_context.NewSuperVersion(); } ROCKS_LOG_INFO(immutable_db_options_.info_log, diff --git a/db/memtable.cc b/db/memtable.cc index 3ce44ea1d5..7a49f37d02 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -69,12 +69,13 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber latest_seq, uint32_t column_family_id) + SequenceNumber latest_seq, uint32_t column_family_id, + bool pending) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), - mem_tracker_(write_buffer_manager), + mem_tracker_(write_buffer_manager, pending), arena_(moptions_.arena_block_size, (write_buffer_manager != nullptr && (write_buffer_manager->enabled() || @@ -97,8 +98,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp, flush_completed_(false), file_number_(0), first_seqno_(0), - earliest_seqno_(latest_seq), - creation_seq_(latest_seq), mem_next_logfile_number_(0), min_prep_log_referenced_(0), locks_(moptions_.inplace_update_support @@ -112,6 +111,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, oldest_key_time_(std::numeric_limits::max()), atomic_flush_seqno_(kMaxSequenceNumber), approximate_memory_usage_(0) { + SetInitialSeq(latest_seq); UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -131,6 +131,17 @@ MemTable::~MemTable() { assert(refs_ == 0); } +void MemTable::SetInitialSeq(SequenceNumber sn) { + earliest_seqno_ = sn; + creation_seq_ = sn; +} + +void MemTable::Activate(SequenceNumber sn) { + SetInitialSeq(sn); + arena_.Activate(); + +} + size_t MemTable::ApproximateMemoryUsage() { autovector usages = { arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(), diff --git a/db/memtable.h b/db/memtable.h index 965404d25f..5fbd1e2aab 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -107,7 +107,8 @@ class MemTable { const ImmutableOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber earliest_seq, uint32_t column_family_id); + SequenceNumber earliest_seq, uint32_t column_family_id, + bool pending = false); // No copying allowed MemTable(const MemTable&) = delete; MemTable& operator=(const MemTable&) = delete; @@ -403,6 +404,14 @@ class MemTable { void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; } + // Sets the initial sequence number for lazy initialization of the memtable + // NOTE: should only be called once before any other operation on the memtable + void SetInitialSeq(SequenceNumber sn); + // Sets the initial sequence number for lazy initialization of the memtable + // and activate mem_tracker_ if needed + // NOTE: should only be called once before any other operation on the memtable + void Activate(SequenceNumber sn); + // Returns the next active logfile number when this memtable is about to // be flushed to storage // REQUIRES: external synchronization to prevent simultaneous diff --git a/memory/allocator.h b/memory/allocator.h index 002ad5f1d8..c4a41aa657 100644 --- a/memory/allocator.h +++ b/memory/allocator.h @@ -32,12 +32,15 @@ class Allocator { class AllocTracker { public: - explicit AllocTracker(WriteBufferManager* write_buffer_manager); + explicit AllocTracker(WriteBufferManager* write_buffer_manager, bool pending = false); // No copying allowed AllocTracker(const AllocTracker&) = delete; void operator=(const AllocTracker&) = delete; ~AllocTracker(); + + // Will be called ONLY on a pending memtable + void Activate(size_t bytes); void Allocate(size_t bytes); // Call when we're finished allocating memory so we can free it from // the write buffer's limit. @@ -46,12 +49,14 @@ class AllocTracker { void FreeMem(); bool is_freed() const { return write_buffer_manager_ == nullptr || freed_; } - + private: + bool AllocValid(); private: WriteBufferManager* write_buffer_manager_; std::atomic bytes_allocated_; bool done_allocating_; bool freed_; + bool pending_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/memory/arena.cc b/memory/arena.cc index bcdad5c76f..e302e760e3 100644 --- a/memory/arena.cc +++ b/memory/arena.cc @@ -66,6 +66,12 @@ Arena::Arena(size_t block_size, AllocTracker* tracker, size_t huge_page_size) } } +void Arena::Activate() { + if (tracker_ != nullptr) { + tracker_->Activate(kInlineSize); + } +} + Arena::~Arena() { if (tracker_ != nullptr) { assert(tracker_->is_freed()); diff --git a/memory/arena.h b/memory/arena.h index 07fc435596..b6f06ce67a 100644 --- a/memory/arena.h +++ b/memory/arena.h @@ -42,6 +42,8 @@ class Arena : public Allocator { AllocTracker* tracker = nullptr, size_t huge_page_size = 0); ~Arena(); + void Activate(); + char* Allocate(size_t bytes) override; // huge_page_size: if >0, will try to allocate from huage page TLB. diff --git a/memory/concurrent_arena.h b/memory/concurrent_arena.h index 9c55587e63..807213ce74 100644 --- a/memory/concurrent_arena.h +++ b/memory/concurrent_arena.h @@ -48,6 +48,10 @@ class ConcurrentArena : public Allocator { AllocTracker* tracker = nullptr, size_t huge_page_size = 0); + void Activate() { + arena_.Activate(); + } + char* Allocate(size_t bytes) override { return AllocateImpl(bytes, false /*force_arena*/, [this, bytes]() { return arena_.Allocate(bytes); }); diff --git a/memtable/alloc_tracker.cc b/memtable/alloc_tracker.cc index fe21343471..79961528aa 100644 --- a/memtable/alloc_tracker.cc +++ b/memtable/alloc_tracker.cc @@ -14,18 +14,31 @@ namespace ROCKSDB_NAMESPACE { -AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager) +AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager, bool pending) : write_buffer_manager_(write_buffer_manager), bytes_allocated_(0), done_allocating_(false), - freed_(false) {} + freed_(false), + pending_(pending) {} AllocTracker::~AllocTracker() { FreeMem(); } +void AllocTracker::Activate(size_t bytes) { + assert(pending_); + pending_ = false; + Allocate(bytes); +} + +bool AllocTracker::AllocValid() { + return ((write_buffer_manager_->enabled() || + write_buffer_manager_->cost_to_cache()) && !pending_); + +} + void AllocTracker::Allocate(size_t bytes) { assert(write_buffer_manager_ != nullptr); - if (write_buffer_manager_->enabled() || - write_buffer_manager_->cost_to_cache()) { + + if (AllocValid()) { bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); write_buffer_manager_->ReserveMem(bytes); } @@ -33,8 +46,7 @@ void AllocTracker::Allocate(size_t bytes) { void AllocTracker::DoneAllocating() { if (write_buffer_manager_ != nullptr && !done_allocating_) { - if (write_buffer_manager_->enabled() || - write_buffer_manager_->cost_to_cache()) { + if (AllocValid()) { write_buffer_manager_->ScheduleFreeMem( bytes_allocated_.load(std::memory_order_relaxed)); } else { @@ -49,8 +61,7 @@ void AllocTracker::FreeMem() { DoneAllocating(); } if (write_buffer_manager_ != nullptr && !freed_) { - if (write_buffer_manager_->enabled() || - write_buffer_manager_->cost_to_cache()) { + if (AllocValid()) { write_buffer_manager_->FreeMem( bytes_allocated_.load(std::memory_order_relaxed)); } else {