From 607b47ab0db9507cd5b864fc37918286c34abbb7 Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Tue, 2 Aug 2022 21:17:36 +0800 Subject: [PATCH 1/3] Unify the strategy of tolerating tail corruption when Version == V2. Signed-off-by: Lucasliang --- src/file_pipe_log/log_file.rs | 38 +++++++++++++++++++++++++++---- src/file_pipe_log/pipe_builder.rs | 3 ++- tests/failpoints/test_engine.rs | 2 +- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index e7327c85..d656f1a8 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -243,15 +243,43 @@ impl LogFileReader { // [1] If the length lessed than the standard `LogFileFormat::header_len()`. let header_len = LogFileFormat::header_len(); if file_size < header_len { - return Err(Error::Corruption("Invalid header of LogFile!".to_owned())); + return Err(Error::InvalidArgument(format!( + "invalid header len of log file, expected len: {}, actual len: {}", + header_len, file_size, + ))); } // [2] Parse the format of the file. - let mut container = - vec![0; LogFileFormat::header_len() + LogFileFormat::payload_len(Version::V2)]; + let expected_container_len = + LogFileFormat::header_len() + LogFileFormat::payload_len(Version::V2); + let mut container = vec![0; expected_container_len]; let size = self.read_to(0, &mut container[..])?; container.truncate(size); - self.format = LogFileFormat::decode(&mut container.as_slice())?; - Ok(self.format) + match LogFileFormat::decode(&mut container.as_slice()) { + Err(Error::InvalidArgument(err_msg)) => { + if file_size <= expected_container_len { + // Here, it means that we parsed an corrupted V2 header. + Err(Error::InvalidArgument(err_msg)) + } else { + // Here, the `file_size` is not expected, representing the + // whole file is corrupted. + Err(Error::Corruption(err_msg)) + } + } + Err(e) => { + if file_size == LogFileFormat::header_len() { + // Here, it means that we parsed an corrupted V1 header. We + // mark this special err with InvalidArgument, to represent + // this log is corrupted on its header. + Err(Error::InvalidArgument(e.to_string())) + } else { + Err(e) + } + } + Ok(format) => { + self.format = format; + Ok(format) + } + } } #[inline] diff --git a/src/file_pipe_log/pipe_builder.rs b/src/file_pipe_log/pipe_builder.rs index e6b61929..c7a209aa 100644 --- a/src/file_pipe_log/pipe_builder.rs +++ b/src/file_pipe_log/pipe_builder.rs @@ -303,9 +303,10 @@ impl DualPipesBuilder { let is_last_file = index == chunk_count - 1 && i == file_count - 1; match build_file_reader(file_system.as_ref(), f.handle.clone(), None) { Err(e) => { + let is_tail_fail = matches!(e, Error::InvalidArgument(_)); if recovery_mode == RecoveryMode::TolerateAnyCorruption || recovery_mode == RecoveryMode::TolerateTailCorruption - && is_last_file { + && is_last_file && is_tail_fail { warn!( "File header is corrupted but ignored: {:?}:{}, {}", queue, f.seq, e diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index bb498fc3..37e1dcd4 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -450,7 +450,7 @@ fn test_tail_corruption() { let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); append(&engine, rid, 1, 5, Some(&data)); drop(engine); - assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); + assert!(Engine::open_with_file_system(cfg, fs.clone()).is_err()); } // Version::V1 in header owns abnormal DataLayout. { From dbfd99ffe5149541bb629f7d9c9efd8026967aa0 Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Tue, 2 Aug 2022 21:51:09 +0800 Subject: [PATCH 2/3] Supply extra uts for abnormal decoding on Version::V2 header Signed-off-by: Lucasliang --- src/file_pipe_log/format.rs | 7 +++++++ tests/failpoints/test_engine.rs | 25 +++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/file_pipe_log/format.rs b/src/file_pipe_log/format.rs index 902943f8..748112ac 100644 --- a/src/file_pipe_log/format.rs +++ b/src/file_pipe_log/format.rs @@ -182,6 +182,13 @@ impl LogFileFormat { data_layout: DataLayout::NoAlignment, }); } + #[cfg(feature = "failpoints")] + { + // Set parsed abnormal DataLayout for `payload`. + fail::fail_point!("log_file_header::abnormal_decoded_payload", |_| Err( + Error::InvalidArgument("abnormal_decoded_payload".to_string()) + )); + } if_chain::if_chain! { if payload_len > 0; if buf_len >= Self::header_len() + payload_len; diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index 37e1dcd4..58be136b 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -506,13 +506,34 @@ fn test_tail_corruption() { drop(engine); assert!(Engine::open_with_file_system(cfg, fs.clone()).is_ok()); } - // DataLayout in header is corrupted for Version::V2, followed with records + // DataLayout in header is corrupted(decoding) for Version::V2, followed + // with records { - let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return"); + let _f = FailGuard::new("log_file_header::abnormal_decoded_payload", "return"); let dir = tempfile::Builder::new() .prefix("test_tail_corruption_7") .tempdir() .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + format_version: Version::V2, + ..Default::default() + }; + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + drop(engine); + let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + append(&engine, rid, 1, 5, Some(&data)); + drop(engine); + assert!(Engine::open_with_file_system(cfg, fs.clone()).is_err()); + } + // DataLayout in header is corrupted(encoding) for Version::V2, followed + // with records + { + let _f = FailGuard::new("log_file_header::corrupted_data_layout", "return"); + let dir = tempfile::Builder::new() + .prefix("test_tail_corruption_8") + .tempdir() + .unwrap(); let cfg = Config { dir: dir.path().to_str().unwrap().to_owned(), target_file_size: ReadableSize(1), From 6717f8fdce58346fb291371917f2c88f2d4df3d4 Mon Sep 17 00:00:00 2001 From: Lucasliang Date: Tue, 2 Aug 2022 21:55:51 +0800 Subject: [PATCH 3/3] Refine code-style in format.rs Signed-off-by: Lucasliang --- src/file_pipe_log/format.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/file_pipe_log/format.rs b/src/file_pipe_log/format.rs index 748112ac..ca74f992 100644 --- a/src/file_pipe_log/format.rs +++ b/src/file_pipe_log/format.rs @@ -151,6 +151,13 @@ impl LogFileFormat { /// Decodes a slice of bytes into a `LogFileFormat`. pub fn decode(buf: &mut &[u8]) -> Result { + #[cfg(feature = "failpoints")] + { + // Set parsed abnormal DataLayout for `payload`. + fail::fail_point!("log_file_header::abnormal_decoded_payload", |_| Err( + Error::InvalidArgument("abnormal_decoded_payload".to_string()) + )); + } let buf_len = buf.len(); if buf_len < Self::header_len() { return Err(Error::Corruption("log file header too short".to_owned())); @@ -182,13 +189,6 @@ impl LogFileFormat { data_layout: DataLayout::NoAlignment, }); } - #[cfg(feature = "failpoints")] - { - // Set parsed abnormal DataLayout for `payload`. - fail::fail_point!("log_file_header::abnormal_decoded_payload", |_| Err( - Error::InvalidArgument("abnormal_decoded_payload".to_string()) - )); - } if_chain::if_chain! { if payload_len > 0; if buf_len >= Self::header_len() + payload_len;