Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix corrupted wal number when predecessor wal corrupts + minor cleanup #13359

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,21 @@ class Directories {
std::unique_ptr<FSDirectory> wal_dir_;
};

struct DBOpenLogReporter : public log::Reader::Reporter {
struct DBOpenLogRecordReadReporter : public log::Reader::Reporter {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to better indicate what level of corruption it reports. The level may change in the future once we figure out how to consolidate all the error status in ProcessLogFile() (see existing TODOs)

Copy link
Contributor

@jowlyzhang jowlyzhang Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: alternatively, we can have a Reporter initialized with the wal_number_ it is opened for. If a Reporter::Corruption(, corrupted_wal_number=kMaxSequence) call is not providing a specific wal number, it will track its own wal_number_ as the corrupted_wal_number_. I think that can help eliminate the need of a dedicated PredecessorLogCorruption API. But it's up to you, not a big deal.

This may help us with removing the if else in HandleNonOkStatusOrOldLogRecord, but I agree with you that before we can consolidate the status handling, it's safer to keep it like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see let me think more and fix it!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed excluding the constructor part, which I found is better done with the status consolidation.

Env* env;
Logger* info_log;
const char* fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
bool* old_log_record;
void Corruption(size_t bytes, const Status& s) override;
void Corruption(size_t bytes, const Status& s,
uint64_t log_number = kMaxSequenceNumber) override;

void OldLogRecord(size_t bytes) override;

uint64_t GetCorruptedLogNumber() const { return corrupted_log_number_; }

private:
uint64_t corrupted_log_number_ = kMaxSequenceNumber;
};

// While DB is the public interface of RocksDB, and DBImpl is the actual
Expand Down Expand Up @@ -2067,21 +2073,25 @@ class DBImpl : public DB {

void SetupLogFileProcessing(uint64_t wal_number);

Status InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
Status InitializeLogReader(uint64_t wal_number, bool is_retry,
std::string& fname,

bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record, Status* const reporter_status,
DBOpenLogReporter* reporter, std::unique_ptr<log::Reader>& reader);
bool stop_replay_for_corruption,
uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info,
bool* const old_log_record,
Status* const reporter_status,
DBOpenLogRecordReadReporter* reporter,
std::unique_ptr<log::Reader>& reader);
Status ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
const std::function<void()>& logFileDropped,
DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);

Status InitializeWriteBatchForLogRecord(
Expand All @@ -2106,9 +2116,9 @@ class DBImpl : public DB {

Status HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status log_read_status, bool* old_log_record,
bool* stop_replay_for_corruption, uint64_t* corrupted_wal_number,
bool* corrupted_wal_found);
Status status, const DBOpenLogRecordReadReporter& reporter,
bool* old_log_record, bool* stop_replay_for_corruption,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found);

Status UpdatePredecessorWALInfo(uint64_t wal_number,
const SequenceNumber last_seqno_observed,
Expand Down
33 changes: 21 additions & 12 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1106,16 +1106,18 @@ bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
return true;
}

void DBOpenLogReporter::Corruption(size_t bytes, const Status& s) {
void DBOpenLogRecordReadReporter::Corruption(size_t bytes, const Status& s,
uint64_t log_number) {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (status != nullptr && status->ok()) {
*status = s;
corrupted_log_number_ = log_number;
}
}

void DBOpenLogReporter::OldLogRecord(size_t bytes) {
void DBOpenLogRecordReadReporter::OldLogRecord(size_t bytes) {
if (old_log_record != nullptr) {
*old_log_record = true;
}
Expand Down Expand Up @@ -1229,7 +1231,7 @@ Status DBImpl::ProcessLogFile(
Status status;
bool old_log_record = false;

DBOpenLogReporter reporter;
DBOpenLogRecordReadReporter reporter;
std::unique_ptr<log::Reader> reader;

std::string fname =
Expand Down Expand Up @@ -1323,7 +1325,7 @@ Status DBImpl::ProcessLogFile(
}

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number,
"Recovered to log #%" PRIu64 " next seq #%" PRIu64, wal_number,
*next_sequence);

if (status.ok()) {
Expand All @@ -1333,7 +1335,7 @@ Status DBImpl::ProcessLogFile(

if (!status.ok() || old_log_record) {
status = HandleNonOkStatusOrOldLogRecord(
wal_number, next_sequence, status, &old_log_record,
wal_number, next_sequence, status, reporter, &old_log_record,
stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
}

Expand All @@ -1357,7 +1359,7 @@ Status DBImpl::InitializeLogReader(
uint64_t wal_number, bool is_retry, std::string& fname,
bool stop_replay_for_corruption, uint64_t min_wal_number,
const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
Status* const reporter_status, DBOpenLogReporter* reporter,
Status* const reporter_status, DBOpenLogRecordReadReporter* reporter,
std::unique_ptr<log::Reader>& reader) {
assert(old_log_record);
assert(reporter_status);
Expand Down Expand Up @@ -1408,10 +1410,11 @@ Status DBImpl::ProcessLogRecord(
Slice record, const std::unique_ptr<log::Reader>& reader,
const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
const std::string& fname, bool read_only, int job_id,
std::function<void()> logFileDropped, DBOpenLogReporter* reporter,
uint64_t* record_checksum, SequenceNumber* last_seqno_observed,
SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
Status* status, bool* stop_replay_by_wal_filter,
const std::function<void()>& logFileDropped,
DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
bool* stop_replay_for_corruption, Status* status,
bool* stop_replay_by_wal_filter,
std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
assert(reporter);
assert(last_seqno_observed);
Expand Down Expand Up @@ -1607,7 +1610,8 @@ Status DBImpl::MaybeWriteLevel0TableForRecovery(

Status DBImpl::HandleNonOkStatusOrOldLogRecord(
uint64_t wal_number, SequenceNumber const* const next_sequence,
Status status, bool* old_log_record, bool* stop_replay_for_corruption,
Status status, const DBOpenLogRecordReadReporter& reporter,
bool* old_log_record, bool* stop_replay_for_corruption,
uint64_t* corrupted_wal_number, bool* corrupted_wal_found) {
assert(!status.ok() || *old_log_record);

Expand Down Expand Up @@ -1641,7 +1645,12 @@ Status DBImpl::HandleNonOkStatusOrOldLogRecord(
// We should ignore the error but not continue replaying
*old_log_record = false;
*stop_replay_for_corruption = true;
*corrupted_wal_number = wal_number;
// TODO(hx235): have a single source of corrupted WAL number once we
// consolidate the statuses
uint64_t reporter_corrupted_wal_number = reporter.GetCorruptedLogNumber();
*corrupted_wal_number = reporter_corrupted_wal_number != kMaxSequenceNumber
? reporter_corrupted_wal_number
: wal_number;
if (corrupted_wal_found != nullptr) {
*corrupted_wal_found = true;
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class LogReaderContainer {
Logger* info_log;
std::string fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
fname.c_str(), static_cast<int>(bytes),
Expand Down
3 changes: 2 additions & 1 deletion db/experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ Status GetFileChecksumsFromCurrentManifest(FileSystem* fs,

struct LogReporter : public log::Reader::Reporter {
Status* status_ptr;
void Corruption(size_t /*bytes*/, const Status& st) override {
void Corruption(size_t /*bytes*/, const Status& st,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
if (status_ptr->ok()) {
*status_ptr = st;
}
Expand Down
22 changes: 14 additions & 8 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,16 @@ void Reader::MaybeVerifyPredecessorWALInfo(
if (recorded_predecessor_log_number >= min_wal_number_to_keep_) {
std::string reason = "Missing WAL of log number " +
std::to_string(recorded_predecessor_log_number);
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
}
} else {
if (observed_predecessor_wal_info_.GetLogNumber() !=
recorded_predecessor_log_number) {
std::string reason = "Missing WAL of log number " +
std::to_string(recorded_predecessor_log_number);
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
} else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() !=
recorded_predecessor_wal_info.GetLastSeqnoRecorded()) {
std::string reason =
Expand All @@ -392,7 +394,8 @@ void Reader::MaybeVerifyPredecessorWALInfo(
std::to_string(
observed_predecessor_wal_info_.GetLastSeqnoRecorded()) +
". (Last sequence number equal to 0 indicates no WAL records)";
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
} else if (observed_predecessor_wal_info_.GetSizeBytes() !=
recorded_predecessor_wal_info.GetSizeBytes()) {
std::string reason =
Expand All @@ -402,7 +405,8 @@ void Reader::MaybeVerifyPredecessorWALInfo(
" bytes. Observed " +
std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) +
" bytes.";
ReportCorruption(fragment.size(), reason.c_str());
ReportCorruption(fragment.size(), reason.c_str(),
recorded_predecessor_log_number);
}
}
}
Expand Down Expand Up @@ -483,13 +487,15 @@ void Reader::UnmarkEOFInternal() {
}
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
void Reader::ReportCorruption(size_t bytes, const char* reason,
uint64_t log_number) {
ReportDrop(bytes, Status::Corruption(reason), log_number);
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
void Reader::ReportDrop(size_t bytes, const Status& reason,
uint64_t log_number) {
if (reporter_ != nullptr) {
reporter_->Corruption(bytes, reason);
reporter_->Corruption(bytes, reason, log_number);
}
}

Expand Down
26 changes: 13 additions & 13 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class Reader {

// Some corruption was detected. "size" is the approximate number
// of bytes dropped due to the corruption.
virtual void Corruption(size_t bytes, const Status& status) = 0;
virtual void Corruption(size_t bytes, const Status& status,
uint64_t log_number = kMaxSequenceNumber) = 0;

virtual void OldLogRecord(size_t /*bytes*/) {}
};
Expand Down Expand Up @@ -220,8 +221,10 @@ class Reader {

// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
void ReportCorruption(size_t bytes, const char* reason,
uint64_t log_number = kMaxSequenceNumber);
void ReportDrop(size_t bytes, const Status& reason,
uint64_t log_number = kMaxSequenceNumber);
void ReportOldLogRecord(size_t bytes);

void InitCompression(const CompressionTypeRecord& compression_record);
Expand All @@ -236,17 +239,14 @@ class Reader {

class FragmentBufferedReader : public Reader {
public:
FragmentBufferedReader(
std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file, Reporter* reporter,
bool checksum, uint64_t log_num, bool verify_and_track_wals = false,
bool stop_replay_for_corruption = false,
uint64_t min_wal_number_to_keep = std::numeric_limits<uint64_t>::max(),
const PredecessorWALInfo& observed_predecessor_wal_info =
PredecessorWALInfo())
FragmentBufferedReader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num)
: Reader(info_log, std::move(_file), reporter, checksum, log_num,
verify_and_track_wals, stop_replay_for_corruption,
min_wal_number_to_keep, observed_predecessor_wal_info),
false /*verify_and_track_wals*/,
false /*stop_replay_for_corruption*/,
std::numeric_limits<uint64_t>::max() /*min_wal_number_to_keep*/,
PredecessorWALInfo() /*observed_predecessor_wal_info*/),
fragments_(),
in_fragmented_record_(false) {}
~FragmentBufferedReader() override {}
Expand Down
6 changes: 4 additions & 2 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class LogTest
std::string message_;

ReportCollector() : dropped_bytes_(0) {}
void Corruption(size_t bytes, const Status& status) override {
void Corruption(size_t bytes, const Status& status,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
Expand Down Expand Up @@ -825,7 +826,8 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
std::string message_;

ReportCollector() : dropped_bytes_(0) {}
void Corruption(size_t bytes, const Status& status) override {
void Corruption(size_t bytes, const Status& status,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
dropped_bytes_ += bytes;
message_.append(status.ToString());
}
Expand Down
6 changes: 4 additions & 2 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ class Repairer {
Env* env;
std::shared_ptr<Logger> info_log;
uint64_t lognum;
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t log_number = kMaxSequenceNumber) override {
// We print error messages for corruption, but continue repairing.
ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
lognum, static_cast<int>(bytes), s.ToString().c_str());
log_number == kMaxSequenceNumber ? lognum : log_number,
static_cast<int>(bytes), s.ToString().c_str());
}
};

Expand Down
3 changes: 2 additions & 1 deletion db/transaction_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes,
s.ToString().c_str());
}
Expand Down
3 changes: 2 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,8 @@ class VersionSet {

struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t /*bytes*/, const Status& s) override {
void Corruption(size_t /*bytes*/, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
if (status->ok()) {
*status = s;
}
Expand Down
3 changes: 2 additions & 1 deletion db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ Status WalManager::ReadFirstLine(const std::string& fname,

Status* status;
bool ignore_error; // true if db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
void Corruption(size_t bytes, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
(this->ignore_error ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
Expand Down
3 changes: 2 additions & 1 deletion tools/ldb_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2768,7 +2768,8 @@ void ChangeCompactionStyleCommand::DoCommand() {
namespace {

struct StdErrReporter : public log::Reader::Reporter {
void Corruption(size_t /*bytes*/, const Status& s) override {
void Corruption(size_t /*bytes*/, const Status& s,
uint64_t /*log_number*/ = kMaxSequenceNumber) override {
std::cerr << "Corruption detected in log file " << s.ToString() << "\n";
}
};
Expand Down
Loading