Improve Multi Batch Write #154

Merged · 20 commits · Mar 19, 2020
7 changes: 7 additions & 0 deletions db/column_family.cc
@@ -1188,6 +1188,13 @@ Status ColumnFamilyData::ValidateOptions(
"Block-Based Table format. ");
}
}

if (db_options.enable_multi_thread_write &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
}

return s;
}

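For context, a minimal caller-side sketch of how the new check surfaces. It assumes this fork's `enable_multi_thread_write` field on DBOptions (not present in upstream RocksDB); the Open/Status pattern is standard:

    #include <cassert>
    #include "rocksdb/db.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.enable_multi_thread_write = true;  // this fork's option
      options.max_successive_merges = 4;         // conflicts with the line above
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
      // With this patch, ValidateOptions rejects the combination:
      assert(s.IsNotSupported());
      return 0;
    }
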
18 changes: 8 additions & 10 deletions db/db_impl/db_impl.h
@@ -154,10 +154,9 @@ class DBImpl : public DB {
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

using DB::MultiThreadWrite;
virtual Status MultiThreadWrite(
const WriteOptions& options,
const std::vector<WriteBatch*>& updates) override;
using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
@@ -1023,12 +1022,11 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr,
uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
@@ -121,7 +121,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.avoid_flush_during_recovery = false;
}

// multi thread write do not support two-write-que or write in pipeline
// multi thread write does not support two write queues or 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_thread_write = false;
}
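Note that, unlike the ValidateOptions check in column_family.cc above, this sanitization is silent: turning on `two_write_queues` or `allow_2pc` simply switches `enable_multi_thread_write` back off rather than failing DB::Open.
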
102 changes: 63 additions & 39 deletions db/db_impl/db_impl_write.cc
@@ -66,29 +66,23 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

Status DBImpl::MultiThreadWrite(const WriteOptions& options,
const std::vector<WriteBatch*>& updates) {
Status DBImpl::MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) {
if (immutable_db_options_.enable_multi_thread_write) {
if (UNLIKELY(updates.empty())) {
return Status::OK();
}
autovector<WriteBatch*> batches;
for (auto w : updates) {
batches.push_back(w);
}
return MultiThreadWriteImpl(options, batches, nullptr, nullptr);
return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr);
} else {
return Status::NotSupported();
}
}
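
A caller-side sketch of the new rvalue-reference entry point (a hypothetical helper; as in db_write_test.cc below, the caller keeps ownership of the batches and must keep them alive until the call returns):

    #include <vector>
    #include "rocksdb/db.h"
    #include "rocksdb/write_batch.h"

    rocksdb::Status WriteTwoBatches(rocksdb::DB* db) {
      rocksdb::WriteBatch b1, b2;
      b1.Put("k1", "v1");
      b2.Put("k2", "v2");
      std::vector<rocksdb::WriteBatch*> updates = {&b1, &b2};
      // The vector is moved in; returns NotSupported when
      // enable_multi_thread_write is off.
      return db->MultiBatchWrite(rocksdb::WriteOptions(), std::move(updates));
    }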

Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
const autovector<WriteBatch*>& updates,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer writer(write_options, updates, callback, log_ref);
WriteThread::Writer writer(write_options, std::move(my_batch), callback,
log_ref);
write_thread_.JoinBatchGroup(&writer);

WriteContext write_context;
@@ -169,36 +163,66 @@ Status DBImpl::MultiThreadWriteImpl(const WriteOptions& write_options,
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
bool is_leader_thread = false;
if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(writer.ShouldWriteToMemtable());
WriteThread::WriteGroup memtable_write_group;
write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
assert(immutable_db_options_.allow_concurrent_memtable_write);
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
if (memtable_write_group.size > 1) {
is_leader_thread = true;
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
it != memtable_write_group.end(); ++it) {
if (!it.writer->ShouldWriteToMemtable()) {
continue;
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this, &write_thread_.write_queue_);
}
WriteBatchInternal::AsyncInsertInto(
it.writer, it.writer->sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this,
&write_thread_.write_queue_);
}
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
std::function<void()> work;
if (write_thread_.write_queue_.PopFront(work)) {
work();
} else {
std::this_thread::yield();
while (memtable_write_group.running.load(std::memory_order_acquire) > 0) {
if (!write_thread_.write_queue_.RunFunc()) {
std::this_thread::yield();
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
}
}
if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
assert(writer.ShouldWriteToMemtable());
auto version_set = versions_->GetColumnFamilySet();
WriteBatchInternal::AsyncInsertInto(
&writer, writer.sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this, &write_thread_.write_queue_);
// Because `LaunchParallelMemTableWriters` has added `write_group->size` to
// `running`, the value of `running` stays greater than one until the leader
// thread calls `CompleteParallelMemTableWriter`.
while (writer.write_group->running.load(std::memory_order_acquire) > 1) {
// A non-leader write thread may exit this loop and block itself once the queue runs dry.
if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) {
break;
}
}
MemTableInsertStatusCheck(memtable_write_group.status);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group);
// Only the leader thread is allowed to finish this WriteGroup: the last task
// may be executed by a thread that does not belong to this WriteGroup, and
// such a thread would not notify the waiters in this group. So someone inside
// the group must complete it, and the leader is the easiest one to pick.
if (is_leader_thread) {
MemTableInsertStatusCheck(writer.status);
versions_->SetLastSequence(writer.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, *writer.write_group);
} else {
write_thread_.CompleteParallelMemTableWriter(&writer);
}
}

if (seq_used != nullptr) {
*seq_used = writer.sequence;
}
@@ -293,10 +317,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

if (immutable_db_options_.enable_multi_thread_write) {
autovector<WriteBatch*> updates;
updates.push_back(my_batch);
return MultiThreadWriteImpl(write_options, updates, callback, log_used,
log_ref, seq_used);
std::vector<WriteBatch*> updates(1);
updates[0] = my_batch;
return MultiBatchWriteImpl(write_options, std::move(updates), callback,
log_used, log_ref, seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
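`SafeFuncQueue` itself is not part of this diff. A minimal sketch consistent with the call sites above, under the assumed semantics that `Push` enqueues a closure and `RunFunc` pops and runs one closure, returning false when none is queued:

    #include <deque>
    #include <functional>
    #include <mutex>

    // Hypothetical stand-in for the fork's SafeFuncQueue.
    class SafeFuncQueue {
     public:
      void Push(std::function<void()>&& f) {
        std::lock_guard<std::mutex> guard(mu_);
        q_.push_back(std::move(f));
      }
      // Runs one queued task on the calling thread; false if the queue was empty.
      bool RunFunc() {
        std::function<void()> f;
        {
          std::lock_guard<std::mutex> guard(mu_);
          if (q_.empty()) return false;
          f = std::move(q_.front());
          q_.pop_front();
        }
        f();  // run outside the lock so other threads can keep popping
        return true;
      }
     private:
      std::mutex mu_;
      std::deque<std::function<void()>> q_;
    };

Under this reading, any writer thread can pick up memtable-insert work queued by other writers, which is what lets non-leader threads "help" in the busy loops above instead of merely spinning.
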
26 changes: 14 additions & 12 deletions db/db_write_test.cc
@@ -192,7 +192,10 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
if (!options.enable_multi_thread_write) {
return;
}
constexpr int kNumThreads = 8;
constexpr int kNumThreads = 4;
constexpr int kNumWrite = 4;
constexpr int kNumBatch = 8;
constexpr int kBatchSize = 16;
options.env = mock_env.get();
options.write_buffer_size = 1024 * 128;
Reopen(options);
@@ -201,21 +204,20 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
threads.push_back(port::Thread(
[&](int index) {
WriteOptions opt;
for (int j = 0; j < 64; j++) {
std::vector<WriteBatch> data(kNumBatch);
for (int j = 0; j < kNumWrite; j++) {
std::vector<WriteBatch*> batches;
for (int i = 0; i < 4; i++) {
WriteBatch* batch = new WriteBatch;
for (int k = 0; k < 64; k++) {
for (int i = 0; i < kNumBatch; i++) {
WriteBatch* batch = &data[i];
batch->Clear();
for (int k = 0; k < kBatchSize; k++) {
batch->Put("key_" + ToString(index) + "_" + ToString(j) + "_" +
ToString(i) + "_" + ToString(k),
"value" + ToString(k));
}
batches.push_back(batch);
}
dbfull()->MultiThreadWrite(opt, batches);
for (auto b : batches) {
delete b;
}
dbfull()->MultiBatchWrite(opt, std::move(batches));
}
},
t));
@@ -226,9 +228,9 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
ReadOptions opt;
for (int t = 0; t < kNumThreads; t++) {
std::string value;
for (int i = 0; i < 64; i++) {
for (int j = 0; j < 4; j++) {
for (int k = 0; k < 64; k++) {
for (int i = 0; i < kNumWrite; i++) {
for (int j = 0; j < kNumBatch; j++) {
for (int k = 0; k < kBatchSize; k++) {
ASSERT_OK(dbfull()->Get(opt,
"key_" + ToString(t) + "_" + ToString(i) +
"_" + ToString(j) + "_" + ToString(k),
39 changes: 24 additions & 15 deletions db/write_batch.cc
@@ -721,7 +721,7 @@ int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}

int WriteBatchInternal::Count(const autovector<WriteBatch*> b) {
int WriteBatchInternal::Count(const std::vector<WriteBatch*> b) {
int count = 0;
for (auto w : b) {
count += DecodeFixed32(w->rep_.data() + 8);
@@ -1986,32 +1986,41 @@ Status WriteBatchInternal::InsertInto(
return s;
}

void WriteBatchInternal::AsyncInsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilySet* version_set, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
DB* db, SafeQueue<std::function<void()>>* pool) {
void WriteBatchInternal::AsyncInsertInto(WriteThread::Writer* writer,
SequenceNumber sequence,
ColumnFamilySet* version_set,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
DB* db, SafeFuncQueue* pool) {
auto write_group = writer->write_group;
write_group->running.fetch_add(writer->batches.size(),
std::memory_order_seq_cst);
for (auto w : writer->batches) {
pool->PushBack([=]() {
auto batch_size = writer->batches.size();
write_group->running.fetch_add(batch_size);
for (size_t i = 0; i < batch_size; i++) {
auto f = [=]() {
ColumnFamilyMemTablesImpl memtables(version_set);
MemTableInserter inserter(
sequence, &memtables, flush_scheduler, ignore_missing_column_families,
0 /*recovering_log_number*/, db, true /*concurrent_memtable_writes*/,
nullptr /*has_valid_writes*/);
inserter.set_log_number_ref(writer->log_ref);
SetSequence(w, sequence);
Status s = w->Iterate(&inserter);
SetSequence(writer->batches[i], sequence);
Status s = writer->batches[i]->Iterate(&inserter);
if (!s.ok()) {
std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
write_group->status = s;
}
inserter.PostProcess();
write_group->running.fetch_sub(1, std::memory_order_release);
});
sequence += WriteBatchInternal::Count(w);
write_group->running.fetch_sub(1);
};
if (i + 1 == batch_size) {
// Each thread runs its last batch itself instead of queueing it, since that
// batch may be large, and running the last one locally also saves a call to
// `SafeFuncQueue::Push` per writer.
f();
} else {
pool->Push(std::move(f));
sequence += WriteBatchInternal::Count(writer->batches[i]);
}
}
}
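
A worked trace of the sequence bookkeeping above, with hypothetical sizes: suppose one writer owns three batches containing 3, 2, and 4 entries and was handed base sequence 100.

    // i=0: SetSequence(batches[0], 100); pushed to pool; sequence += 3 -> 103
    // i=1: SetSequence(batches[1], 103); pushed to pool; sequence += 2 -> 105
    // i=2: SetSequence(batches[2], 105); runs inline on the calling thread

The `sequence +=` update sits only in the else branch because nothing reads `sequence` after the final batch is dispatched.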

6 changes: 3 additions & 3 deletions db/write_batch_internal.h
@@ -116,7 +116,7 @@ class WriteBatchInternal {
// Return the number of entries in the batch.
static int Count(const WriteBatch* batch);

static int Count(const autovector<WriteBatch*> batch);
static int Count(const std::vector<WriteBatch*> batch);

// Set the count for the number of entries in the batch.
static void SetCount(WriteBatch* batch, int n);
@@ -140,7 +140,7 @@
return batch->rep_.size();
}

static size_t ByteSize(const autovector<WriteBatch*> batch) {
static size_t ByteSize(const std::vector<WriteBatch*> batch) {
size_t count = 0;
for (auto w : batch) {
count += w->rep_.size();
@@ -201,7 +201,7 @@ class WriteBatchInternal {
ColumnFamilySet* version_set,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, DB* db,
SafeQueue<std::function<void()>>* pool);
SafeFuncQueue* pool);

static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);