Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify the strategy of tolerating tail corruption when Version == V2. #255

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/file_pipe_log/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ impl LogFileFormat {

/// Decodes a slice of bytes into a `LogFileFormat`.
pub fn decode(buf: &mut &[u8]) -> Result<LogFileFormat> {
#[cfg(feature = "failpoints")]
{
// Set parsed abnormal DataLayout for `payload`.
fail::fail_point!("log_file_header::abnormal_decoded_payload", |_| Err(
Error::InvalidArgument("abnormal_decoded_payload".to_string())
));
}
let buf_len = buf.len();
if buf_len < Self::header_len() {
return Err(Error::Corruption("log file header too short".to_owned()));
Expand Down
38 changes: 33 additions & 5 deletions src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,43 @@ impl<F: FileSystem> LogFileReader<F> {
// [1] If the length lessed than the standard `LogFileFormat::header_len()`.
let header_len = LogFileFormat::header_len();
if file_size < header_len {
return Err(Error::Corruption("Invalid header of LogFile!".to_owned()));
return Err(Error::InvalidArgument(format!(
"invalid header len of log file, expected len: {}, actual len: {}",
header_len, file_size,
)));
}
// [2] Parse the format of the file.
let mut container =
vec![0; LogFileFormat::header_len() + LogFileFormat::payload_len(Version::V2)];
let expected_container_len =
LogFileFormat::header_len() + LogFileFormat::payload_len(Version::V2);
let mut container = vec![0; expected_container_len];
let size = self.read_to(0, &mut container[..])?;
container.truncate(size);
self.format = LogFileFormat::decode(&mut container.as_slice())?;
Ok(self.format)
match LogFileFormat::decode(&mut container.as_slice()) {
Err(Error::InvalidArgument(err_msg)) => {
if file_size <= expected_container_len {
// Here, it means that we parsed an corrupted V2 header.
Err(Error::InvalidArgument(err_msg))
} else {
// Here, the `file_size` is not expected, representing the
// whole file is corrupted.
Err(Error::Corruption(err_msg))
}
}
Err(e) => {
if file_size == LogFileFormat::header_len() {
// Here, it means that we parsed an corrupted V1 header. We
// mark this special err with InvalidArgument, to represent
// this log is corrupted on its header.
Err(Error::InvalidArgument(e.to_string()))
} else {
Err(e)
}
}
Ok(format) => {
self.format = format;
Ok(format)
}
}
}

#[inline]
Expand Down
3 changes: 2 additions & 1 deletion src/file_pipe_log/pipe_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,10 @@ impl<F: FileSystem> DualPipesBuilder<F> {
let is_last_file = index == chunk_count - 1 && i == file_count - 1;
match build_file_reader(file_system.as_ref(), f.handle.clone(), None) {
Err(e) => {
let is_tail_fail = matches!(e, Error::InvalidArgument(_));
if recovery_mode == RecoveryMode::TolerateAnyCorruption
|| recovery_mode == RecoveryMode::TolerateTailCorruption
&& is_last_file {
&& is_last_file && is_tail_fail {
warn!(
"File header is corrupted but ignored: {:?}:{}, {}",
queue, f.seq, e
Expand Down
27 changes: 24 additions & 3 deletions tests/failpoints/test_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ fn test_tail_corruption() {
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
append(&engine, rid, 1, 5, Some(&data));
drop(engine);
assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok());
assert!(Engine::open_with_file_system(cfg, fs.clone()).is_err());
}
// Version::V1 in header owns abnormal DataLayout.
{
Expand Down Expand Up @@ -506,13 +506,34 @@ fn test_tail_corruption() {
drop(engine);
assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok());
}
// DataLayout in header is corrupted for Version::V2, followed with records
// DataLayout in header is corrupted(decoding) for Version::V2, followed
// with records
{
let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return");
let _f = FailGuard::new("log_file_header::abnormal_decoded_payload", "return");
let dir = tempfile::Builder::new()
.prefix("test_tail_corruption_7")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
format_version: Version::V2,
..Default::default()
};
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
drop(engine);
let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
append(&engine, rid, 1, 5, Some(&data));
drop(engine);
assert!(Engine::open_with_file_system(cfg, fs.clone()).is_err());
}
// DataLayout in header is corrupted(encoding) for Version::V2, followed
// with records
{
let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return");
let dir = tempfile::Builder::new()
.prefix("test_tail_corruption_8")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
Expand Down