Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MultiGet to provide consistent CF view for kPersistedTier #13433

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3354,6 +3354,69 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
keys.data(), values.data(), statuses.data(), true);
}

// Verifies that a MultiGet spanning several column families, issued with an
// explicit snapshot and read_tier = kPersistedTier, returns values from a
// single consistent point relative to a concurrent atomic flush: either every
// CF reports its pre-flush persisted value or every CF reports its post-flush
// persisted value, never a mixture.
TEST_F(DBBasicTest, MultiGetWithSnapshotsAndPersistedTier) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = true;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  // Insert initial data and persist it, so each CF has exactly one L0 file.
  ASSERT_OK(Put(0, "key1", "value1_cf0"));
  ASSERT_OK(Put(1, "key1", "value1_cf1"));
  ASSERT_OK(Put(2, "key1", "value1_cf2"));
  ASSERT_OK(Flush({0, 1, 2}));
  for (auto cf : {0, 1, 2}) {
    ASSERT_EQ(1, NumTableFilesAtLevel(0, cf));
  }

  // Newer values that are not yet flushed when the reader starts.
  ASSERT_OK(Put(0, "key1", "value2_cf0"));
  ASSERT_OK(Put(1, "key1", "value2_cf1"));
  ASSERT_OK(Put(2, "key1", "value2_cf2"));

  // Start the concurrent atomic flush. The result is only recorded here and
  // is asserted on the main thread after join(): a fatal assertion inside the
  // worker would merely return from the lambda instead of failing the test
  // body.
  Status flush_status;
  std::thread flush_thread([&]() { flush_status = Flush({0, 1, 2}); });

  // Perform MultiGet with snapshot and read_tier = kPersistedTier
  ReadOptions ro;
  const Snapshot* snapshot = db_->GetSnapshot();
  ro.snapshot = snapshot;
  ro.read_tier = kPersistedTier;

  std::string k = "key1";
  std::vector<Slice> keys(3, Slice(k));
  std::vector<Status> statuses(keys.size());
  std::vector<ColumnFamilyHandle*> cfs(keys.size());
  std::vector<PinnableSlice> pin_values(keys.size());
  for (size_t i = 0; i < keys.size(); ++i) {
    cfs[i] = handles_[i];
  }
  db_->MultiGet(ro, cfs.size(), cfs.data(), keys.data(), pin_values.data(),
                statuses.data());

  // Join the flush thread and release the snapshot BEFORE any assertion that
  // can fail. Otherwise an early return from a failed ASSERT_* would destroy
  // a still-joinable std::thread (which calls std::terminate) and leak the
  // snapshot.
  flush_thread.join();
  db_->ReleaseSnapshot(snapshot);

  ASSERT_OK(flush_status);
  for (const auto& s : statuses) {
    ASSERT_OK(s);
  }

  if (pin_values[0] == "value1_cf0") {
    // The read observed the state before the concurrent flush: every CF must
    // report its first persisted value.
    ASSERT_EQ(pin_values[1], "value1_cf1");
    ASSERT_EQ(pin_values[2], "value1_cf2");
  } else {
    // Otherwise the read observed the state after the flush: every CF must
    // report its updated value.
    ASSERT_EQ(pin_values[0], "value2_cf0");
    ASSERT_EQ(pin_values[1], "value2_cf1");
    ASSERT_EQ(pin_values[2], "value2_cf2");
  }
}

TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
Options options = CurrentOptions();
DestroyAndReopen(options);
Expand Down
30 changes: 17 additions & 13 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2704,7 +2704,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
};

bool last_try = false;
bool acquire_mutex = false;
if (cf_list->size() == 1) {
// Fast path for a single column family. We can simply get the thread local
// super version
Expand Down Expand Up @@ -2753,29 +2753,32 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
// sure.
constexpr int num_retries = 3;
for (int i = 0; i < num_retries; ++i) {
last_try = (i == num_retries - 1);
// When reading from kPersistedTier, we want a consistent view into CFs.
// So we take mutex to prevent any SV change in any CF.
acquire_mutex = ((i == num_retries - 1) && !read_options.snapshot) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd change the order.

acquire_mutex =  read_options.read_tier == kPersistedTier || ((i == num_retries - 1) && !read_options.snapshot);

Also consider updating the comment above to say that if read_tier is kPersistedTier, we won't bother doing the first two tries without the lock; we just acquire the mutex and get the SV.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment. read_options.read_tier == kPersistedTier is usually false, so I ordered it last.

read_options.read_tier == kPersistedTier;
bool retry = false;

if (i > 0) {
sv_cleanup_func();
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
*snapshot = GetLastPublishedSequence();
} else {
*snapshot =
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
->number_;
}
if (acquire_mutex) {
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
if (!acquire_mutex) {
if (extra_sv_ref) {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
} else {
Expand All @@ -2799,7 +2802,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
}
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot");
if (read_options.snapshot != nullptr || last_try) {
if (read_options.snapshot != nullptr || acquire_mutex) {
// If user passed a snapshot, then we don't care if a memtable is
// sealed or compaction happens because the snapshot would ensure
// that older key versions are kept around. If this is the last
Expand All @@ -2810,7 +2813,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
// memtables, which will include immutable memtables as well, but that
// might be tricky to maintain in case we decide, in future, to do
// memtable compaction.
if (!last_try) {
if (!acquire_mutex) {
SequenceNumber seq =
node->super_version->mem->GetEarliestSequenceNumber();
if (seq > *snapshot) {
Expand All @@ -2820,19 +2823,20 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
}
if (!retry) {
if (last_try) {
if (acquire_mutex) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
}
break;
}
assert(!acquire_mutex);
}
}

TEST_SYNC_POINT("DBImpl::MultiCFSnapshot:AfterGetSeqNum1");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot:AfterGetSeqNum2");
PERF_TIMER_STOP(get_snapshot_time);
*sv_from_thread_local = !last_try;
*sv_from_thread_local = !acquire_mutex;
if (!s.ok()) {
sv_cleanup_func();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* MultiGet with a snapshot and ReadOptions::read_tier = kPersistedTier now reads a consistent view across CFs (instead of potentially reading some CFs before and others after a flush).
Loading