Skip to content

Commit

Permalink
Addressed the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
akankshamahajan15 committed Apr 11, 2022
1 parent 4b51ec7 commit f7e3c1c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 62 deletions.
36 changes: 18 additions & 18 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1240,11 +1240,11 @@ class DBImpl : public DB {

std::atomic<bool> shutting_down_;

// VersionEditsContext struct stores the context about version edits along
// RecoveryContext stores the context about version edits along with the
// corresponding column_family_data and column_family_options, as well as
// the SST files to be deleted.
class VersionEditsContext {
class RecoveryContext {
public:
~VersionEditsContext() {
~RecoveryContext() {
for (auto& edit_list : edit_lists_) {
for (auto* edit : edit_list) {
delete edit;
Expand All @@ -1254,6 +1254,7 @@ class DBImpl : public DB {
cfds_.clear();
mutable_cf_opts_.clear();
edit_lists_.clear();
files_to_delete_.clear();
}

void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
Expand All @@ -1272,6 +1273,8 @@ class DBImpl : public DB {
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains the paths of the SST files to be deleted.
std::set<std::string> files_to_delete_;
};

// Except in DB::Open(), WriteOptionsFile can only be called when:
Expand Down Expand Up @@ -1390,20 +1393,19 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit.
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
// skipped.
// recovery_version_edits stores the context about version edits and all those
// recovery_ctx stores the context about version edits and all those
// edits are persisted to new Manifest after successfully syncing the new WAL.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_wal_file_exists = false,
bool error_if_data_exists_in_wals = false,
uint64_t* recovered_seq = nullptr,
VersionEditsContext* recovery_version_edits = nullptr,
std::set<std::string>* files_to_delete = nullptr);
RecoveryContext* recovery_ctx = nullptr);

// Base implementation always returns true; the method is virtual so that a
// derived class can override the result.
virtual bool OwnTablesAndLogs() const { return true; }

// Set DB identity file, and write DB ID to manifest if necessary.
Status SetDBId(bool read_only, VersionEditsContext* recovery_edit);
Status SetDBId(bool read_only, RecoveryContext* recovery_ctx);

// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
Expand All @@ -1412,15 +1414,15 @@ class DBImpl : public DB {
// not referenced in the MANIFEST (e.g.
// 1. It's best effort recovery;
// 2. The VersionEdits referencing the SST files are appended to
// MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
// still not synced to MANIFEST during recovery.)
// We delete these SST files. In the
// It stores the SST files to be deleted in RecoveryContext. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
// recovery_version_edits stores the context about version edits and all those
// edits are persisted to new Manifest after succesfully syncing the new WAL.
Status DeleteUnreferencedSstFiles(VersionEditsContext* recovery_edit,
std::set<std::string>* files_to_delete);
// recovery_ctx stores the context about version edits and files to be
// deleted. All those edits are persisted to new Manifest after successfully
// syncing the new WAL.
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);

// SetDbSessionId() should be called in the constructor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
Expand All @@ -1430,12 +1432,10 @@ class DBImpl : public DB {
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
const Slice& ts) const;

// recovery_version_edits stores the context about version edits and
// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persists all those edits to new Manifest after
// successfully syncing new WAL.
Status LogAndApplyForRecovery(
const std::set<std::string>& files_to_delete,
const VersionEditsContext& recovery_version_edits);
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);

private:
friend class DB;
Expand Down Expand Up @@ -1694,7 +1694,7 @@ class DBImpl : public DB {
Status RecoverLogFiles(std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found,
VersionEditsContext* recovery_version_edits);
RecoveryContext* recovery_ctx);

// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
Expand Down
17 changes: 8 additions & 9 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
return min_log_number_to_keep;
}

Status DBImpl::SetDBId(bool read_only, VersionEditsContext* version_edits_ctx) {
Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
Status s;
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
// the very first time.
Expand Down Expand Up @@ -891,12 +891,12 @@ Status DBImpl::SetDBId(bool read_only, VersionEditsContext* version_edits_ctx) {
s = GetDbIdentityFromIdentityFile(&db_id_);
if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
assert(!read_only);
assert(version_edits_ctx != nullptr);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
VersionEdit edit;
edit.SetDBId(db_id_);
versions_->db_id_ = db_id_;
version_edits_ctx->UpdateVersionEdits(
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
}
} else if (!read_only) {
Expand All @@ -905,9 +905,7 @@ Status DBImpl::SetDBId(bool read_only, VersionEditsContext* version_edits_ctx) {
return s;
}

Status DBImpl::DeleteUnreferencedSstFiles(
VersionEditsContext* version_edits_ctx,
std::set<std::string>* files_to_delete) {
Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
Expand Down Expand Up @@ -944,8 +942,9 @@ Status DBImpl::DeleteUnreferencedSstFiles(
const std::string normalized_fpath = path + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
files_to_delete->find(normalized_fpath) == files_to_delete->end()) {
files_to_delete->insert(normalized_fpath);
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
recovery_ctx->files_to_delete_.end()) {
recovery_ctx->files_to_delete_.insert(normalized_fpath);
}
}
}
Expand All @@ -962,7 +961,7 @@ Status DBImpl::DeleteUnreferencedSstFiles(
assert(versions_->GetColumnFamilySet());
ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
assert(default_cfd);
version_edits_ctx->UpdateVersionEdits(default_cfd, edit);
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
return s;
}

Expand Down
57 changes: 26 additions & 31 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
uint64_t* recovered_seq, VersionEditsContext* version_edits_ctx,
std::set<std::string>* files_to_delete) {
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();

bool is_new_db = false;
Expand Down Expand Up @@ -520,9 +519,9 @@ Status DBImpl::Recover(
return s;
}

s = SetDBId(read_only, version_edits_ctx);
s = SetDBId(read_only, recovery_ctx);
if (s.ok() && !read_only) {
s = DeleteUnreferencedSstFiles(version_edits_ctx, files_to_delete);
s = DeleteUnreferencedSstFiles(recovery_ctx);
}

if (immutable_db_options_.paranoid_checks && s.ok()) {
Expand Down Expand Up @@ -606,9 +605,9 @@ Status DBImpl::Recover(
WalNumber max_wal_number =
versions_->GetWalSet().GetWals().rbegin()->first;
edit.DeleteWalsBefore(max_wal_number + 1);
assert(version_edits_ctx != nullptr);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
version_edits_ctx->UpdateVersionEdits(
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
}
if (!s.ok()) {
Expand Down Expand Up @@ -646,7 +645,7 @@ Status DBImpl::Recover(

bool corrupted_wal_found = false;
s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
version_edits_ctx);
recovery_ctx);
if (corrupted_wal_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
Expand Down Expand Up @@ -806,16 +805,15 @@ Status DBImpl::InitPersistStatsColumnFamily() {
return s;
}

Status DBImpl::LogAndApplyForRecovery(
const std::set<std::string>& files_to_delete,
const VersionEditsContext& version_edits_ctx) {
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
mutex_.AssertHeld();
assert(versions_->descriptor_log_ == nullptr);
Status s = versions_->LogAndApply(
version_edits_ctx.cfds_, version_edits_ctx.mutable_cf_opts_,
version_edits_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
if (s.ok() && !files_to_delete.empty()) {
recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_,
recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
mutex_.Unlock();
for (const auto& fname : files_to_delete) {
for (const auto& fname : recovery_ctx.files_to_delete_) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
Expand All @@ -830,7 +828,7 @@ Status DBImpl::LogAndApplyForRecovery(
Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_wal_found,
VersionEditsContext* version_edits_ctx) {
RecoveryContext* recovery_ctx) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
Expand Down Expand Up @@ -1225,7 +1223,6 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,

// True if there's any data in the WALs; if not, we can skip re-processing
// them later
bool truncate_last_wal = true;
bool data_seen = false;
if (!read_only) {
// no need to refcount since client still doesn't have access
Expand Down Expand Up @@ -1291,19 +1288,14 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
if (corrupted_wal_found != nullptr && *corrupted_wal_found == true &&
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
// Truncate the last WAL to reclaim the pre allocated space before
// moving it.
GetLogSizeAndMaybeTruncate(wal_numbers.back(), /*truncate=*/true,
nullptr);
MoveCorruptedWalFiles(wal_numbers, corrupted_wal_number);
truncate_last_wal = false;
}

assert(version_edits_ctx != nullptr);
assert(recovery_ctx != nullptr);
for (auto* cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
version_edits_ctx->UpdateVersionEdits(cfd, iter->second);
recovery_ctx->UpdateVersionEdits(cfd, iter->second);
}

if (flushed) {
Expand All @@ -1317,7 +1309,7 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
}
assert(versions_->GetColumnFamilySet() != nullptr);
version_edits_ctx->UpdateVersionEdits(
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
}
}
Expand All @@ -1330,7 +1322,7 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
// If there's no data in the WAL, or we flushed all the data, still
// truncate the log file. If the process goes into a crash loop before
// the file is deleted, the preallocated space will never get freed.
const bool truncate = !read_only && truncate_last_wal;
const bool truncate = !read_only;
GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
.PermitUncheckedError();
}
Expand All @@ -1353,14 +1345,18 @@ void DBImpl::MoveCorruptedWalFiles(std::vector<uint64_t>& wal_numbers,
// Increment iter to move WAL files from first corrupted_wal_number + 1.
iter++;

std::string archivalPath =
std::string archival_path =
ArchivalDirectory(immutable_db_options_.GetWalDir());
Status create_status = env_->CreateDirIfMissing(archivalPath);
Status create_status = env_->CreateDirIfMissing(archival_path);

// create_status is only checked when it needs to move the corrupted WAL files
// to archive folder.
create_status.PermitUncheckedError();

// Truncate the last WAL to reclaim the preallocated space before
// moving it.
GetLogSizeAndMaybeTruncate(wal_numbers.back(), /*truncate=*/true, nullptr);

// Move all the WAL files from corrupted_wal_number + 1 to the last WAL
// (max_wal_number) to the archival directory, to avoid a column family
// inconsistency error. If it is unable to create archive dir, it will delete the
Expand Down Expand Up @@ -1804,13 +1800,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,

impl->mutex_.Lock();

VersionEditsContext version_edits_ctx;
RecoveryContext recovery_ctx;

// Handles create_if_missing, error_if_exists
uint64_t recovered_seq(kMaxSequenceNumber);
std::set<std::string> files_to_delete;
s = impl->Recover(column_families, false, false, false, &recovered_seq,
&version_edits_ctx, &files_to_delete);
&recovery_ctx);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
Expand Down Expand Up @@ -1867,7 +1862,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
if (s.ok()) {
s = impl->LogAndApplyForRecovery(files_to_delete, version_edits_ctx);
s = impl->LogAndApplyForRecovery(recovery_ctx);
}

if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
Expand Down
3 changes: 1 addition & 2 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ Status DBImplSecondary::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/, uint64_t*,
VersionEditsContext* /*version_edits_ctx*/,
std::set<std::string>* /*files_to_delete*/) {
RecoveryContext* /*recovery_ctx*/) {
mutex_.AssertHeld();

JobContext job_context(0);
Expand Down
3 changes: 1 addition & 2 deletions db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ class DBImplSecondary : public DBImpl {
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, bool error_if_wal_file_exists,
bool error_if_data_exists_in_wals, uint64_t* = nullptr,
VersionEditsContext* version_edits_ctx = nullptr,
std::set<std::string>* files_to_delete = nullptr) override;
RecoveryContext* recovery_ctx = nullptr) override;

// Implementations of the DB interface
using DB::Get;
Expand Down

0 comments on commit f7e3c1c

Please sign in to comment.