From 3bf273ab76dec67d1ed981f8194906299f31ab34 Mon Sep 17 00:00:00 2001
From: Wallace
Date: Thu, 19 Mar 2020 20:08:40 +0800
Subject: [PATCH] Improve Multi Batch Write (#154)

* perform like normal pipelined write

Signed-off-by: Little-Wallace
---
 db/column_family.cc         |   7 +++
 db/db_impl/db_impl.h        |  18 +++----
 db/db_impl/db_impl_open.cc  |   2 +-
 db/db_impl/db_impl_write.cc | 102 ++++++++++++++++++++++--------------
 db/db_write_test.cc         |  26 ++++-----
 db/write_batch.cc           |  39 ++++++++------
 db/write_batch_internal.h   |   6 +--
 db/write_thread.cc          |  36 +++++--------
 db/write_thread.h           |   9 ++--
 include/rocksdb/db.h        |  13 ++---
 tools/db_bench_tool.cc      |   4 +-
 util/safe_queue.h           |  38 ++++++++------
 12 files changed, 162 insertions(+), 138 deletions(-)

diff --git a/db/column_family.cc b/db/column_family.cc
index 9f99fa1bf91..4adf0611b0f 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -1188,6 +1188,13 @@ Status ColumnFamilyData::ValidateOptions(
           "Block-Based Table format. ");
     }
   }
+
+  if (db_options.enable_multi_thread_write &&
+      cf_options.max_successive_merges > 0) {
+    return Status::NotSupported(
+        "Multi thread write is only supported with no successive merges");
+  }
+
   return s;
 }
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index e9625af758b..67ba9d1e440 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -154,10 +154,9 @@ class DBImpl : public DB {
   virtual Status Write(const WriteOptions& options,
                        WriteBatch* updates) override;

-  using DB::MultiThreadWrite;
-  virtual Status MultiThreadWrite(
-      const WriteOptions& options,
-      const std::vector<WriteBatch*>& updates) override;
+  using DB::MultiBatchWrite;
+  virtual Status MultiBatchWrite(const WriteOptions& options,
+                                 std::vector<WriteBatch*>&& updates) override;

   using DB::Get;
   virtual Status Get(const ReadOptions& options,
@@ -1023,12 +1022,11 @@ class DBImpl : public DB {
                    size_t batch_cnt = 0,
                    PreReleaseCallback* pre_release_callback = nullptr);

-  Status MultiThreadWriteImpl(const WriteOptions& write_options,
-                              const autovector<WriteBatch*>& my_batch,
-                              WriteCallback* callback,
-                              uint64_t* log_used = nullptr,
-                              uint64_t log_ref = 0,
-                              uint64_t* seq_used = nullptr);
+  Status MultiBatchWriteImpl(const WriteOptions& write_options,
+                             std::vector<WriteBatch*>&& my_batch,
+                             WriteCallback* callback,
+                             uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+                             uint64_t* seq_used = nullptr);

   Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
                             WriteCallback* callback = nullptr,
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index ac1d23678a3..8e9cfe8f353 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -121,7 +121,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.avoid_flush_during_recovery = false;
   }

-  // multi thread write do not support two-write-que or write in pipeline
+  // multi thread write does not support two write queues or 2PC
   if (result.two_write_queues || result.allow_2pc) {
     result.enable_multi_thread_write = false;
   }
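With this patch the public entry point is `DB::MultiBatchWrite`, which takes ownership of a `std::vector<WriteBatch*>` rather than copying the pointers into an `autovector`. A minimal caller-side sketch, assuming a DB opened with `enable_multi_thread_write = true` (the function name, keys and values below are illustrative only, not part of the patch):

    #include <utility>
    #include <vector>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/write_batch.h"

    // `db` is an already-opened rocksdb::DB* with enable_multi_thread_write = true.
    rocksdb::Status WriteTwoBatches(rocksdb::DB* db) {
      rocksdb::WriteBatch b1, b2;
      b1.Put("key1", "value1");
      b2.Put("key2", "value2");
      std::vector<rocksdb::WriteBatch*> batches{&b1, &b2};
      // The vector is moved into the DB; the WriteBatch objects remain owned by
      // the caller and must outlive the call.
      return db->MultiBatchWrite(rocksdb::WriteOptions(), std::move(batches));
    }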
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 3893b28bd15..b690fc0fad5 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -66,29 +66,23 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
 }
 #endif  // ROCKSDB_LITE

-Status DBImpl::MultiThreadWrite(const WriteOptions& options,
-                                const std::vector<WriteBatch*>& updates) {
+Status DBImpl::MultiBatchWrite(const WriteOptions& options,
+                               std::vector<WriteBatch*>&& updates) {
   if (immutable_db_options_.enable_multi_thread_write) {
-    if (UNLIKELY(updates.empty())) {
-      return Status::OK();
-    }
-    autovector<WriteBatch*> batches;
-    for (auto w : updates) {
-      batches.push_back(w);
-    }
-    return MultiThreadWriteImpl(options, batches, nullptr, nullptr);
+    return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr);
   } else {
     return Status::NotSupported();
   }
 }

-Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
-                                    const autovector<WriteBatch*>& updates,
-                                    WriteCallback* callback, uint64_t* log_used,
-                                    uint64_t log_ref, uint64_t* seq_used) {
+Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
+                                   std::vector<WriteBatch*>&& my_batch,
+                                   WriteCallback* callback, uint64_t* log_used,
+                                   uint64_t log_ref, uint64_t* seq_used) {
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
-  WriteThread::Writer writer(write_options, updates, callback, log_ref);
+  WriteThread::Writer writer(write_options, std::move(my_batch), callback,
+                             log_ref);
   write_thread_.JoinBatchGroup(&writer);

   WriteContext write_context;
@@ -169,36 +163,66 @@ Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
     }
     write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
   }
+  bool is_leader_thread = false;
   if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
     PERF_TIMER_GUARD(write_memtable_time);
     assert(writer.ShouldWriteToMemtable());
     WriteThread::WriteGroup memtable_write_group;
     write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
     assert(immutable_db_options_.allow_concurrent_memtable_write);
-    auto version_set = versions_->GetColumnFamilySet();
-    memtable_write_group.running.store(0);
-    for (auto it = memtable_write_group.begin();
-         it != memtable_write_group.end(); ++it) {
-      if (!it.writer->ShouldWriteToMemtable()) {
-        continue;
+    if (memtable_write_group.size > 1) {
+      is_leader_thread = true;
+      write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
+    } else {
+      auto version_set = versions_->GetColumnFamilySet();
+      memtable_write_group.running.store(0);
+      for (auto it = memtable_write_group.begin();
+           it != memtable_write_group.end(); ++it) {
+        if (!it.writer->ShouldWriteToMemtable()) {
+          continue;
+        }
+        WriteBatchInternal::AsyncInsertInto(
+            it.writer, it.writer->sequence, version_set, &flush_scheduler_,
+            ignore_missing_faimly, this, &write_thread_.write_queue_);
       }
-      WriteBatchInternal::AsyncInsertInto(
-          it.writer, it.writer->sequence, version_set, &flush_scheduler_,
-          ignore_missing_faimly, this,
-          &write_thread_.write_queue_);
-    }
-    while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
-      std::function<void()> work;
-      if (write_thread_.write_queue_.PopFront(work)) {
-        work();
-      } else {
-        std::this_thread::yield();
+      while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
+        if (!write_thread_.write_queue_.RunFunc()) {
+          std::this_thread::yield();
+        }
+      }
+      MemTableInsertStatusCheck(memtable_write_group.status);
+      versions_->SetLastSequence(memtable_write_group.last_sequence);
+      write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
+    }
+  }
+  if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
+    assert(writer.ShouldWriteToMemtable());
+    auto version_set = versions_->GetColumnFamilySet();
+    WriteBatchInternal::AsyncInsertInto(
+        &writer, writer.sequence, version_set, &flush_scheduler_,
+        ignore_missing_faimly, this, &write_thread_.write_queue_);
+    // Because `LaunchParallelMemTableWriters` has added `write_group->size` to
+    // `running`, the value of `running` stays greater than one until the leader
+    // thread calls `CompleteParallelMemTableWriter`.
+    while (writer.write_group->running.load(std::memory_order_acquire) > 1) {
+      // A thread that is not the leader may stop helping here and block itself.
+      if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) {
+        break;
       }
     }
-    MemTableInsertStatusCheck(memtable_write_group.status);
-    versions_->SetLastSequence(memtable_write_group.last_sequence);
-    write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
+    // Only the leader thread is allowed to finish this WriteGroup: a remaining
+    // task may be executed by a thread that does not belong to this WriteGroup,
+    // and such a thread would not notify the waiters in this WriteGroup. Someone
+    // inside the WriteGroup must complete it, and the leader is the natural
+    // choice.
+    if (is_leader_thread) {
+      MemTableInsertStatusCheck(writer.status);
+      versions_->SetLastSequence(writer.write_group->last_sequence);
+      write_thread_.ExitAsMemTableWriter(&writer, *writer.write_group);
+    } else {
+      write_thread_.CompleteParallelMemTableWriter(&writer);
+    }
   }
+
   if (seq_used != nullptr) {
     *seq_used = writer.sequence;
   }
@@ -293,10 +317,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   }

   if (immutable_db_options_.enable_multi_thread_write) {
-    autovector<WriteBatch*> updates;
-    updates.push_back(my_batch);
-    return MultiThreadWriteImpl(write_options, updates, callback, log_used,
-                                log_ref, seq_used);
+    std::vector<WriteBatch*> updates(1);
+    updates[0] = my_batch;
+    return MultiBatchWriteImpl(write_options, std::move(updates), callback,
+                               log_used, log_ref, seq_used);
   }

   if (immutable_db_options_.enable_pipelined_write) {
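In the new memtable phase, a writer whose state becomes STATE_PARALLEL_MEMTABLE_WRITER schedules its batches onto the shared queue and then keeps helping with queued inserts until the group's `running` counter shows only the leader's completion step is left. A condensed, self-contained sketch of that helping pattern; `TaskQueue`, `RunOne` and `HelpUntilDone` are simplified stand-ins, not the actual WriteThread types:

    #include <atomic>
    #include <deque>
    #include <functional>
    #include <mutex>

    // Simplified stand-in for write_thread_.write_queue_.
    struct TaskQueue {
      std::mutex mu;
      std::deque<std::function<void()>> tasks;
      bool RunOne() {  // returns false if there was nothing to run
        std::function<void()> f;
        {
          std::lock_guard<std::mutex> g(mu);
          if (tasks.empty()) return false;
          f = std::move(tasks.front());
          tasks.pop_front();
        }
        f();  // run outside the lock
        return true;
      }
    };

    // `running` mimics WriteGroup::running: one count per scheduled insert plus
    // one held by the leader until it completes the group.
    void HelpUntilDone(TaskQueue* queue, std::atomic<int>* running,
                       bool is_leader) {
      while (running->load(std::memory_order_acquire) > 1) {
        // Followers stop helping once the queue is empty; the leader keeps
        // helping until every scheduled insert has finished.
        if (!queue->RunOne() && !is_leader) break;
      }
    }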
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index b6247a9cec8..63fe32f2333 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -192,7 +192,10 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
   if (!options.enable_multi_thread_write) {
     return;
   }
-  constexpr int kNumThreads = 8;
+  constexpr int kNumThreads = 4;
+  constexpr int kNumWrite = 4;
+  constexpr int kNumBatch = 8;
+  constexpr int kBatchSize = 16;
   options.env = mock_env.get();
   options.write_buffer_size = 1024 * 128;
   Reopen(options);
@@ -201,21 +204,20 @@
     threads.push_back(port::Thread(
         [&](int index) {
           WriteOptions opt;
-          for (int j = 0; j < 64; j++) {
+          std::vector<WriteBatch> data(kNumBatch);
+          for (int j = 0; j < kNumWrite; j++) {
             std::vector<WriteBatch*> batches;
-            for (int i = 0; i < 4; i++) {
-              WriteBatch* batch = new WriteBatch;
-              for (int k = 0; k < 64; k++) {
+            for (int i = 0; i < kNumBatch; i++) {
+              WriteBatch* batch = &data[i];
+              batch->Clear();
+              for (int k = 0; k < kBatchSize; k++) {
                 batch->Put("key_" + ToString(index) + "_" + ToString(j) + "_" +
                                ToString(i) + "_" + ToString(k),
                            "value" + ToString(k));
               }
               batches.push_back(batch);
             }
-            dbfull()->MultiThreadWrite(opt, batches);
-            for (auto b : batches) {
-              delete b;
-            }
+            dbfull()->MultiBatchWrite(opt, std::move(batches));
           }
         },
         t));
@@ -226,9 +228,9 @@
   ReadOptions opt;
   for (int t = 0; t < kNumThreads; t++) {
     std::string value;
-    for (int i = 0; i < 64; i++) {
-      for (int j = 0; j < 4; j++) {
-        for (int k = 0; k < 64; k++) {
+    for (int i = 0; i < kNumWrite; i++) {
+      for (int j = 0; j < kNumBatch; j++) {
+        for (int k = 0; k < kBatchSize; k++) {
           ASSERT_OK(dbfull()->Get(opt,
                                   "key_" + ToString(t) + "_" + ToString(i) +
                                       "_" + ToString(j) + "_" + ToString(k),
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 27670304cb0..05c9997df56 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -721,7 +721,7 @@ int WriteBatchInternal::Count(const WriteBatch* b) {
   return DecodeFixed32(b->rep_.data() + 8);
 }

-int WriteBatchInternal::Count(const autovector<WriteBatch*> b) {
+int WriteBatchInternal::Count(const std::vector<WriteBatch*> b) {
   int count = 0;
   for (auto w : b) {
     count += DecodeFixed32(w->rep_.data() + 8);
@@ -1986,32 +1986,41 @@ Status WriteBatchInternal::InsertInto(
   return s;
 }

-void WriteBatchInternal::AsyncInsertInto(
-    WriteThread::Writer* writer, SequenceNumber sequence,
-    ColumnFamilySet* version_set, FlushScheduler* flush_scheduler,
-    bool ignore_missing_column_families,
-    DB* db, SafeQueue<std::function<void()>>* pool) {
+void WriteBatchInternal::AsyncInsertInto(WriteThread::Writer* writer,
+                                         SequenceNumber sequence,
+                                         ColumnFamilySet* version_set,
+                                         FlushScheduler* flush_scheduler,
+                                         bool ignore_missing_column_families,
+                                         DB* db, SafeFuncQueue* pool) {
   auto write_group = writer->write_group;
-  write_group->running.fetch_add(writer->batches.size(),
-                                 std::memory_order_seq_cst);
-  for (auto w : writer->batches) {
-    pool->PushBack([=]() {
+  auto batch_size = writer->batches.size();
+  write_group->running.fetch_add(batch_size);
+  for (size_t i = 0; i < batch_size; i++) {
+    auto f = [=]() {
       ColumnFamilyMemTablesImpl memtables(version_set);
       MemTableInserter inserter(
           sequence, &memtables, flush_scheduler, ignore_missing_column_families,
           0 /*recovering_log_number*/, db, true /*concurrent_memtable_writes*/,
           nullptr /*has_valid_writes*/);
       inserter.set_log_number_ref(writer->log_ref);
-      SetSequence(w, sequence);
-      Status s = w->Iterate(&inserter);
+      SetSequence(writer->batches[i], sequence);
+      Status s = writer->batches[i]->Iterate(&inserter);
       if (!s.ok()) {
         std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
         write_group->status = s;
       }
       inserter.PostProcess();
-      write_group->running.fetch_sub(1, std::memory_order_release);
-    });
-    sequence += WriteBatchInternal::Count(w);
+      write_group->running.fetch_sub(1);
+    };
+    if (i + 1 == batch_size) {
+      // Run the last (often the only) WriteBatch of this writer directly on the
+      // current thread: that batch may be large, and handling it locally also
+      // saves one call to `SafeFuncQueue::Push` per writer.
+      f();
+    } else {
+      pool->Push(std::move(f));
+      sequence += WriteBatchInternal::Count(writer->batches[i]);
+    }
   }
 }
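A small sketch of the scheduling idiom the new `AsyncInsertInto` relies on: every batch except the last becomes a queued closure, while the last batch is executed inline by the calling thread. `WorkQueue` and `ScheduleAllButLast` below are illustrative stand-ins for the patch's `SafeFuncQueue` usage, not the real classes:

    #include <functional>
    #include <utility>
    #include <vector>

    // Illustrative stand-in for SafeFuncQueue.
    struct WorkQueue {
      std::vector<std::function<void()>> pending;
      void Push(std::function<void()>&& f) { pending.push_back(std::move(f)); }
    };

    // Queue every task except the last one; the last task runs on the calling
    // thread, so each writer always makes progress without a queue round-trip.
    void ScheduleAllButLast(std::vector<std::function<void()>> tasks,
                            WorkQueue* queue) {
      for (size_t i = 0; i < tasks.size(); i++) {
        if (i + 1 == tasks.size()) {
          tasks[i]();
        } else {
          queue->Push(std::move(tasks[i]));
        }
      }
    }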
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index e91c9a9f9d3..8e00898e3d9 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -116,7 +116,7 @@ class WriteBatchInternal {
   // Return the number of entries in the batch.
   static int Count(const WriteBatch* batch);

-  static int Count(const autovector<WriteBatch*> batch);
+  static int Count(const std::vector<WriteBatch*> batch);

   // Set the count for the number of entries in the batch.
   static void SetCount(WriteBatch* batch, int n);
@@ -140,7 +140,7 @@ class WriteBatchInternal {
     return batch->rep_.size();
   }

-  static size_t ByteSize(const autovector<WriteBatch*> batch) {
+  static size_t ByteSize(const std::vector<WriteBatch*> batch) {
     size_t count = 0;
     for (auto w : batch) {
       count += w->rep_.size();
@@ -201,7 +201,7 @@ class WriteBatchInternal {
                               ColumnFamilySet* version_set,
                               FlushScheduler* flush_scheduler,
                               bool ignore_missing_column_families, DB* db,
-                              SafeQueue<std::function<void()>>* pool);
+                              SafeFuncQueue* pool);

   static Status Append(WriteBatch* dst, const WriteBatch* src,
                        const bool WAL_only = false);
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 18e91040fe0..6f3db234bd5 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -142,39 +142,26 @@ uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
   const int sampling_base = 256;

   if (enable_multi_thread_write_) {
-#ifdef NDEBUG
-    std::function<void()> work;
-    while ((state & goal_mask) == 0) {
-      if (write_queue_.PopFront(work)) {
-        work();
-      } else {
-        std::this_thread::yield();
-      }
-      state = w->state.load(std::memory_order_acquire);
-    }
-    // In release build, we do not need to block thread, because it could help
-    // writer-leader to insert into memtable.
-#else
     auto spin_begin = std::chrono::steady_clock::now();
-    const int max_yield_usec = 2000;
-    std::function<void()> work;
     while ((state & goal_mask) == 0) {
-      if (write_queue_.PopFront(work)) {
-        work();
+      if (write_queue_.RunFunc()) {
+        spin_begin = std::chrono::steady_clock::now();
       } else {
         std::this_thread::yield();
+        auto now = std::chrono::steady_clock::now();
+        // If the queue stays empty for a while, block this thread instead of
+        // burning CPU: another thread may still be applying a large WriteBatch
+        // to the memtable.
+        if ((now - spin_begin) > std::chrono::microseconds(max_yield_usec_)) {
+          break;
+        }
       }
       state = w->state.load(std::memory_order_acquire);
-      auto now = std::chrono::steady_clock::now();
-      if ((now - spin_begin) > std::chrono::microseconds(max_yield_usec)) {
-        break;
-      }
     }
     if ((state & goal_mask) == 0) {
       TEST_SYNC_POINT_CALLBACK("WriteThread::AwaitState:BlockingWaiting", w);
       state = BlockingAwaitState(w, goal_mask);
     }
-#endif
     return state;
   }
@@ -537,7 +524,8 @@ void WriteThread::EnterAsMemTableWriter(Writer* leader,
   write_group->size = 1;
   Writer* last_writer = leader;

-  if (!allow_concurrent_memtable_write_ || !leader->batches[0]->HasMerge()) {
+  if (!allow_concurrent_memtable_write_ || enable_multi_thread_write_ ||
+      !leader->batches[0]->HasMerge()) {
     Writer* newest_writer = newest_memtable_writer_.load();
     CreateMissingNewerLinks(newest_writer);
@@ -549,7 +537,7 @@
         break;
       }

-      if (w->batches[0]->HasMerge()) {
+      if (!enable_multi_thread_write_ && w->batches[0]->HasMerge()) {
         break;
       }
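The rewritten `AwaitState` combines work-stealing with a bounded spin: a waiter keeps executing queued inserts, restarts its spin timer whenever it finds work, and falls back to a blocking wait once the queue has stayed empty for `max_yield_usec_`. A simplified, self-contained sketch of that strategy; the callbacks are hypothetical stand-ins for `write_queue_.RunFunc()` and `BlockingAwaitState`, not the actual RocksDB interfaces:

    #include <chrono>
    #include <functional>
    #include <thread>

    void SpinThenBlock(const std::function<bool()>& reached_goal,
                       const std::function<bool()>& try_run_queued_task,
                       const std::function<void()>& blocking_wait,
                       std::chrono::microseconds max_spin) {
      auto spin_begin = std::chrono::steady_clock::now();
      while (!reached_goal()) {
        if (try_run_queued_task()) {
          // Found work to steal: keep spinning, progress is being made.
          spin_begin = std::chrono::steady_clock::now();
        } else {
          std::this_thread::yield();
          if (std::chrono::steady_clock::now() - spin_begin > max_spin) {
            break;  // the queue has been idle too long, stop burning CPU
          }
        }
      }
      if (!reached_goal()) {
        blocking_wait();  // fall back to a mutex/condition-variable wait
      }
    }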
diff --git a/db/write_thread.h b/db/write_thread.h
index 9136ac90408..8a5882dfb3d 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -117,7 +117,7 @@ class WriteThread {
   // Information kept for every waiting writer.
   struct Writer {
     WriteBatch* batch;
-    autovector<WriteBatch*> batches;
+    std::vector<WriteBatch*> batches;
     bool sync;
     bool no_slowdown;
     bool disable_wal;
@@ -180,9 +180,8 @@
       batches.push_back(_batch);
     }

-    Writer(const WriteOptions& write_options,
-           const autovector<WriteBatch*>& _batch, WriteCallback* _callback,
-           uint64_t _log_ref,
+    Writer(const WriteOptions& write_options, std::vector<WriteBatch*>&& _batch,
+           WriteCallback* _callback, uint64_t _log_ref,
            PreReleaseCallback* _pre_release_callback = nullptr)
         : batch(nullptr),
           batches(_batch),
@@ -377,7 +376,7 @@
   // Remove the dummy writer and wake up waiting writers
   void EndWriteStall();

-  SafeQueue<std::function<void()>> write_queue_;
+  SafeFuncQueue write_queue_;

  private:
   // See AwaitState.
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 3f23c3c6219..5c3b003c6fe 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -376,16 +376,9 @@ class DB {
   // Note: consider setting options.sync = true.
   virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

-  virtual Status MultiThreadWrite(const WriteOptions& options,
-                                  const std::vector<WriteBatch*>& updates) {
-    Status s;
-    for (auto w : updates) {
-      s = Write(options, w);
-      if (!s.ok()) {
-        break;
-      }
-    }
-    return s;
+  virtual Status MultiBatchWrite(const WriteOptions& /*options*/,
+                                 std::vector<WriteBatch*>&& /*updates*/) {
+    return Status::NotSupported();
   }

   // If the database contains an entry for "key" store the
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index fe0e2e2aa54..c0e44832efe 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -4287,8 +4287,8 @@ class Benchmark {
         }
       }
       if (use_multi_write_) {
-        s = db_with_cfh->db->MultiThreadWrite(write_options_,
-                                              batches.GetWriteBatch());
+        s = db_with_cfh->db->MultiBatchWrite(write_options_,
+                                             batches.GetWriteBatch());
       } else if (!use_blob_db_) {
         s = db_with_cfh->db->Write(write_options_, &batch);
       }
diff --git a/util/safe_queue.h b/util/safe_queue.h
index 194bb76745d..b1329c3f8a3 100644
--- a/util/safe_queue.h
+++ b/util/safe_queue.h
@@ -5,41 +5,45 @@

 #pragma once

+#include <functional>
 #include <atomic>
 #include <deque>
 #include <mutex>

 namespace rocksdb {

-template <typename T>
-class SafeQueue {
+class SafeFuncQueue {
+ private:
+  struct Item {
+    std::function<void()> func;
+  };
+
 public:
-  SafeQueue() {}
+  SafeFuncQueue() {}

-  ~SafeQueue() {}
+  ~SafeFuncQueue() {}

-  bool PopFront(T &ret) {
-    if (0 == que_len_.load(std::memory_order_relaxed)) {
-      return false;
-    }
-    std::lock_guard<std::mutex> lock(mu_);
+  bool RunFunc() {
+    mu_.lock();
     if (que_.empty()) {
+      mu_.unlock();
       return false;
     }
-    ret = std::move(que_.front());
+    auto func = std::move(que_.front().func);
     que_.pop_front();
-    que_len_.fetch_sub(1, std::memory_order_relaxed);
+    mu_.unlock();
+    func();
     return true;
   }

-  void PushBack(T &&v) {
-    std::lock_guard<std::mutex> lock(mu_);
-    que_.push_back(v);
-    que_len_.fetch_add(1, std::memory_order_relaxed);
+  void Push(std::function<void()> &&v) {
+    std::lock_guard<std::mutex> _guard(mu_);
+    que_.emplace_back();
+    que_.back().func = std::move(v);
   }

 private:
-  std::deque<T> que_;
+  std::deque<Item> que_;
   std::mutex mu_;
-  std::atomic<uint64_t> que_len_;
 };
+
 }  // namespace rocksdb
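`SafeFuncQueue` replaces the generic `SafeQueue<T>`: producers push type-erased closures, and any thread can drain one task at a time through `RunFunc()`, which runs the closure outside the lock. A minimal standalone usage sketch; `FuncQueue` below is a condensed stand-in modeled on the patched header, not the actual RocksDB class:

    #include <cstdio>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <thread>

    class FuncQueue {
     public:
      void Push(std::function<void()>&& f) {
        std::lock_guard<std::mutex> guard(mu_);
        que_.push_back(std::move(f));
      }
      bool RunFunc() {
        std::function<void()> f;
        {
          std::lock_guard<std::mutex> guard(mu_);
          if (que_.empty()) return false;
          f = std::move(que_.front());
          que_.pop_front();
        }
        f();  // run outside the lock so other threads can push and pop meanwhile
        return true;
      }

     private:
      std::deque<std::function<void()>> que_;
      std::mutex mu_;
    };

    int main() {
      FuncQueue queue;
      for (int i = 0; i < 4; i++) {
        queue.Push([i] { std::printf("task %d\n", i); });
      }
      // Two helper threads drain the queue cooperatively, like waiting writers do.
      std::thread helpers[2];
      for (auto& t : helpers) {
        t = std::thread([&queue] { while (queue.RunFunc()) {} });
      }
      for (auto& t : helpers) t.join();
      return 0;
    }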