Skip to content

Commit

Permalink
fix bug which may case write hung
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace committed Mar 18, 2020
1 parent f0a3028 commit 7c0ffad
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
17 changes: 12 additions & 5 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
bool is_leader_thread = false;
if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(writer.ShouldWriteToMemtable());
WriteThread::WriteGroup memtable_write_group;
write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
assert(immutable_db_options_.allow_concurrent_memtable_write);
if (memtable_write_group.size > 1) {
is_leader_thread = true;
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
auto version_set = versions_->GetColumnFamilySet();
Expand Down Expand Up @@ -199,16 +201,21 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
WriteBatchInternal::AsyncInsertInto(
&writer, writer.sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this, &write_thread_.write_queue_);
while (writer.write_group->running.load(std::memory_order_acquire) >
writer.write_group->size) {
if (!write_thread_.write_queue_.RunFunc()) {
std::this_thread::yield();
while (writer.write_group->running.load(std::memory_order_acquire) > 1) {
// Write thread could exit and block itself if it is not a leader thread.
if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) {
break;
}
}
if (write_thread_.CompleteParallelMemTableWriter(&writer)) {
// We only allow leader_thread to finish this WriteGroup because there may be another task which is done by thread
// which is not in this WriteGroup, and it would not notify threads in WriteGroup. So we must make someone in this
// WriteGroup to complete it and leader thread is easy to be decided.
if (is_leader_thread) {
MemTableInsertStatusCheck(writer.status);
versions_->SetLastSequence(writer.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&writer, *writer.write_group);
} else {
write_thread_.CompleteParallelMemTableWriter(&writer);
}
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ TEST_P(DBWriteTest, MultiThreadWrite) {
constexpr int kNumThreads = 4;
constexpr int kNumWrite = 4;
constexpr int kNumBatch = 8;
constexpr int kBatchSize = 8;
constexpr int kBatchSize = 16;
options.env = mock_env.get();
options.write_buffer_size = 1024 * 128;
Reopen(options);
Expand Down
2 changes: 1 addition & 1 deletion db/write_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
// If there is no task in the queue for a long time, we should block
// this thread to avoid costing too much CPU. Because there may be a
// large WriteBatch writing into memtable.
if ((now - spin_begin) > std::chrono::microseconds(max_yield_usec_ * 50)) {
if ((now - spin_begin) > std::chrono::microseconds(max_yield_usec_)) {
break;
}
}
Expand Down

0 comments on commit 7c0ffad

Please sign in to comment.