Improve Multi Batch Write (tikv#154)
* perform like a normal pipelined write

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Signed-off-by: tabokie <xy.tao@outlook.com>
Little-Wallace authored and tabokie committed May 12, 2022
1 parent 3c608b0 commit c6312bf
Showing 12 changed files with 162 additions and 135 deletions.
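
For orientation before the per-file hunks: the renamed entry point now takes the batch vector by rvalue reference, so callers hand over ownership with std::move. Below is a minimal usage sketch, modeled on the updated db_write_test.cc further down; names and the surrounding function are illustrative only, and the API is a TiKV-fork extension, not stock RocksDB.

#include <utility>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

// Sketch: build several independent WriteBatches and submit them in a single
// MultiBatchWrite call. Requires DBOptions::enable_multi_thread_write,
// otherwise the call returns Status::NotSupported() (see db_impl_write.cc).
rocksdb::Status WriteBatches(rocksdb::DB* db) {
  rocksdb::WriteBatch b1, b2;
  b1.Put("key_1", "value_1");
  b2.Put("key_2", "value_2");
  std::vector<rocksdb::WriteBatch*> batches{&b1, &b2};
  // The vector is passed by rvalue reference: the DB takes ownership of it.
  return db->MultiBatchWrite(rocksdb::WriteOptions(), std::move(batches));
}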
6 changes: 6 additions & 0 deletions db/column_family.cc
@@ -1389,6 +1389,12 @@ Status ColumnFamilyData::ValidateOptions(
"FIFO compaction only supported with max_open_files = -1.");
}

if (db_options.enable_multi_thread_write &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
}

return s;
}

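The user-visible effect of the new check, sketched with assumed option wiring (enable_multi_thread_write is a fork-specific DBOptions field; the path and function name are hypothetical):

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Sketch: opening a DB with the conflicting option pair should fail
// validation per the hunk above.
rocksdb::Status TryOpen() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_multi_thread_write = true;  // fork-specific DBOptions field
  options.max_successive_merges = 4;         // > 0 conflicts with the above
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  // Expected: s.IsNotSupported() per ValidateOptions.
  delete db;
  return s;
}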
18 changes: 8 additions & 10 deletions db/db_impl/db_impl.h
@@ -168,10 +168,9 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiThreadWrite;
virtual Status MultiThreadWrite(
const WriteOptions& options,
const std::vector<WriteBatch*>& updates) override;
using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
@@ -1296,12 +1295,11 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr,
uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
@@ -153,7 +153,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
result.avoid_flush_during_recovery = false;
}

// multi-thread write does not support two_write_queues or pipelined write
// multi-thread write does not support two_write_queues or 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_thread_write = false;
}
108 changes: 69 additions & 39 deletions db/db_impl/db_impl_write.cc
@@ -62,30 +62,24 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

Status DBImpl::MultiThreadWrite(const WriteOptions& options,
const std::vector<WriteBatch*>& updates) {
Status DBImpl::MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) {
if (immutable_db_options_.enable_multi_thread_write) {
if (UNLIKELY(updates.empty())) {
return Status::OK();
}
autovector<WriteBatch*> batches;
for (auto w : updates) {
batches.push_back(w);
}
return MultiThreadWriteImpl(options, batches, nullptr, nullptr);
return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr);
} else {
return Status::NotSupported();
}
}

Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& updates,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock,
immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer writer(write_options, updates, callback, log_ref);
WriteThread::Writer writer(write_options, std::move(my_batch), callback,
log_ref);
write_thread_.JoinBatchGroup(&writer);

WriteContext write_context;
@@ -170,36 +164,72 @@ Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
bool is_leader_thread = false;
if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(writer.ShouldWriteToMemtable());
WriteThread::WriteGroup memtable_write_group;
write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
assert(immutable_db_options_.allow_concurrent_memtable_write);
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
if (memtable_write_group.size > 1) {
is_leader_thread = true;
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
&trim_history_scheduler_, ignore_missing_faimly, this,
&write_thread_.write_queue_);
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
&trim_history_scheduler_, ignore_missing_faimly, this,
&write_thread_.write_queue_);
}
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
std::function<void()> work;
if (write_thread_.write_queue_.PopFront(work)) {
work();
} else {
std::this_thread::yield();
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
if (!write_thread_.write_queue_.RunFunc()) {
std::this_thread::yield();
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
}
}
if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
assert(writer.ShouldWriteToMemtable());
auto version_set = versions_->GetColumnFamilySet();
WriteBatchInternal::AsyncInsertInto(
&writer, writer.sequence, version_set, &flush_scheduler_,
&trim_history_scheduler_, ignore_missing_faimly, this,
&write_thread_.write_queue_);
// Because `LaunchParallelMemTableWriters` has added `write_group->size` to
// `running`, the value of `running` stays above one as long as the leader
// thread has not called `CompleteParallelMemTableWriter`.
while (writer.write_group->running.load(std::memory_order_acquire) > 1) {
// A write thread that is not the leader may exit the loop and block itself.
if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) {
break;
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
// Only the leader thread may finish this WriteGroup, because a queued task
// may be executed by a thread that is not in this WriteGroup, and such a
// thread would not notify the threads in this WriteGroup. Someone inside
// the WriteGroup must complete it, and the leader is the natural choice.
if (is_leader_thread) {
MemTableInsertStatusCheck(writer.status);
versions_->SetLastSequence(writer.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, *writer.write_group);
} else {
write_thread_.CompleteParallelMemTableWriter(&writer);
}
}

if (seq_used != nullptr) {
*seq_used = writer.sequence;
}
@@ -299,10 +329,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

if (immutable_db_options_.enable_multi_thread_write) {
autovector<WriteBatch*> updates;
updates.push_back(my_batch);
return MultiThreadWriteImpl(write_options, updates, callback, log_used,
log_ref, seq_used);
std::vector<WriteBatch*> updates(1);
updates[0] = my_batch;
return MultiBatchWriteImpl(write_options, std::move(updates), callback,
log_used, log_ref, seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
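To make the leader/follower handshake above easier to follow, here is a simplified standalone model of the `running` counter protocol. The function and parameter names are invented for illustration; `run_one` stands in for `write_thread_.write_queue_.RunFunc()`, which runs one queued memtable-insert task and returns false when the queue is currently empty.

#include <atomic>

// Simplified model of the two loops in MultiBatchWriteImpl above.
template <typename RunOne>
void WaitForGroup(std::atomic<int>& running, bool is_leader, RunOne run_one) {
  // LaunchParallelMemTableWriters added the group size to `running`, so it
  // stays above one until every writer's batches have been inserted.
  while (running.load(std::memory_order_acquire) > 1) {
    if (!run_one() && !is_leader) {
      // A follower with no work to steal exits; it will block in
      // CompleteParallelMemTableWriter until the leader finishes the group.
      break;
    }
    // The leader never breaks out: a thread outside this WriteGroup would
    // not wake its members, so the leader must stay and finish it
    // (MemTableInsertStatusCheck, SetLastSequence, ExitAsMemTableWriter).
  }
}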
26 changes: 14 additions & 12 deletions db/db_write_test.cc
@@ -458,7 +458,10 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
if (!options.enable_multi_thread_write) {
return;
}
constexpr int kNumThreads = 8;
constexpr int kNumThreads = 4;
constexpr int kNumWrite = 4;
constexpr int kNumBatch = 8;
constexpr int kBatchSize = 16;
options.env = mock_env.get();
options.write_buffer_size = 1024 * 128;
Reopen(options);
@@ -467,21 +470,20 @@
threads.push_back(port::Thread(
[&](int index) {
WriteOptions opt;
for (int j = 0; j < 64; j++) {
std::vector<WriteBatch> data(kNumBatch);
for (int j = 0; j < kNumWrite; j++) {
std::vector<WriteBatch*> batches;
for (int i = 0; i < 4; i++) {
WriteBatch* batch = new WriteBatch;
for (int k = 0; k < 64; k++) {
for (int i = 0; i < kNumBatch; i++) {
WriteBatch* batch = &data[i];
batch->Clear();
for (int k = 0; k < kBatchSize; k++) {
batch->Put("key_" + ToString(index) + "_" + ToString(j) + "_" +
ToString(i) + "_" + ToString(k),
"value" + ToString(k));
}
batches.push_back(batch);
}
dbfull()->MultiThreadWrite(opt, batches);
for (auto b : batches) {
delete b;
}
dbfull()->MultiBatchWrite(opt, std::move(batches));
}
},
t));
@@ -492,9 +494,9 @@
ReadOptions opt;
for (int t = 0; t < kNumThreads; t++) {
std::string value;
for (int i = 0; i < 64; i++) {
for (int j = 0; j < 4; j++) {
for (int k = 0; k < 64; k++) {
for (int i = 0; i < kNumWrite; i++) {
for (int j = 0; j < kNumBatch; j++) {
for (int k = 0; k < kBatchSize; k++) {
ASSERT_OK(dbfull()->Get(opt,
"key_" + ToString(t) + "_" + ToString(i) +
"_" + ToString(j) + "_" + ToString(k),
32 changes: 20 additions & 12 deletions db/write_batch.cc
@@ -684,7 +684,7 @@ uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}

uint32_t WriteBatchInternal::Count(const autovector<WriteBatch*> b) {
uint32_t WriteBatchInternal::Count(const std::vector<WriteBatch*> b) {
uint32_t count = 0;
for (auto w : b) {
count += DecodeFixed32(w->rep_.data() + 8);
@@ -2367,30 +2367,38 @@ void WriteBatchInternal::AsyncInsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilySet* version_set, FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, DB* db,
SafeQueue<std::function<void()>>* pool) {
bool ignore_missing_column_families, DB* db, SafeFuncQueue* pool) {
auto write_group = writer->write_group;
write_group->running.fetch_add(writer->batches.size(),
std::memory_order_seq_cst);
for (auto w : writer->batches) {
pool->PushBack([=]() {
auto batch_size = writer->batches.size();
write_group->running.fetch_add(batch_size);
for (size_t i = 0; i < batch_size; i++) {
auto f = [=]() {
ColumnFamilyMemTablesImpl memtables(version_set);
MemTableInserter inserter(
sequence, &memtables, flush_scheduler, trim_history_scheduler,
ignore_missing_column_families, 0 /*recovering_log_number*/, db,
true /*concurrent_memtable_writes*/, nullptr /*prot_info*/,
nullptr /*has_valid_writes*/);
inserter.set_log_number_ref(writer->log_ref);
SetSequence(w, sequence);
Status s = w->Iterate(&inserter);
SetSequence(writer->batches[i], sequence);
Status s = writer->batches[i]->Iterate(&inserter);
if (!s.ok()) {
std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
write_group->status = s;
}
inserter.PostProcess();
write_group->running.fetch_sub(1, std::memory_order_release);
});
sequence += WriteBatchInternal::Count(w);
write_group->running.fetch_sub(1);
};
if (i + 1 == batch_size) {
// The last WriteBatch is executed by this thread itself rather than pushed
// to the queue: the batch may be large, and having every thread run its
// last batch inline reduces the cost of calling `SafeFuncQueue::Push`.
f();
} else {
pool->Push(std::move(f));
sequence += WriteBatchInternal::Count(writer->batches[i]);
}
}
}

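The scheduling choice in AsyncInsertInto above — push all but the last task, run the last one inline — can be seen in isolation in this hedged sketch. The queue type is assumed (see the SafeFuncQueue sketch after write_batch_internal.h below), and FanOut is a hypothetical name:

#include <functional>
#include <vector>

// Sketch: fan out N tasks, but keep the last one for the calling thread.
// This saves one queue Push per writer and guarantees the caller makes
// progress even if no helper thread ever picks up its work.
template <typename Queue>
void FanOut(std::vector<std::function<void()>>& tasks, Queue* pool) {
  for (size_t i = 0; i < tasks.size(); i++) {
    if (i + 1 == tasks.size()) {
      tasks[i]();  // run the final (possibly largest) task inline
    } else {
      pool->Push(std::move(tasks[i]));
    }
  }
}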
6 changes: 3 additions & 3 deletions db/write_batch_internal.h
@@ -133,7 +133,7 @@ class WriteBatchInternal {
// Return the number of entries in the batch.
static uint32_t Count(const WriteBatch* batch);

static uint32_t Count(const autovector<WriteBatch*> batch);
static uint32_t Count(const std::vector<WriteBatch*> batch);

// Set the count for the number of entries in the batch.
static void SetCount(WriteBatch* batch, uint32_t n);
Expand All @@ -157,7 +157,7 @@ class WriteBatchInternal {
return batch->rep_.size();
}

static size_t ByteSize(const autovector<WriteBatch*> batch) {
static size_t ByteSize(const std::vector<WriteBatch*> batch) {
size_t count = 0;
for (auto w : batch) {
count += w->rep_.size();
@@ -223,7 +223,7 @@ class WriteBatchInternal {
FlushScheduler* flush_scheduler,
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, DB* db,
SafeQueue<std::function<void()>>* pool);
SafeFuncQueue* pool);

static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);
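SafeFuncQueue itself is not part of the hunks shown here; below is a plausible minimal shape, inferred only from the Push/RunFunc call sites above. The fork's real implementation may differ.

#include <deque>
#include <functional>
#include <mutex>

// Assumed interface: a mutex-guarded queue of closures. Push enqueues a
// task; RunFunc pops and executes one task, returning false if none was
// available so callers can decide to yield, break, or keep helping.
class SafeFuncQueue {
 public:
  void Push(std::function<void()> f) {
    std::lock_guard<std::mutex> guard(mu_);
    que_.push_back(std::move(f));
  }
  bool RunFunc() {
    std::function<void()> f;
    {
      std::lock_guard<std::mutex> guard(mu_);
      if (que_.empty()) return false;
      f = std::move(que_.front());
      que_.pop_front();
    }
    f();  // run outside the lock so other threads can make progress
    return true;
  }

 private:
  std::mutex mu_;
  std::deque<std::function<void()>> que_;
};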