Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bad super version #3

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 64 additions & 50 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,60 +466,74 @@ Status MemTableList::TryInstallMemtableFlushResults(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
}

#ifndef NDEBUG
bool apply_callback_called = false;
#endif

auto apply_callback = [&](const Status& apply_result) {
#ifndef NDEBUG
assert(!apply_callback_called);
apply_callback_called = true;
#endif
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();

// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.

// commit new state only if the column family is NOT dropped.
// The reason is as follows (refer to
// ColumnFamilyTest.FlushAndDropRaceCondition).
// If the column family is dropped, then according to LogAndApply, its
// corresponding flush operation is NOT written to the MANIFEST. This
// means the DB is not aware of the L0 files generated from the flush.
// By committing the new state, we remove the memtable from the memtable
// list. Creating an iterator on this column family will not be able to
// read full data since the memtable is removed, and the DB is not aware
// of the L0 files, causing MergingIterator unable to build child
// iterators. RocksDB contract requires that the iterator can be created
// on a dropped column family, and we must be able to
// read full data as long as column family handle is not deleted, even
// if the column family is dropped.
if (apply_result.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
UpdateMemoryUsageExcludingLast();
ResetTrimHistoryNeeded();
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
};

edit_list.back()->SetApplyCallback(apply_callback);

// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);

// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();

// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.

// commit new state only if the column family is NOT dropped.
// The reason is as follows (refer to
// ColumnFamilyTest.FlushAndDropRaceCondition).
// If the column family is dropped, then according to LogAndApply, its
// corresponding flush operation is NOT written to the MANIFEST. This
// means the DB is not aware of the L0 files generated from the flush.
// By committing the new state, we remove the memtable from the memtable
// list. Creating an iterator on this column family will not be able to
// read full data since the memtable is removed, and the DB is not aware
// of the L0 files, causing MergingIterator unable to build child
// iterators. RocksDB contract requires that the iterator can be created
// on a dropped column family, and we must be able to
// read full data as long as column family handle is not deleted, even if
// the column family is dropped.
if (s.ok() && !cfd->IsDropped()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
UpdateMemoryUsageExcludingLast();
ResetTrimHistoryNeeded();
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
assert(apply_callback_called || cfd->IsDropped());
}
}
commit_in_progress_ = false;
Expand Down
103 changes: 103 additions & 0 deletions db/merge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "utilities/merge_operators.h"

namespace rocksdb {
Expand Down Expand Up @@ -296,6 +297,95 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
}
}

void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
ASSERT_OK(db->Put({}, "1", "1"));
ASSERT_OK(db->Flush(FlushOptions()));

std::atomic<int> cnt{0};
const auto get_thread_id = [&cnt]() {
thread_local int thread_id{cnt++};
return thread_id;
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
int thread_id = get_thread_id();
if (1 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_compact_thread:0");
} else if (2 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:0");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
int thread_id = get_thread_id();
if (0 == thread_id) {
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::set_options_thread:0");
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::set_options_thread:1");
}
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
mutex->AssertHeld();
int thread_id = get_thread_id();
ASSERT_EQ(2, thread_id);
mutex->Unlock();
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:1");
TEST_SYNC_POINT(
"testCountersWithFlushAndCompaction::bg_flush_thread:2");
mutex->Lock();
});
SyncPoint::GetInstance()->LoadDependency({
{"testCountersWithFlushAndCompaction::set_options_thread:0",
"testCountersWithCompactionAndFlush:BeforeCompact"},
{"testCountersWithFlushAndCompaction::bg_compact_thread:0",
"testCountersWithFlushAndCompaction:BeforeIncCounters"},
{"testCountersWithFlushAndCompaction::bg_flush_thread:0",
"testCountersWithFlushAndCompaction::set_options_thread:1"},
{"testCountersWithFlushAndCompaction::bg_flush_thread:1",
"testCountersWithFlushAndCompaction:BeforeVerification"},
{"testCountersWithFlushAndCompaction:AfterGet",
"testCountersWithFlushAndCompaction::bg_flush_thread:2"},
});
SyncPoint::GetInstance()->EnableProcessing();

port::Thread set_options_thread([&]() {
ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
{{"disable_auto_compactions", "false"}}));
});
TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
port::Thread compact_thread([&]() {
ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
});

TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
counters.add("test-key", 1);

FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db->Flush(flush_opts));

TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
std::string expected;
PutFixed64(&expected, 1);
std::string actual;
Status s = db->Get(ReadOptions(), "test-key", &actual);
TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
set_options_thread.join();
compact_thread.join();
ASSERT_OK(s);
ASSERT_EQ(expected, actual);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
size_t num_merges) {

Expand Down Expand Up @@ -488,6 +578,19 @@ TEST_F(MergeTest, MergeDbTtlTest) {
runTest(test::PerThreadDBPath("merge_testdbttl"),
true); // Run test on TTL database
}

TEST_F(MergeTest, MergeWithCompactionAndFlush) {
const std::string dbname =
test::PerThreadDBPath("merge_with_compaction_and_flush");
{
auto db = OpenDb(dbname);
{
MergeBasedCounters counters(db, 0);
testCountersWithFlushAndCompaction(counters, db.get());
}
}
DestroyDB(dbname, Options());
}
#endif // !ROCKSDB_LITE

} // namespace rocksdb
Expand Down
8 changes: 8 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ class VersionEdit {
deleted_files_.insert({level, file});
}

void SetApplyCallback(std::function<void(const Status&)> callback) {
apply_callback_ = std::move(callback);
}

// Number of edits
size_t NumEntries() { return new_files_.size() + deleted_files_.size(); }

Expand Down Expand Up @@ -337,6 +341,9 @@ class VersionEdit {
const std::vector<std::pair<int, FileMetaData>>& GetNewFiles() {
return new_files_;
}
void DoApplyCallback(const Status& s) {
if (apply_callback_) { apply_callback_(s); }
}

void MarkAtomicGroup(uint32_t remaining_entries) {
is_in_atomic_group_ = true;
Expand Down Expand Up @@ -377,6 +384,7 @@ class VersionEdit {

DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
std::function<void(const Status&)> apply_callback_;

// Each version edit record should have column_family_ set
// If it's not set, it is default (0)
Expand Down
8 changes: 7 additions & 1 deletion db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3768,7 +3768,7 @@ Status VersionSet::ProcessManifestWrites(
EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
mu->Unlock();

TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() &&
Expand Down Expand Up @@ -3956,6 +3956,9 @@ Status VersionSet::ProcessManifestWrites(
DescriptorFileName(dbname_, pending_manifest_file_number_));
}
}
for (auto e : batch_edits) {
e->DoApplyCallback(s);
}

pending_manifest_file_number_ = 0;

Expand Down Expand Up @@ -4028,6 +4031,8 @@ Status VersionSet::LogAndApply(
}
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
nullptr);
while (!first_writer.done && &first_writer != manifest_writers_.front()) {
first_writer.cv.Wait();
}
Expand All @@ -4039,6 +4044,7 @@ Status VersionSet::LogAndApply(
for (const auto& writer : writers) {
assert(writer.done);
}
TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
#endif /* !NDEBUG */
return first_writer.status;
}
Expand Down