Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading block based log format #249

Merged
merged 25 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
44abe11
[New feature]Build basics for DIO.
LykxSassinator Jul 20, 2022
c27d043
Supply extra supports for building the basic structure for different …
LykxSassinator Jul 21, 2022
53b363a
Supply the reading strategy when `cfg.format_data_layout == Alignment`.
LykxSassinator Jul 21, 2022
4ccf139
Supply `alignment-mode` flag in `stress` tool for supporting setting …
LykxSassinator Jul 21, 2022
5e15c90
Code-style cleanning.
LykxSassinator Jul 21, 2022
884826c
Refactor the DataLayout for compatibilities to the future fragmented …
LykxSassinator Jul 22, 2022
a1be483
Supply abnormal testcases.
LykxSassinator Jul 22, 2022
d095195
Bugfixes for uts in format.rs.
LykxSassinator Jul 22, 2022
94232a8
Bugfix for the checking in `LogFileReader::read()`.
LykxSassinator Jul 22, 2022
8e075ba
Refactor the DataLayout and remove over-designed parts of codes.
LykxSassinator Jul 22, 2022
2337263
Add extra annotations and refinement to `next()` in `reader.rs`.
LykxSassinator Jul 22, 2022
3855fee
Bugfix on strategy for aligned writting and reading.
LykxSassinator Jul 25, 2022
07405da
Fix code-style errs.
LykxSassinator Jul 25, 2022
19b50cd
Bugfix for reading tail paddings when DataLayout == Alignment.
LykxSassinator Jul 25, 2022
bf75542
Merge branch 'tikv:master' into basics_for_dio
LykxSassinator Jul 26, 2022
0cf8875
Serialized the DataLayout in u64 and dump it into header of log files.
LykxSassinator Jul 26, 2022
a218c73
Supply etra and necessary fail_points to make production code path sa…
LykxSassinator Jul 27, 2022
8aa79a0
Fix the bug while parsing the header of log file and make code-style …
LykxSassinator Jul 28, 2022
248a3cc
Refine the code style.
LykxSassinator Jul 28, 2022
7c5893c
Bugfix when read corrupted header with Version::V1.
LykxSassinator Jul 28, 2022
e430d1f
Refine the info of Error::InvalidArguement in format.rs when parse an…
LykxSassinator Jul 29, 2022
35410d6
Supply extra corner cases for testing, especially the case that recov…
LykxSassinator Jul 29, 2022
3b251ec
Refine the code-style and design of uts.
LykxSassinator Aug 1, 2022
c867868
Bugfix for doctest.
LykxSassinator Aug 1, 2022
4287b8d
Refine the strategy for tolerating tail corruptions.
LykxSassinator Aug 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fail = "0.5"
fs2 = "0.4"
hashbrown = "0.12"
hex = "0.4"
if_chain = "1.0.2"
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
lazy_static = "1.3"
libc = "0.2"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Config {
pub recovery_mode: RecoveryMode,
/// Minimum I/O size for reading log files during recovery.
///
/// Default: "4KB". Minimum: "512B".
/// Default: "16KB". Minimum: "512B".
tabokie marked this conversation as resolved.
Show resolved Hide resolved
pub recovery_read_block_size: ReadableSize,
/// The number of threads used to scan and recovery log files.
///
Expand Down
6 changes: 5 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ where
script: String,
file_system: Arc<F>,
) -> Result<()> {
use crate::file_pipe_log::{RecoveryConfig, ReplayMachine};
use crate::file_pipe_log::{LogFileFormat, RecoveryConfig, ReplayMachine};
use crate::pipe_log::DataLayout;

if !path.exists() {
return Err(Error::InvalidArgument(format!(
Expand All @@ -458,6 +459,7 @@ where
..Default::default()
};
let recovery_mode = cfg.recovery_mode;
let file_format = LogFileFormat::new(cfg.format_version, DataLayout::default());
let read_block_size = cfg.recovery_read_block_size.0;
let mut builder = FilePipeLogBuilder::new(cfg, file_system.clone(), Vec::new());
builder.scan()?;
Expand All @@ -469,6 +471,7 @@ where
RecoveryConfig {
queue: LogQueue::Append,
mode: recovery_mode,
file_format,
concurrency: 1,
read_block_size,
},
Expand All @@ -481,6 +484,7 @@ where
RecoveryConfig {
queue: LogQueue::Rewrite,
mode: recovery_mode,
file_format,
concurrency: 1,
read_block_size,
},
Expand Down
238 changes: 224 additions & 14 deletions src/file_pipe_log/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

//! Representations of objects in filesystem.

use std::cmp::Ordering;
use std::io::BufRead;
use std::path::{Path, PathBuf};

use num_traits::{FromPrimitive, ToPrimitive};

use crate::codec::{self, NumberEncoder};
use crate::pipe_log::{FileId, LogQueue, Version};
use crate::pipe_log::{DataLayout, FileId, LogQueue, Version};
use crate::{Error, Result};

/// Width to format log sequence number.
Expand All @@ -20,6 +21,43 @@ const LOG_REWRITE_SUFFIX: &str = ".rewrite";
/// File header.
const LOG_FILE_MAGIC_HEADER: &[u8] = b"RAFT-LOG-FILE-HEADER-9986AB3E47F320B394C8E84916EB0ED5";

pub(crate) fn is_valid_paddings(buf: &[u8]) -> Result<bool> {
let len = buf.len();
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
let thd_len = std::mem::size_of::<u64>(); // u64 size
match len.cmp(&thd_len) {
Ordering::Less => {
// len < 8
for ele in buf.iter() {
if *ele != 0 {
return Ok(false);
}
}
Ok(true)
}
Ordering::Equal => {
// len == 8
let paddings = buf[..].to_vec();
if codec::decode_u64(&mut &paddings[..])? == 0 {
Ok(true)
} else {
Ok(false)
}
}
Ordering::Greater => {
// len > 8
let head_paddings = buf[0..thd_len].to_vec();
let tail_paddings = buf[len - thd_len..].to_vec();
if codec::decode_u64(&mut &head_paddings[..])? == 0
&& codec::decode_u64(&mut &tail_paddings[..])? == 0
{
Ok(true)
} else {
Ok(false)
}
}
}
}

/// `FileNameExt` offers file name formatting extensions to [`FileId`].
pub trait FileNameExt: Sized {
fn parse_file_name(file_name: &str) -> Option<Self>;
Expand Down Expand Up @@ -79,28 +117,57 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
}

/// In-memory representation of `Format` in log files.
#[derive(Clone, Default)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct LogFileFormat {
version: Version,
data_layout: DataLayout,
}

impl LogFileFormat {
pub fn new(version: Version, data_layout: DataLayout) -> Self {
Self {
version,
data_layout,
}
}

/// Length of whole `LogFileFormat` written on storage.
pub fn enc_len(&self) -> usize {
Self::header_len() + Self::payload_len(self.version)
}

/// Length of header written on storage.
pub const fn len() -> usize {
pub const fn header_len() -> usize {
LOG_FILE_MAGIC_HEADER.len() + std::mem::size_of::<Version>()
}

/// Length of serialized `DataLayout` written on storage.
pub const fn payload_len(version: Version) -> usize {
match version {
Version::V1 => 0,
Version::V2 => DataLayout::len(),
}
}

pub fn from_version(version: Version) -> Self {
Self { version }
Self {
version,
data_layout: DataLayout::default(),
}
}

pub fn version(&self) -> Version {
self.version
}

pub fn data_layout(&self) -> DataLayout {
self.data_layout
}

/// Decodes a slice of bytes into a `LogFileFormat`.
pub fn decode(buf: &mut &[u8]) -> Result<LogFileFormat> {
if buf.len() < Self::len() {
let buf_len = buf.len();
if buf_len < Self::header_len() {
return Err(Error::Corruption("log file header too short".to_owned()));
}
if !buf.starts_with(LOG_FILE_MAGIC_HEADER) {
Expand All @@ -109,21 +176,60 @@ impl LogFileFormat {
));
}
buf.consume(LOG_FILE_MAGIC_HEADER.len());
let v = codec::decode_u64(buf)?;
if let Some(version) = Version::from_u64(v) {
Ok(Self { version })
// Parse `Version` of LogFileFormat from header of the file.
let version = {
let dec_version = codec::decode_u64(buf)?;
if let Some(v) = Version::from_u64(dec_version) {
v
} else {
return Err(Error::Corruption(format!(
"unrecognized log file version: {}",
dec_version
)));
}
};
// Parse `DataLayout` of LogFileFormat from header of the file.
let payload_len = Self::payload_len(version);
if payload_len == 0 {
// No alignment.
Ok(Self {
version,
data_layout: DataLayout::default(),
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
})
} else if payload_len > 0 && buf_len >= Self::header_len() + payload_len {
// If the decoded `payload_len > 0`, serialized data_layout
// should be extracted from the file.
let layout_block_size = codec::decode_u64(buf)?;
Ok(Self {
version,
data_layout: if layout_block_size == 0 {
DataLayout::default()
} else {
DataLayout::Alignment(layout_block_size)
},
})
} else {
Err(Error::Corruption(format!(
"unrecognized log file version: {}",
v
"unrecognized log file data_layout, len: {}",
buf_len - Self::header_len()
)))
}
}

/// Encodes this header and appends the bytes to the provided buffer.
pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
buf.encode_u64(self.version.to_u64().unwrap())?;
buf.encode_u64(self.version.to_u64().unwrap())?; // encode version
if Self::payload_len(self.version) > 0 {
buf.encode_u64(self.data_layout.to_u64())?; // encode datay_layout
let corrupted_data_layout = || {
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
fail::fail_point!("log_file_header::corrupted_data_layout", |_| true);
false
};
if corrupted_data_layout() {
buf.pop();
}
}
let corrupted = || {
fail::fail_point!("log_file_header::corrupted", |_| true);
false
Expand All @@ -133,13 +239,39 @@ impl LogFileFormat {
}
Ok(())
}

/// Return the aligned block size.
#[inline]
pub fn get_aligned_block_size(&self) -> usize {
self.data_layout.to_u64() as usize
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::pipe_log::LogFileContext;

#[test]
fn test_check_paddings_is_valid() {
// normal buffer
let mut buf = vec![0; 128];
// len < 8
assert!(is_valid_paddings(&buf[0..6]).unwrap());
// len == 8
assert!(is_valid_paddings(&buf[120..]).unwrap());
// len > 8
assert!(is_valid_paddings(&buf[..]).unwrap());

// abnormal buffer
buf[125] = 3_u8;
assert!(is_valid_paddings(&buf[120..125]).unwrap());
assert!(!is_valid_paddings(&buf[124..127]).unwrap());
assert!(!is_valid_paddings(&buf[120..]).unwrap());
assert!(!is_valid_paddings(&buf[110..]).unwrap());
assert!(!is_valid_paddings(&buf[..]).unwrap());
}

#[test]
fn test_file_name() {
let file_name: &str = "0000000000000123.raftlog";
Expand Down Expand Up @@ -172,23 +304,101 @@ mod tests {
assert_eq!(version, version2);
}

#[test]
fn test_data_layout() {
let data_layout = DataLayout::default();
assert_eq!(data_layout.to_u64(), DataLayout::NoAlignment.to_u64());
assert_eq!(DataLayout::Alignment(16).to_u64(), 16);
assert_eq!(DataLayout::from_u64(0), DataLayout::default());
assert_eq!(DataLayout::from_u64(4096), DataLayout::Alignment(4096));
assert_eq!(DataLayout::len(), 8);
}

#[test]
fn test_file_header() {
let header1 = LogFileFormat::default();
assert_eq!(header1.version().to_u64().unwrap(), 1);
assert_eq!(header1.data_layout().to_u64(), 0);
let header2 = LogFileFormat::from_version(Version::default());
assert_eq!(header2.version().to_u64(), header1.version().to_u64());
let header3 = LogFileFormat::from_version(Version::default());
assert_eq!(header1.data_layout().to_u64(), 0);
let header3 = LogFileFormat::from_version(header1.version());
assert_eq!(header3.version(), header1.version());
assert_eq!(header1.data_layout().to_u64(), 0);
assert_eq!(header1.enc_len(), LogFileFormat::header_len());
assert_eq!(header2.enc_len(), LogFileFormat::header_len());
assert_eq!(header3.enc_len(), LogFileFormat::header_len());
let header4 = LogFileFormat {
version: Version::V2,
data_layout: DataLayout::Alignment(16),
};
assert_eq!(
header4.enc_len(),
LogFileFormat::header_len() + LogFileFormat::payload_len(header4.version)
);
}

#[test]
fn test_encoding_decoding_file_format() {
fn enc_dec_file_format(file_format: LogFileFormat) -> Result<LogFileFormat> {
let mut buf = Vec::with_capacity(
LogFileFormat::header_len() + LogFileFormat::payload_len(file_format.version),
);
assert!(file_format.encode(&mut buf).is_ok());
LogFileFormat::decode(&mut &buf[..])
}
// header with aligned-sized data_layout
{
let mut buf = Vec::with_capacity(LogFileFormat::header_len());
let version = Version::V2;
let data_layout = DataLayout::Alignment(4096);
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
assert!(buf.encode_u64(version.to_u64().unwrap()).is_ok());
assert!(buf.encode_u64(data_layout.to_u64()).is_ok());
assert_eq!(
LogFileFormat::decode(&mut &buf[..]).unwrap(),
LogFileFormat::new(version, data_layout)
);
}
// header with abnormal version
{
let mut buf = Vec::with_capacity(LogFileFormat::header_len());
let abnormal_version = 4_u64; /* abnormal version */
buf.extend_from_slice(LOG_FILE_MAGIC_HEADER);
assert!(buf.encode_u64(abnormal_version).is_ok());
assert!(buf.encode_u64(16).is_ok());
assert!(LogFileFormat::decode(&mut &buf[..]).is_err());
}
// header with Version::default and DataLayout::Alignment(_)
{
let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(0));
assert_eq!(
LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
let file_format = LogFileFormat::new(Version::default(), DataLayout::Alignment(4096));
assert_eq!(
LogFileFormat::new(Version::default(), DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
}
// header with Version::V2 and DataLayout::Alignment(0)
{
let file_format = LogFileFormat::new(Version::V2, DataLayout::Alignment(0));
assert_eq!(
LogFileFormat::new(Version::V2, DataLayout::NoAlignment),
enc_dec_file_format(file_format).unwrap()
);
}
}

#[test]
fn test_file_context() {
let mut file_context =
LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
LogFileContext::new(FileId::dummy(LogQueue::Append), LogFileFormat::default());
assert_eq!(file_context.get_signature(), None);
file_context.id.seq = 10;
file_context.version = Version::V2;
file_context.format.version = Version::V2;
assert_eq!(file_context.get_signature().unwrap(), 10);
let abnormal_seq = (file_context.id.seq << 32) as u64 + 100_u64;
file_context.id.seq = abnormal_seq;
Expand Down
Loading