Skip to content

Commit

Permalink
Improve Multi Batch Write (#154)
Browse files Browse the repository at this point in the history
* perform like normal pipelined write

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace authored and Yi Wu committed Mar 20, 2020
1 parent d67f99e commit 3bf273a
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 138 deletions.
7 changes: 7 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,13 @@ Status ColumnFamilyData::ValidateOptions(
"Block-Based Table format. ");
}
}

if (db_options.enable_multi_thread_write &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
}

return s;
}

Expand Down
18 changes: 8 additions & 10 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,9 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiThreadWrite;
virtual Status MultiThreadWrite(
const WriteOptions& options,
const std::vector<WriteBatch*>& updates) override;
using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
Expand Down Expand Up @@ -1023,12 +1022,11 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr,
uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.avoid_flush_during_recovery = false;
}

// multi thread write do not support two-write-que or write in pipeline
// multi thread write does not support two write queues or 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_thread_write = false;
}
Expand Down
102 changes: 63 additions & 39 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,23 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

Status DBImpl::MultiThreadWrite(const WriteOptions& options,
const std::vector<WriteBatch*>& updates) {
Status DBImpl::MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) {
if (immutable_db_options_.enable_multi_thread_write) {
if (UNLIKELY(updates.empty())) {
return Status::OK();
}
autovector<WriteBatch*> batches;
for (auto w : updates) {
batches.push_back(w);
}
return MultiThreadWriteImpl(options, batches, nullptr, nullptr);
return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr);
} else {
return Status::NotSupported();
}
}

Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& updates,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer writer(write_options, updates, callback, log_ref);
WriteThread::Writer writer(write_options, std::move(my_batch), callback,
log_ref);
write_thread_.JoinBatchGroup(&writer);

WriteContext write_context;
Expand Down Expand Up @@ -169,36 +163,66 @@ Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
bool is_leader_thread = false;
if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(writer.ShouldWriteToMemtable());
WriteThread::WriteGroup memtable_write_group;
write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
assert(immutable_db_options_.allow_concurrent_memtable_write);
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
if (memtable_write_group.size > 1) {
is_leader_thread = true;
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this, &write_thread_.write_queue_);
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this,
&write_thread_.write_queue_);
}
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
std::function<void()> work;
if (write_thread_.write_queue_.PopFront(work)) {
work();
} else {
std::this_thread::yield();
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
if (!write_thread_.write_queue_.RunFunc()) {
std::this_thread::yield();
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
}
}
if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
assert(writer.ShouldWriteToMemtable());
auto version_set = versions_->GetColumnFamilySet();
WriteBatchInternal::AsyncInsertInto(
&writer, writer.sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this, &write_thread_.write_queue_);
// Because `LaunchParallelMemTableWriters` has already added `write_group->size`
// to `running`, the value of `running` is always larger than one as long as the
// leader thread has not called `CompleteParallelMemTableWriter`.
while (writer.write_group->running.load(std::memory_order_acquire) > 1) {
// Write thread could exit and block itself if it is not a leader thread.
if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) {
break;
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
// Only the leader thread is allowed to finish this WriteGroup: a task of this
// group may be executed by a thread that is not in this WriteGroup, and that
// thread would not notify the threads in this WriteGroup. So some thread in
// this WriteGroup must complete it, and the leader thread is the easiest to pick.
if (is_leader_thread) {
MemTableInsertStatusCheck(writer.status);
versions_->SetLastSequence(writer.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, *writer.write_group);
} else {
write_thread_.CompleteParallelMemTableWriter(&writer);
}
}

if (seq_used != nullptr) {
*seq_used = writer.sequence;
}
Expand Down Expand Up @@ -293,10 +317,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

if (immutable_db_options_.enable_multi_thread_write) {
autovector<WriteBatch*> updates;
updates.push_back(my_batch);
return MultiThreadWriteImpl(write_options, updates, callback, log_used,
log_ref, seq_used);
std::vector<WriteBatch*> updates(1);
updates[0] = my_batch;
return MultiBatchWriteImpl(write_options, std::move(updates), callback,
log_used, log_ref, seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
Expand Down
26 changes: 14 additions & 12 deletions db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
if (!options.enable_multi_thread_write) {
return;
}
constexpr int kNumThreads = 8;
constexpr int kNumThreads = 4;
constexpr int kNumWrite = 4;
constexpr int kNumBatch = 8;
constexpr int kBatchSize = 16;
options.env = mock_env.get();
options.write_buffer_size = 1024 * 128;
Reopen(options);
Expand All @@ -201,21 +204,20 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
threads.push_back(port::Thread(
[&](int index) {
WriteOptions opt;
for (int j = 0; j < 64; j++) {
std::vector<WriteBatch> data(kNumBatch);
for (int j = 0; j < kNumWrite; j++) {
std::vector<WriteBatch*> batches;
for (int i = 0; i < 4; i++) {
WriteBatch* batch = new WriteBatch;
for (int k = 0; k < 64; k++) {
for (int i = 0; i < kNumBatch; i++) {
WriteBatch* batch = &data[i];
batch->Clear();
for (int k = 0; k < kBatchSize; k++) {
batch->Put("key_" + ToString(index) + "_" + ToString(j) + "_" +
ToString(i) + "_" + ToString(k),
"value" + ToString(k));
}
batches.push_back(batch);
}
dbfull()->MultiThreadWrite(opt, batches);
for (auto b : batches) {
delete b;
}
dbfull()->MultiBatchWrite(opt, std::move(batches));
}
},
t));
Expand All @@ -226,9 +228,9 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
ReadOptions opt;
for (int t = 0; t < kNumThreads; t++) {
std::string value;
for (int i = 0; i < 64; i++) {
for (int j = 0; j < 4; j++) {
for (int k = 0; k < 64; k++) {
for (int i = 0; i < kNumWrite; i++) {
for (int j = 0; j < kNumBatch; j++) {
for (int k = 0; k < kBatchSize; k++) {
ASSERT_OK(dbfull()->Get(opt,
"key_" + ToString(t) + "_" + ToString(i) +
"_" + ToString(j) + "_" + ToString(k),
Expand Down
39 changes: 24 additions & 15 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}

int WriteBatchInternal::Count(const autovector<WriteBatch*> b) {
int WriteBatchInternal::Count(const std::vector<WriteBatch*> b) {
int count = 0;
for (auto w : b) {
count += DecodeFixed32(w->rep_.data() + 8);
Expand Down Expand Up @@ -1986,32 +1986,41 @@ Status WriteBatchInternal::InsertInto(
return s;
}

void WriteBatchInternal::AsyncInsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilySet* version_set, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
DB* db, SafeQueue<std::function<void()>>* pool) {
void WriteBatchInternal::AsyncInsertInto(WriteThread::Writer* writer,
SequenceNumber sequence,
ColumnFamilySet* version_set,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
DB* db, SafeFuncQueue* pool) {
auto write_group = writer->write_group;
write_group->running.fetch_add(writer->batches.size(),
std::memory_order_seq_cst);
for (auto w : writer->batches) {
pool->PushBack([=]() {
auto batch_size = writer->batches.size();
write_group->running.fetch_add(batch_size);
for (size_t i = 0; i < batch_size; i++) {
auto f = [=]() {
ColumnFamilyMemTablesImpl memtables(version_set);
MemTableInserter inserter(
sequence, &memtables, flush_scheduler, ignore_missing_column_families,
0 /*recovering_log_number*/, db, true /*concurrent_memtable_writes*/,
nullptr /*has_valid_writes*/);
inserter.set_log_number_ref(writer->log_ref);
SetSequence(w, sequence);
Status s = w->Iterate(&inserter);
SetSequence(writer->batches[i], sequence);
Status s = writer->batches[i]->Iterate(&inserter);
if (!s.ok()) {
std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
write_group->status = s;
}
inserter.PostProcess();
write_group->running.fetch_sub(1, std::memory_order_release);
});
sequence += WriteBatchInternal::Count(w);
write_group->running.fetch_sub(1);
};
if (i + 1 == batch_size) {
// The last WriteBatch of this writer is run by the calling thread itself
// instead of being queued. If the writer has only one WriteBatch, that batch
// may be large, so the thread should insert it by itself; and having every
// thread run its last batch itself reduces the number of calls to
// `SafeFuncQueue::Push`.
f();
} else {
pool->Push(std::move(f));
sequence += WriteBatchInternal::Count(writer->batches[i]);
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class WriteBatchInternal {
// Return the number of entries in the batch.
static int Count(const WriteBatch* batch);

static int Count(const autovector<WriteBatch*> batch);
static int Count(const std::vector<WriteBatch*> batch);

// Set the count for the number of entries in the batch.
static void SetCount(WriteBatch* batch, int n);
Expand All @@ -140,7 +140,7 @@ class WriteBatchInternal {
return batch->rep_.size();
}

static size_t ByteSize(const autovector<WriteBatch*> batch) {
static size_t ByteSize(const std::vector<WriteBatch*> batch) {
size_t count = 0;
for (auto w : batch) {
count += w->rep_.size();
Expand Down Expand Up @@ -201,7 +201,7 @@ class WriteBatchInternal {
ColumnFamilySet* version_set,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, DB* db,
SafeQueue<std::function<void()>>* pool);
SafeFuncQueue* pool);

static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);
Expand Down
Loading

0 comments on commit 3bf273a

Please sign in to comment.