Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a couple of missing cases of retry on corruption #13007

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions db/db_io_failure_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,80 @@ TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) {
Random rnd(300);
bool retry = false;

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ReadFooterFromFileInternal:0", [&](void* arg) {
Slice* data = static_cast<Slice*>(arg);
if (!retry) {
std::memcpy(const_cast<char*>(data->data()),
rnd.RandomString(static_cast<int>(data->size())).c_str(),
data->size());
retry = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(Put("key1", "val1"));
Status s = Flush();
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
1);

std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
} else {
ASSERT_NOK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) {
Random rnd(300);
bool retry = false;

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"ReadTablePropertiesHelper:0", [&](void* arg) {
Slice* data = static_cast<Slice*>(arg);
if (!retry) {
std::memcpy(const_cast<char*>(data->data()),
rnd.RandomString(static_cast<int>(data->size())).c_str(),
data->size());
retry = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(Put("key1", "val1"));
Status s = Flush();
if (std::get<2>(GetParam())) {
ASSERT_OK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
1);

std::string val;
ReadOptions ro;
ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
} else {
ASSERT_NOK(s);
ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

// The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption,
// 3. Retry with verify_and_reconstruct_read IOOption
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
Expand Down
22 changes: 4 additions & 18 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,26 +680,12 @@ Status BlockBasedTable::Open(
if (s.ok()) {
s = ReadFooterFromFile(opts, file.get(), *ioptions.fs,
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
// If the footer is corrupted and the FS supports checksum verification and
// correction, try reading the footer again
if (s.IsCorruption()) {
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
if (CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
IOOptions retry_opts = opts;
retry_opts.verify_and_reconstruct_read = true;
s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
if (s.ok()) {
RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
}
}
kBlockBasedTableMagicNumber, ioptions.stats);
}
if (!s.ok()) {
if (s.IsCorruption()) {
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
}
return s;
}
if (!IsSupportedFormatVersion(footer.format_version())) {
Expand Down
36 changes: 32 additions & 4 deletions table/format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,12 @@ std::string Footer::ToString() const {
return result;
}

Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) {
static Status ReadFooterFromFileInternal(const IOOptions& opts,
RandomAccessFileReader* file,
FileSystem& fs,
FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) {
if (file_size < Footer::kMinEncodedLength) {
return Status::Corruption("file is too short (" +
std::to_string(file_size) +
Expand Down Expand Up @@ -516,6 +518,8 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
}
}

TEST_SYNC_POINT_CALLBACK("ReadFooterFromFileInternal:0", &footer_input);

// Check that we actually read the whole footer from the file. It may be
// that size isn't correct.
if (footer_input.size() < Footer::kMinEncodedLength) {
Expand Down Expand Up @@ -543,6 +547,30 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
return Status::OK();
}

Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number,
Statistics* stats) {
Status s =
ReadFooterFromFileInternal(opts, file, fs, prefetch_buffer, file_size,
footer, enforce_table_magic_number);
if (s.IsCorruption() &&
CheckFSFeatureSupport(&fs, FSSupportedOps::kVerifyAndReconstructRead)) {
IOOptions new_opts = opts;
new_opts.verify_and_reconstruct_read = true;
footer->Reset();
s = ReadFooterFromFileInternal(new_opts, file, fs, prefetch_buffer,
file_size, footer,
enforce_table_magic_number);
RecordTick(stats, FILE_READ_CORRUPTION_RETRY_COUNT);
if (s.ok()) {
RecordTick(stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
}
}
anand1976 marked this conversation as resolved.
Show resolved Hide resolved
return s;
}

namespace {
// Custom handling for the last byte of a block, to avoid invoking streaming
// API to get an effective block checksum. This function is its own inverse
Expand Down
13 changes: 12 additions & 1 deletion table/format.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ class Footer {
// Create empty. Populate using DecodeFrom.
Footer() {}

void Reset() {
table_magic_number_ = kNullTableMagicNumber;
format_version_ = kInvalidFormatVersion;
base_context_checksum_ = 0;
metaindex_handle_ = BlockHandle::NullBlockHandle();
index_handle_ = BlockHandle::NullBlockHandle();
checksum_type_ = kInvalidChecksumType;
block_trailer_size_ = 0;
}

// Deserialize a footer (populate fields) from `input` and check for various
// corruptions. `input_offset` is the offset within the target file of
// `input` buffer, which is needed for verifying format_version >= 6 footer.
Expand Down Expand Up @@ -304,7 +314,8 @@ class FooterBuilder {
Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number = 0);
uint64_t enforce_table_magic_number = 0,
Statistics* stats = nullptr);

// Computes a checksum using the given ChecksumType. Sometimes we need to
// include one more input byte logically at the end but not part of the main
Expand Down
Loading
Loading