From a23f4997f0117d700948b2a0a47edc2633347bca Mon Sep 17 00:00:00 2001 From: tabokie Date: Thu, 16 Mar 2023 09:30:09 +0800 Subject: [PATCH 1/2] apply merge version edits atomically Signed-off-by: tabokie --- db/db_impl/db_impl_merge.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/db/db_impl/db_impl_merge.cc b/db/db_impl/db_impl_merge.cc index 95217feec9b..f58196edc26 100644 --- a/db/db_impl/db_impl_merge.cc +++ b/db/db_impl/db_impl_merge.cc @@ -307,6 +307,13 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options, { autovector> edit_ptrs; autovector cf_mopts; + // Mark the version edits as an atomic group if the number of version edits + // exceeds 1. + if (num_cfs > 1) { + for (size_t i = 0; i < num_cfs; i++) { + cf_edits[i].MarkAtomicGroup(num_cfs - i - 1 /*remaining_entries*/); + } + } for (size_t i = 0; i < num_cfs; i++) { edit_ptrs.push_back({&cf_edits[i]}); cf_mopts.push_back(this_cfds[i]->GetLatestMutableCFOptions()); From 97248da7023a427c8c1dc802e3fece8cc360935f Mon Sep 17 00:00:00 2001 From: tabokie Date: Thu, 16 Mar 2023 14:52:59 +0800 Subject: [PATCH 2/2] fix flaky test Signed-off-by: tabokie --- db/db_write_test.cc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 32941149213..21461bdcbf9 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -586,6 +586,8 @@ TEST_P(DBWriteTest, PostWriteCallback) { the_first_can_exit_write_mutex.Lock(); port::Mutex can_flush_mutex; can_flush_mutex.Lock(); + port::Mutex the_second_can_enter_write_mutex; + the_second_can_enter_write_mutex.Lock(); port::Mutex the_second_can_exit_write_mutex; the_second_can_exit_write_mutex.Lock(); @@ -599,13 +601,14 @@ TEST_P(DBWriteTest, PostWriteCallback) { opts.disableWAL = true; SimpleCallback callback([&](SequenceNumber seq) { ASSERT_NE(seq, 0); + the_second_can_enter_write_mutex.Unlock(); can_flush_mutex.Unlock(); the_first_can_exit_write_mutex.Lock(); the_second_can_exit_write_mutex.Unlock(); + written.fetch_add(1, std::memory_order_relaxed); }); batch.Put("key", "value"); ASSERT_OK(dbfull()->Write(opts, &batch, &callback)); - written.fetch_add(1, std::memory_order_relaxed); })); threads.push_back(port::Thread([&] { WriteBatch batch; @@ -615,10 +618,11 @@ TEST_P(DBWriteTest, PostWriteCallback) { SimpleCallback callback([&](SequenceNumber seq) { ASSERT_NE(seq, 0); the_second_can_exit_write_mutex.Lock(); + written.fetch_add(1, std::memory_order_relaxed); }); batch.Put("key", "value"); + the_second_can_enter_write_mutex.Lock(); ASSERT_OK(dbfull()->Write(opts, &batch, &callback)); - written.fetch_add(1, std::memory_order_relaxed); })); // Flush will enter write thread and wait for pending writes. threads.push_back(port::Thread([&] { @@ -634,9 +638,13 @@ TEST_P(DBWriteTest, PostWriteCallback) { ASSERT_EQ(flushed.load(std::memory_order_relaxed), false); the_first_can_exit_write_mutex.Unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds{100}); + size_t wait = 0; + while (!flushed.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + wait += 1; + ASSERT_LE(wait, 100); + } ASSERT_EQ(written.load(std::memory_order_relaxed), 2); - ASSERT_EQ(flushed.load(std::memory_order_relaxed), true); for (auto& t : threads) { t.join();