Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert open logic changes in #9634 #9906

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## Unreleased
### Bug Fixes
* Fixed a bug where manual flush would block forever even though flush options had wait=false.
* Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen.

### New Features
* DB::GetLiveFilesStorageInfo is ready for production use.
Expand Down
62 changes: 62 additions & 0 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,68 @@ TEST_F(CorruptionTest, Recovery) {
Check(36, 36);
}

TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
// Repro for bug where WALs following the point-in-time recovery were not
// retained leading to the next recovery failing.
CloseDb();

options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;

const std::string test_cf_name = "test_cf";
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());

uint64_t log_num;
{
options_.create_missing_column_families = true;
std::vector<ColumnFamilyHandle*> cfhs;
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));

ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
std::vector<uint64_t> file_nums;
GetSortedWalFiles(file_nums);
log_num = file_nums.back();
for (auto* cfh : cfhs) {
delete cfh;
}
CloseDb();
}

CorruptFileWithTruncation(FileType::kWalFile, log_num,
/*bytes_to_truncate=*/1);

{
// Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
options_.avoid_flush_during_recovery = true;
std::vector<ColumnFamilyHandle*> cfhs;
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
// Flush one but not both CFs and write some data so there's a seqno gap
// between the PITR corruption and the next DB session's first WAL.
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));

for (auto* cfh : cfhs) {
delete cfh;
}
CloseDb();
}

// With the bug, this DB open would remove the WALs following the PITR
// corruption. Then, the next recovery would fail.
for (int i = 0; i < 2; ++i) {
std::vector<ColumnFamilyHandle*> cfhs;
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));

for (auto* cfh : cfhs) {
delete cfh;
}
CloseDb();
}
}

TEST_F(CorruptionTest, RecoverWriteError) {
env_->writable_file_error_ = true;
Status s = TryReopen();
Expand Down
69 changes: 7 additions & 62 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1240,43 +1240,6 @@ class DBImpl : public DB {

std::atomic<bool> shutting_down_;

// RecoveryContext struct stores the context about version edits along
// with corresponding column_family_data and column_family_options.
class RecoveryContext {
public:
~RecoveryContext() {
for (auto& edit_list : edit_lists_) {
for (auto* edit : edit_list) {
delete edit;
}
edit_list.clear();
}
cfds_.clear();
mutable_cf_opts_.clear();
edit_lists_.clear();
files_to_delete_.clear();
}

void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
if (map_.find(cfd->GetID()) == map_.end()) {
uint32_t size = static_cast<uint32_t>(map_.size());
map_.emplace(cfd->GetID(), size);
cfds_.emplace_back(cfd);
mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions());
edit_lists_.emplace_back(autovector<VersionEdit*>());
}
uint32_t i = map_[cfd->GetID()];
edit_lists_[i].emplace_back(new VersionEdit(edit));
}

std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains sst files
std::set<std::string> files_to_delete_;
};

// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
Expand Down Expand Up @@ -1393,19 +1356,16 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit.
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
// skipped.
// recovery_ctx stores the context about version edits and all those
// edits are persisted to new Manifest after successfully syncing the new WAL.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_wal_file_exists = false,
bool error_if_data_exists_in_wals = false,
uint64_t* recovered_seq = nullptr,
RecoveryContext* recovery_ctx = nullptr);
uint64_t* recovered_seq = nullptr);

virtual bool OwnTablesAndLogs() const { return true; }

// Set DB identity file, and write DB ID to manifest if necessary.
Status SetDBId(bool read_only, RecoveryContext* recovery_ctx);
Status SetDBId(bool read_only);

// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
Expand All @@ -1414,15 +1374,12 @@ class DBImpl : public DB {
// not referenced in the MANIFEST (e.g.
// 1. It's best effort recovery;
// 2. The VersionEdits referencing the SST files are appended to
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
// MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are
// still not synced to MANIFEST during recovery.)
// It stores the SST files to be deleted in RecoveryContext. In the
// We delete these SST files. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
// recovery_ctx stores the context about version edits and files to be
// deleted. All those edits are persisted to new Manifest after successfully
// syncing the new WAL.
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);
Status DeleteUnreferencedSstFiles();

// SetDbSessionId() should be called in the constuctor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
Expand All @@ -1432,11 +1389,6 @@ class DBImpl : public DB {
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
const Slice& ts) const;

// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persist all those edits to new Manifest after
// successfully syncing new WAL.
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);

private:
friend class DB;
friend class ErrorHandler;
Expand Down Expand Up @@ -1691,10 +1643,9 @@ class DBImpl : public DB {

// REQUIRES: log_numbers are sorted in ascending order
// corrupted_log_found is set to true if we recover from a corrupted log file.
Status RecoverLogFiles(std::vector<uint64_t>& log_numbers,
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found,
RecoveryContext* recovery_ctx);
bool* corrupted_log_found);

// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
Expand All @@ -1704,12 +1655,6 @@ class DBImpl : public DB {
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit);

// Move all the WAL files starting from corrupted WAL found to
// max_wal_number to avoid column family inconsistency error on recovery. It
// also removes the deleted file from the vector wal_numbers.
void MoveCorruptedWalFiles(std::vector<uint64_t>& wal_numbers,
uint64_t corrupted_wal_number);

// Get the size of a log file and, if truncate is true, truncate the
// log file to its actual size, thereby freeing preallocated space.
// Return success even if truncate fails
Expand Down
36 changes: 25 additions & 11 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
return min_log_number_to_keep;
}

Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
Status DBImpl::SetDBId(bool read_only) {
Status s;
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
// the very first time.
Expand All @@ -890,22 +890,22 @@ Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
}
s = GetDbIdentityFromIdentityFile(&db_id_);
if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
assert(!read_only);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
VersionEdit edit;
edit.SetDBId(db_id_);
Options options;
MutableCFOptions mutable_cf_options(options);
versions_->db_id_ = db_id_;
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &edit, &mutex_, nullptr,
/* new_descriptor_log */ false);
}
} else if (!read_only) {
s = SetIdentityFile(env_, dbname_, db_id_);
}
return s;
}

Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
Status DBImpl::DeleteUnreferencedSstFiles() {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
Expand All @@ -925,6 +925,7 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {

uint64_t next_file_number = versions_->current_next_file_number();
uint64_t largest_file_number = next_file_number;
std::set<std::string> files_to_delete;
Status s;
for (const auto& path : paths) {
std::vector<std::string> files;
Expand All @@ -942,9 +943,8 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
const std::string normalized_fpath = path + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
recovery_ctx->files_to_delete_.end()) {
recovery_ctx->files_to_delete_.insert(normalized_fpath);
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
files_to_delete.insert(normalized_fpath);
}
}
}
Expand All @@ -961,7 +961,21 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
assert(versions_->GetColumnFamilySet());
ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
assert(default_cfd);
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
s = versions_->LogAndApply(
default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
directories_.GetDbDir(), /*new_descriptor_log*/ false);
if (!s.ok()) {
return s;
}

mutex_.Unlock();
for (const auto& fname : files_to_delete) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
return s;
}

Expand Down
Loading