From cd3e2ef2aed2983b30a19fdec5e34fea77533953 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 8 May 2024 10:51:04 +0800 Subject: [PATCH 01/11] chore: move wal utils to util crate Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/utils/src/lib.rs | 2 ++ .../storage => utils/src}/wal/framed.rs | 4 +-- .../wal/util.rs => utils/src/wal/mod.rs} | 35 ++++++++++--------- .../storage => utils/src}/wal/pipeline.rs | 6 ++-- 4 files changed, 26 insertions(+), 21 deletions(-) rename crates/{curp/src/server/storage => utils/src}/wal/framed.rs (90%) rename crates/{curp/src/server/storage/wal/util.rs => utils/src/wal/mod.rs} (85%) rename crates/{curp/src/server/storage => utils/src}/wal/pipeline.rs (97%) diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index b3296b9cd..1e241cffe 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -207,6 +207,8 @@ pub mod task_manager; pub mod tokio_lock; /// utils for pass span context pub mod tracing; +/// WAL utils +pub mod wal; use ::tracing::debug; pub use parser::*; diff --git a/crates/curp/src/server/storage/wal/framed.rs b/crates/utils/src/wal/framed.rs similarity index 90% rename from crates/curp/src/server/storage/wal/framed.rs rename to crates/utils/src/wal/framed.rs index d5ea00e19..96d3c5e90 100644 --- a/crates/curp/src/server/storage/wal/framed.rs +++ b/crates/utils/src/wal/framed.rs @@ -1,7 +1,7 @@ use std::io; /// Decoding of frames via buffers. -pub(super) trait Decoder { +pub trait Decoder { /// The type of decoded frames. type Item; @@ -13,7 +13,7 @@ pub(super) trait Decoder { } /// Trait of helper objects to write out messages as bytes -pub(super) trait Encoder { +pub trait Encoder { /// The type of encoding errors. 
type Error: From; diff --git a/crates/curp/src/server/storage/wal/util.rs b/crates/utils/src/wal/mod.rs similarity index 85% rename from crates/curp/src/server/storage/wal/util.rs rename to crates/utils/src/wal/mod.rs index 080990990..657ea6e34 100644 --- a/crates/curp/src/server/storage/wal/util.rs +++ b/crates/utils/src/wal/mod.rs @@ -1,3 +1,9 @@ +/// Framed +pub mod framed; + +/// File pipeline +pub mod pipeline; + use std::{ fs::{File as StdFile, OpenOptions}, io, @@ -10,7 +16,7 @@ use tokio::fs::File as TokioFile; /// File that is exclusively locked #[derive(Debug)] -pub(super) struct LockedFile { +pub struct LockedFile { /// The inner std file file: Option, /// The path of the file @@ -19,7 +25,7 @@ pub(super) struct LockedFile { impl LockedFile { /// Opens the file in read and append mode - pub(super) fn open_rw(path: impl AsRef) -> io::Result { + pub fn open_rw(path: impl AsRef) -> io::Result { let file = OpenOptions::new() .create(true) .read(true) @@ -34,7 +40,7 @@ impl LockedFile { } /// Pre-allocates the file - pub(super) fn preallocate(&mut self, size: u64) -> io::Result<()> { + pub fn preallocate(&mut self, size: u64) -> io::Result<()> { if size == 0 { return Ok(()); } @@ -43,14 +49,14 @@ impl LockedFile { } /// Gets the path of this file - pub(super) fn path(&self) -> PathBuf { + pub fn path(&self) -> PathBuf { self.path.clone() } /// Renames the current file /// /// We will discard this file if the rename has failed - pub(super) fn rename(mut self, new_name: impl AsRef) -> io::Result { + pub fn rename(mut self, new_name: impl AsRef) -> io::Result { let mut new_path = parent_dir(&self.path); new_path.push(new_name.as_ref()); std::fs::rename(&self.path, &new_path)?; @@ -63,7 +69,7 @@ impl LockedFile { } /// Converts self to std file - pub(super) fn into_std(self) -> StdFile { + pub fn into_std(self) -> StdFile { let mut this = std::mem::ManuallyDrop::new(self); this.file .take() @@ -87,10 +93,7 @@ impl Drop for LockedFile { } /// Gets the all files with the extension under the given folder -pub(super) fn get_file_paths_with_ext( - dir: impl AsRef, - ext: &str, -) -> io::Result> { +pub fn get_file_paths_with_ext(dir: impl AsRef, ext: &str) -> io::Result> { let mut files = vec![]; for result in std::fs::read_dir(dir)? 
{ let file = result?; @@ -104,14 +107,14 @@ pub(super) fn get_file_paths_with_ext( } /// Gets the parent dir -pub(super) fn parent_dir(dir: impl AsRef) -> PathBuf { +pub fn parent_dir(dir: impl AsRef) -> PathBuf { let mut parent = PathBuf::from(dir.as_ref()); let _ignore = parent.pop(); parent } /// Fsyncs the parent directory -pub(super) fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { +pub fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { let parent_dir = parent_dir(&dir); let parent = std::fs::File::open(parent_dir)?; parent.sync_all()?; @@ -120,24 +123,24 @@ pub(super) fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { } /// Gets the checksum of the slice, we use Sha256 as the hash function -pub(super) fn get_checksum(data: &[u8]) -> Output { +pub fn get_checksum(data: &[u8]) -> Output { let mut hasher = Sha256::new(); hasher.update(data); hasher.finalize() } /// Validates the the data with the given checksum -pub(super) fn validate_data(data: &[u8], checksum: &[u8]) -> bool { +pub fn validate_data(data: &[u8], checksum: &[u8]) -> bool { AsRef::<[u8]>::as_ref(&get_checksum(data)) == checksum } /// Checks whether the file exist -pub(super) fn is_exist(path: impl AsRef) -> bool { +pub fn is_exist(path: impl AsRef) -> bool { std::fs::metadata(path).is_ok() } /// Parses a u64 from u8 slice -pub(super) fn parse_u64(bytes_le: &[u8]) -> u64 { +pub fn parse_u64(bytes_le: &[u8]) -> u64 { assert_eq!(bytes_le.len(), 8, "The slice passed should be 8 bytes long"); u64::from_le_bytes( bytes_le diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/utils/src/wal/pipeline.rs similarity index 97% rename from crates/curp/src/server/storage/wal/pipeline.rs rename to crates/utils/src/wal/pipeline.rs index deadd5418..4ea9b89b5 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/utils/src/wal/pipeline.rs @@ -19,7 +19,7 @@ use super::util::LockedFile; const TEMP_FILE_EXT: &str = ".tmp"; /// The file pipeline, used for pipelining the creation of temp file -pub(super) struct FilePipeline { +pub struct FilePipeline { /// The directory where the temp files are created dir: PathBuf, /// The size of the temp file @@ -35,7 +35,7 @@ pub(super) struct FilePipeline { impl FilePipeline { /// Creates a new `FilePipeline` - pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result { + pub fn new(dir: PathBuf, file_size: u64) -> io::Result { Self::clean_up(&dir)?; let (file_tx, file_rx) = flume::bounded(1); @@ -102,7 +102,7 @@ impl FilePipeline { } /// Stops the pipeline - pub(super) fn stop(&mut self) { + pub fn stop(&mut self) { self.stopped.store(true, Ordering::Relaxed); } From 5b6add20f4dd39ad9d9da8cdf33736442324e5a1 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 8 May 2024 11:36:48 +0800 Subject: [PATCH 02/11] chore: fix imports in utils wal Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- Cargo.lock | 6 ++++++ crates/utils/Cargo.toml | 6 ++++++ crates/utils/src/wal/mod.rs | 11 ++++------- crates/utils/src/wal/pipeline.rs | 9 ++------- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6612efd3b..dd15cd73a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3303,6 +3303,10 @@ dependencies = [ "clippy-utilities", "dashmap", "derive_builder", + "event-listener", + "flume", + "fs2", + "futures", "getset", "madsim-tokio", "madsim-tonic", @@ -3315,6 +3319,8 @@ dependencies = [ "petgraph", "rand", "serde", + "sha2", + "tempfile", "test-macros", "thiserror", "toml", diff 
--git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 965d7cb3d..efc808360 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -22,6 +22,10 @@ async-trait = { version = "0.1.80", optional = true } clippy-utilities = "0.2.0" dashmap = "5.5.3" derive_builder = "0.20.0" +event-listener = "5.3.0" +flume = "0.11.0" +fs2 = "0.4.3" +futures = "0.3.30" getset = "0.1" opentelemetry = { version = "0.22.0", features = ["trace"] } opentelemetry_sdk = { version = "0.22.1", features = ["trace"] } @@ -30,6 +34,8 @@ pbkdf2 = { version = "0.12.2", features = ["simple"] } petgraph = "0.6.4" rand = "0.8.5" serde = { version = "1.0.199", features = ["derive"] } +sha2 = "0.10.8" +tempfile = "3.10.1" thiserror = "1.0.60" tokio = { version = "0.2.25", package = "madsim-tokio", features = [ "sync", diff --git a/crates/utils/src/wal/mod.rs b/crates/utils/src/wal/mod.rs index 657ea6e34..b66cf7d89 100644 --- a/crates/utils/src/wal/mod.rs +++ b/crates/utils/src/wal/mod.rs @@ -12,7 +12,6 @@ use std::{ use fs2::FileExt; use sha2::{digest::Output, Digest, Sha256}; -use tokio::fs::File as TokioFile; /// File that is exclusively locked #[derive(Debug)] @@ -151,20 +150,18 @@ pub fn parse_u64(bytes_le: &[u8]) -> u64 { #[cfg(test)] mod tests { - use std::{io::Read, process::Command}; - use super::*; #[test] fn file_rename_is_ok() { - let mut tempdir = tempfile::tempdir().unwrap(); + let tempdir = tempfile::tempdir().unwrap(); let mut path = PathBuf::from(tempdir.path()); path.push("file.test"); let lfile = LockedFile::open_rw(&path).unwrap(); let new_name = "new_name.test"; let mut new_path = parent_dir(&path); new_path.push(new_name); - lfile.rename(new_name); + lfile.rename(new_name).unwrap(); assert!(!is_exist(path)); assert!(is_exist(new_path)); } @@ -172,10 +169,10 @@ mod tests { #[test] #[allow(clippy::verbose_file_reads)] // false positive fn file_open_is_exclusive() { - let mut tempdir = tempfile::tempdir().unwrap(); + let tempdir = tempfile::tempdir().unwrap(); let mut path = PathBuf::from(tempdir.path()); path.push("file.test"); - let mut lfile = LockedFile::open_rw(&path).unwrap(); + let _lfile = LockedFile::open_rw(&path).unwrap(); assert!( LockedFile::open_rw(&path).is_err(), "acquire lock should failed" diff --git a/crates/utils/src/wal/pipeline.rs b/crates/utils/src/wal/pipeline.rs index 4ea9b89b5..6cf2507ca 100644 --- a/crates/utils/src/wal/pipeline.rs +++ b/crates/utils/src/wal/pipeline.rs @@ -1,19 +1,15 @@ use std::{ io, - path::{Path, PathBuf}, + path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, - task::Poll, }; -use clippy_utilities::OverflowArithmetic; -use event_listener::Event; -use thiserror::Error; use tracing::error; -use super::util::LockedFile; +use super::LockedFile; /// The temp file extension const TEMP_FILE_EXT: &str = ".tmp"; @@ -158,7 +154,6 @@ impl std::fmt::Debug for FilePipeline { #[cfg(test)] mod tests { use super::*; - use crate::server::storage::wal::util::get_file_paths_with_ext; #[tokio::test] async fn file_pipeline_is_ok() { From fff581fb05d7b5e6b0123be9e5d750ca8ce0935d Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 8 May 2024 16:22:28 +0800 Subject: [PATCH 03/11] feat: implement sp wal codec Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/mod.rs | 3 + crates/curp/src/server/sp_storage/codec.rs | 326 +++++++++++++++++++++ crates/curp/src/server/sp_storage/error.rs | 40 +++ crates/curp/src/server/sp_storage/mod.rs | 5 + crates/utils/src/wal/mod.rs | 10 +- 5 
files changed, 379 insertions(+), 5 deletions(-) create mode 100644 crates/curp/src/server/sp_storage/codec.rs create mode 100644 crates/curp/src/server/sp_storage/error.rs create mode 100644 crates/curp/src/server/sp_storage/mod.rs diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index b6fca3a99..bc9957c9e 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -54,6 +54,9 @@ mod lease_manager; /// Curp metrics mod metrics; +/// Speculative pool storage +mod sp_storage; + pub use storage::{db::DB, StorageApi, StorageError}; /// The Rpc Server to handle rpc requests diff --git a/crates/curp/src/server/sp_storage/codec.rs b/crates/curp/src/server/sp_storage/codec.rs new file mode 100644 index 000000000..dab106637 --- /dev/null +++ b/crates/curp/src/server/sp_storage/codec.rs @@ -0,0 +1,326 @@ +use std::{io, marker::PhantomData, sync::Arc}; + +use clippy_utilities::NumericCast; +use serde::{de::DeserializeOwned, Serialize}; +use sha2::{digest::Reset, Digest, Sha256}; +use utils::wal::{ + framed::{Decoder, Encoder}, + get_checksum, +}; + +use crate::rpc::ProposeId; + +use super::error::{CorruptType, WALError}; + +/// Invalid frame type +const INVALID: u8 = 0x00; +/// Entry frame type +const INSERT: u8 = 0x01; +/// Seal frame type +const REMOVE: u8 = 0x02; +/// Commit frame type +const COMMIT: u8 = 0x03; + +/// Getting the frame type +trait FrameType { + /// Returns the type of this frame + fn frame_type(&self) -> u8; +} + +/// Encoding of frames +trait FrameEncoder { + /// Encodes the current frame + fn encode(&self) -> Vec; +} + +/// The WAL codec +#[allow(clippy::upper_case_acronyms)] // The WAL needs to be all upper cases +#[derive(Debug)] +pub(super) struct WAL { + /// Frames stored in decoding + frames: Vec>, + /// The hasher state for decoding + hasher: H, +} + +/// Union type of WAL frames +#[derive(Debug)] +enum WALFrame { + /// Data frame type + Data(DataFrame), + /// Commit frame type + Commit(CommitFrame), +} + +/// The data frame +/// +/// Contains either a log entry or a seal index +#[derive(Debug, Clone)] +#[cfg_attr(test, derive(PartialEq))] +pub(crate) enum DataFrame { + /// A Frame containing a Insert entry + Insert { propose_id: ProposeId, cmd: Arc }, + /// A Frame containing the Remove entry + Remove(ProposeId), +} + +/// The commit frame +/// +/// This frames contains a SHA256 checksum of all previous frames since last commit +#[derive(Debug)] +struct CommitFrame { + /// The SHA256 checksum + checksum: Vec, + /// Type of the hasher + phantom: PhantomData, +} + +impl WAL { + /// Creates a new WAL codec + pub(super) fn new() -> Self { + Self { + frames: Vec::new(), + hasher: Sha256::new(), + } + } +} + +impl Encoder>> for WAL +where + C: Serialize, + H: Digest, +{ + type Error = io::Error; + + /// Encodes a frame + fn encode(&mut self, frames: Vec>) -> Result, Self::Error> { + let mut frame_data = Vec::new(); + for frame in frames { + frame_data.extend_from_slice(&frame.encode()); + } + let commit_frame = CommitFrame::::new_from_data(&frame_data); + frame_data.extend_from_slice(&commit_frame.encode()); + + Ok(frame_data) + } +} + +impl Decoder for WAL +where + C: Serialize + DeserializeOwned, + H: Digest + Reset + Clone, +{ + type Item = Vec>; + + type Error = WALError; + + fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error> { + let mut current = 0; + loop { + let Some((frame, len)) = WALFrame::::decode(&src[current..])? 
else { + return Err(WALError::UnexpectedEof); + }; + let decoded_bytes = &src[current..current + len]; + current += len; + match frame { + WALFrame::Data(data) => { + self.frames.push(data); + self.hasher.update(decoded_bytes); + } + WALFrame::Commit(commit) => { + let checksum = self.hasher.clone().finalize(); + Digest::reset(&mut self.hasher); + if commit.validate(&checksum) { + return Ok((self.frames.drain(..).collect(), current)); + } + return Err(WALError::Corrupted(CorruptType::Checksum)); + } + } + } + } +} + +/// Encoded size of `ProposeID` in bytes +const PROPOSE_ID_SIZE: usize = 16; + +#[allow( + clippy::indexing_slicing, // Index slicings are checked + clippy::arithmetic_side_effects, // Arithmetics are checked + clippy::unnecessary_wraps // Use the wraps to make code more consistenct +)] +impl WALFrame +where + C: DeserializeOwned, + H: Digest, +{ + /// Decodes a frame from the buffer + /// + /// * The frame header memory layout + /// + /// 0 1 2 3 4 5 6 7 8 + /// |------+------+------+------+------+------+------+------| + /// | Type | Data | + /// |------+------+------+------+------+------+------+------| + /// + /// * The frame types + /// + /// |------------+-------+-------------------------------------------------------| + /// | Type | Value | Desc | + /// |------------+-------+-------------------------------------------------------| + /// | Invalid | 0x00 | Invalid type | + /// | Insert | 0x01 | Inserts a command | + /// | Remove | 0x02 | Removes a command | + /// | Commit | 0x03 | Stores the checksum | + /// |------------+-------+-------------------------------------------------------| + fn decode(src: &[u8]) -> Result, WALError> { + let frame_type = src[0]; + match frame_type { + INVALID => Err(WALError::UnexpectedEof), + INSERT => Self::decode_insert(&src), + REMOVE => Self::decode_remove(&src), + COMMIT => Self::decode_commit(&src), + _ => Err(WALError::Corrupted(CorruptType::Codec( + "Unexpected frame type".to_owned(), + ))), + } + } + + /// Decodes an entry frame from source + #[allow(clippy::unwrap_used)] + fn decode_insert(mut src: &[u8]) -> Result, WALError> { + const LEN_SIZE: usize = 8; + let Some(propose_id) = Self::decode_propose_id(&src) else { + return Ok(None); + }; + src = &src[PROPOSE_ID_SIZE..]; + let Ok(len_bytes) = src[..LEN_SIZE].try_into() else { + return Ok(None); + }; + src = &src[LEN_SIZE..]; + let len: usize = u64::from_le_bytes(len_bytes).numeric_cast(); + if src.len() < len { + return Ok(None); + } + let payload = &src[..len]; + let cmd: C = bincode::deserialize(payload) + .map_err(|e| WALError::Corrupted(CorruptType::Codec(e.to_string())))?; + + Ok(Some(( + Self::Data(DataFrame::Insert { + propose_id, + cmd: Arc::new(cmd), + }), + 24 + len, + ))) + } + + /// Decodes an seal index frame from source + fn decode_remove(src: &[u8]) -> Result, WALError> { + Ok(Self::decode_propose_id(&src) + .map(|id| WALFrame::Data(DataFrame::Remove(id))) + .map(|frame| (frame, PROPOSE_ID_SIZE))) + } + + /// Decodes data frame header + fn decode_propose_id(src: &[u8]) -> Option { + if src.len() < PROPOSE_ID_SIZE { + return None; + } + let mut seq_bytes = src[0..8].to_vec(); + seq_bytes.rotate_left(1); + seq_bytes[7] = 0; + let seq_num = u64::from_le_bytes(seq_bytes.try_into().unwrap()); + let client_id = u64::from_le_bytes(src[8..16].try_into().unwrap()); + Some(ProposeId(client_id, seq_num)) + } + + /// Decodes a commit frame from source + fn decode_commit(src: &[u8]) -> Result, WALError> { + let sum_size = ::output_size(); + Ok(src + .get(8..8 + sum_size) + 
.map(<[u8]>::to_vec) + .map(|checksum| { + Self::Commit(CommitFrame { + checksum, + phantom: PhantomData, + }) + }) + .map(|frame| (frame, 8 + sum_size))) + } +} + +impl FrameType for DataFrame { + fn frame_type(&self) -> u8 { + match *self { + DataFrame::Insert { .. } => INSERT, + DataFrame::Remove(_) => REMOVE, + } + } +} + +impl FrameEncoder for DataFrame +where + C: Serialize, +{ + #[allow(clippy::arithmetic_side_effects)] // The integer shift is safe + fn encode(&self) -> Vec { + match *self { + DataFrame::Insert { + propose_id: ProposeId(client_id, seq_num), + ref cmd, + } => { + assert_eq!(seq_num >> 56, 0, "seq num: {seq_num} too large"); + let entry_bytes = bincode::serialize(&cmd) + .unwrap_or_else(|_| unreachable!("serialization should never fail")); + let len = entry_bytes.len(); + let mut bytes = Vec::with_capacity(3 * 8 + entry_bytes.len()); + bytes.push(self.frame_type()); + bytes.extend_from_slice(&seq_num.to_le_bytes()[..7]); + bytes.extend_from_slice(&client_id.to_le_bytes()); + bytes.extend_from_slice(&len.to_le_bytes()); + bytes.extend_from_slice(&entry_bytes); + bytes + } + DataFrame::Remove(ProposeId(client_id, seq_num)) => { + assert_eq!(seq_num >> 56, 0, "seq num: {seq_num} too large"); + let mut bytes = Vec::with_capacity(2 * 8); + bytes.push(self.frame_type()); + bytes.extend_from_slice(&seq_num.to_le_bytes()[..7]); + bytes.extend_from_slice(&client_id.to_le_bytes()); + bytes + } + } + } +} + +impl CommitFrame { + /// Creates a commit frame of data + fn new_from_data(data: &[u8]) -> Self { + Self { + checksum: get_checksum::(data).to_vec(), + phantom: PhantomData, + } + } + + /// Validates the checksum + fn validate(&self, checksum: &[u8]) -> bool { + *checksum == self.checksum + } +} + +impl FrameType for CommitFrame { + fn frame_type(&self) -> u8 { + COMMIT + } +} + +impl FrameEncoder for CommitFrame { + fn encode(&self) -> Vec { + let mut bytes = Vec::with_capacity(8 + self.checksum.len()); + bytes.extend_from_slice(&[0; 8]); + bytes[0] = self.frame_type(); + bytes.extend_from_slice(&self.checksum); + bytes + } +} diff --git a/crates/curp/src/server/sp_storage/error.rs b/crates/curp/src/server/sp_storage/error.rs new file mode 100644 index 000000000..99b330e3c --- /dev/null +++ b/crates/curp/src/server/sp_storage/error.rs @@ -0,0 +1,40 @@ +use std::io; + +use thiserror::Error; + +/// Errors of the `WALStorage` +#[derive(Debug, Error)] +pub(crate) enum WALError { + #[error("WAL ended")] + UnexpectedEof, + /// The WAL corrupt error + #[error("WAL corrupted: {0}")] + Corrupted(CorruptType), + /// The IO error + #[error("IO error: {0}")] + IO(#[from] io::Error), +} + +/// The type of the `Corrupted` error +#[derive(Debug, Error)] +pub(crate) enum CorruptType { + /// Corrupt because of decode failure + #[error("Error occurred when decoding WAL: {0}")] + Codec(String), + /// Corrupt because of checksum failure + #[error("Checksumming for the file has failed")] + Checksum, + /// Corrupt because of some logs is missing + #[error("The recovered logs are not continue")] + LogNotContinue, +} + +impl WALError { + pub(super) fn io_or_corrupt(self) -> io::Result { + match self { + WALError::Corrupted(e) => Ok(e), + WALError::IO(e) => return Err(e), + WALError::UnexpectedEof => unreachable!("Should not call on WALError::MaybeEnded"), + } + } +} diff --git a/crates/curp/src/server/sp_storage/mod.rs b/crates/curp/src/server/sp_storage/mod.rs new file mode 100644 index 000000000..3f470025b --- /dev/null +++ b/crates/curp/src/server/sp_storage/mod.rs @@ -0,0 +1,5 @@ +/// WAL 
codec +mod codec; + +/// WAL error +mod error; diff --git a/crates/utils/src/wal/mod.rs b/crates/utils/src/wal/mod.rs index b66cf7d89..1ba4a7b34 100644 --- a/crates/utils/src/wal/mod.rs +++ b/crates/utils/src/wal/mod.rs @@ -11,7 +11,7 @@ use std::{ }; use fs2::FileExt; -use sha2::{digest::Output, Digest, Sha256}; +use sha2::{digest::Output, Digest}; /// File that is exclusively locked #[derive(Debug)] @@ -122,15 +122,15 @@ pub fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { } /// Gets the checksum of the slice, we use Sha256 as the hash function -pub fn get_checksum(data: &[u8]) -> Output { - let mut hasher = Sha256::new(); +pub fn get_checksum(data: &[u8]) -> Output { + let mut hasher = H::new(); hasher.update(data); hasher.finalize() } /// Validates the the data with the given checksum -pub fn validate_data(data: &[u8], checksum: &[u8]) -> bool { - AsRef::<[u8]>::as_ref(&get_checksum(data)) == checksum +pub fn validate_data(data: &[u8], checksum: &[u8]) -> bool { + AsRef::<[u8]>::as_ref(&get_checksum::(data)) == checksum } /// Checks whether the file exist From 3469456ebe5ae095953ed4d0e07cb421b4c29cf0 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 9 May 2024 09:36:55 +0800 Subject: [PATCH 04/11] feat: implement sp wal segment Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/sp_storage/config.rs | 51 +++++ crates/curp/src/server/sp_storage/mod.rs | 55 +++++ crates/curp/src/server/sp_storage/segment.rs | 221 +++++++++++++++++++ 3 files changed, 327 insertions(+) create mode 100644 crates/curp/src/server/sp_storage/config.rs create mode 100644 crates/curp/src/server/sp_storage/segment.rs diff --git a/crates/curp/src/server/sp_storage/config.rs b/crates/curp/src/server/sp_storage/config.rs new file mode 100644 index 000000000..2372e6444 --- /dev/null +++ b/crates/curp/src/server/sp_storage/config.rs @@ -0,0 +1,51 @@ +use std::path::{Path, PathBuf}; + +/// Size in bytes per segment, default is 64MiB +const DEFAULT_INSERT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024; + +/// Size in bytes per segment, default is 1MiB +const DEFAULT_REMOVE_SEGMENT_SIZE: u64 = 1024 * 1024; + +/// The config for WAL +#[derive(Debug, Clone)] +pub(crate) struct WALConfig { + /// The path of this config + pub(super) dir: PathBuf, + /// The maximum size of this segment + /// + /// NOTE: This is a soft limit, the actual size may larger than this + pub(super) max_insert_segment_size: u64, + /// The maximum size of this segment + /// + /// NOTE: This is a soft limit, the actual size may larger than this + pub(super) max_remove_segment_size: u64, +} + +impl WALConfig { + /// Creates a new `WALConfig` + pub(crate) fn new(dir: impl AsRef) -> Self { + Self { + dir: dir.as_ref().into(), + max_insert_segment_size: DEFAULT_INSERT_SEGMENT_SIZE, + max_remove_segment_size: DEFAULT_REMOVE_SEGMENT_SIZE, + } + } + + /// Sets the `max_insert_segment_size` + pub(crate) fn with_max_insert_segment_size(self, size: u64) -> Self { + Self { + dir: self.dir, + max_remove_segment_size: self.max_remove_segment_size, + max_insert_segment_size: size, + } + } + + /// Sets the `max_remove_segment_size` + pub(crate) fn with_max_remove_segment_size(self, size: u64) -> Self { + Self { + dir: self.dir, + max_insert_segment_size: self.max_insert_segment_size, + max_remove_segment_size: size, + } + } +} diff --git a/crates/curp/src/server/sp_storage/mod.rs b/crates/curp/src/server/sp_storage/mod.rs index 3f470025b..c4b676e43 100644 --- 
a/crates/curp/src/server/sp_storage/mod.rs +++ b/crates/curp/src/server/sp_storage/mod.rs @@ -1,5 +1,60 @@ +use std::{fs::File, sync::Arc}; + +use curp_external_api::cmd::Command; + +use crate::rpc::ProposeId; + +use self::{config::WALConfig, error::WALError}; + /// WAL codec mod codec; /// WAL error mod error; + +/// WAL config +mod config; + +/// WAL segment +mod segment; + +/// Operations of speculative pool WAL +pub(crate) trait PoolWALOps { + /// Insert a command to WAL + fn insert(&self, propose_id: ProposeId, cmd: Arc) -> Result<(), WALError>; + + /// Removes a command from WAL + fn remove(&self, propose_id: ProposeId) -> Result<(), WALError>; + + /// Recover all commands stored in WAL + fn recover(&self) -> Result, WALError>; +} + +/// WAL of speculative pool +struct SpeculativePoolWAL { + /// WAL config + config: WALConfig, +} + +impl PoolWALOps for SpeculativePoolWAL { + fn insert(&self, propose_id: ProposeId, cmd: Arc) -> Result<(), WALError> { + todo!() + } + + fn remove(&self, propose_id: ProposeId) -> Result<(), WALError> { + todo!() + } + + fn recover(&self) -> Result, WALError> { + todo!() + } +} + +struct InsertWAL { + /// WAL segments + segments: Vec, + /// Next segment id + next_segment_id: u64, +} + +struct RemoveWAL {} diff --git a/crates/curp/src/server/sp_storage/segment.rs b/crates/curp/src/server/sp_storage/segment.rs new file mode 100644 index 000000000..83de5c887 --- /dev/null +++ b/crates/curp/src/server/sp_storage/segment.rs @@ -0,0 +1,221 @@ +use std::{ + fs::File, + io::{self, Read, Write}, +}; + +use clippy_utilities::{NumericCast, OverflowArithmetic}; +use sha2::Sha256; +use utils::wal::{ + framed::{Decoder, Encoder}, + get_checksum, parse_u64, validate_data, LockedFile, +}; + +use crate::rpc::ProposeId; + +use super::{ + codec::DataFrame, + error::{CorruptType, WALError}, +}; + +/// The magic of the WAL file +const WAL_MAGIC: u32 = 0xa0ec_41ff; + +/// The current WAL version +const WAL_VERSION: u8 = 0x00; + +/// The size of wal file header in bytes +const WAL_HEADER_SIZE: usize = 48; + +pub trait SegmentAttr { + /// Segment file extension + fn ext() -> String; +} + +// TODO: merge reusable wal code +struct Segment { + /// The opened file of this segment + file: File, + /// The id of this segment + segment_id: u64, + /// The soft size limit of this segment + size_limit: u64, + /// The file size of the segment + size: u64, + /// Propose ids of this segment + propose_ids: Vec, + /// Codec of this segment + codec: Codec, + /// Type of this Segment + r#type: T, +} + +impl Segment +where + T: SegmentAttr, +{ + /// Creates a new `WALSegment` + pub(super) fn create( + tmp_file: LockedFile, + segment_id: u64, + size_limit: u64, + codec: Codec, + r#type: T, + ) -> io::Result { + let segment_name = Self::segment_name(segment_id); + let lfile = tmp_file.rename(segment_name)?; + let mut file = lfile.into_std(); + file.write_all(&Self::gen_header(segment_id))?; + file.flush()?; + file.sync_data()?; + + Ok(Self { + file, + segment_id, + size_limit, + size: 0, + propose_ids: Vec::new(), + codec, + r#type, + }) + } + + /// Open an existing WAL segment file + pub(super) fn open( + lfile: LockedFile, + size_limit: u64, + codec: Codec, + r#type: T, + ) -> Result { + let mut file = lfile.into_std(); + let size = file.metadata()?.len(); + let mut buf = vec![0; WAL_HEADER_SIZE]; + let _ignore = file.read_exact(&mut buf)?; + let segment_id = Self::parse_header(&buf)?; + + Ok(Self { + file, + segment_id, + size_limit, + size, + propose_ids: Vec::new(), + codec, + r#type, + }) + } 
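For orientation, a minimal sketch of how the `create`/`open` pair above is meant to be driven. It leans on helpers that land in the follow-up commits (`WALCodec`, `SegmentAttr::r#type`) and assumes `FilePipeline` yields `io::Result<LockedFile>` items, as `WAL::open_new_segment` relies on later in this series; `dir`, the segment size, and the `TestCommand` payload are placeholders, not part of the patch.

    // Sketch only: turn a pre-allocated temp file into segment 0 and append
    // one Insert frame; `write_sync` encodes the data frames plus a trailing
    // commit frame and fsyncs before returning.
    fn write_one_entry(dir: std::path::PathBuf) -> std::io::Result<()> {
        use std::{io, sync::Arc};
        use curp_test_utils::test_cmd::TestCommand;
        use utils::wal::pipeline::FilePipeline;

        const SEGMENT_SIZE: u64 = 1024 * 1024; // placeholder soft size limit
        let mut pipeline = FilePipeline::new(dir, SEGMENT_SIZE)?;
        let tmp_file = pipeline
            .next()
            .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??;
        let mut segment = Segment::create(
            tmp_file,
            0, // first segment id
            SEGMENT_SIZE,
            WALCodec::<TestCommand>::new(),
            Insert::r#type(),
        )?;
        segment.write_sync(vec![DataFrame::Insert {
            propose_id: ProposeId(0, 1),
            cmd: Arc::new(TestCommand::new_put(vec![1], 0)),
        }])?;
        Ok(())
    }
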
+ + /// Gets the file name of the WAL segment + fn segment_name(segment_id: u64) -> String { + format!("{segment_id:016x}{}", T::ext()) + } + + #[allow(clippy::doc_markdown)] // False positive for ASCII graph + /// Generate the header + /// + /// The header layout: + /// + /// 0 1 2 3 4 5 6 7 8 + /// |------+------+------+------+------+------+------+------| + /// | Magic | Reserved | Vsn | + /// |------+------+------+------+------+------+------+------| + /// | SegmentID | + /// |------+------+------+------+------+------+------+------| + /// | Checksum (32bytes) ... | + /// |------+------+------+------+------+------+------+------| + fn gen_header(segment_id: u64) -> Vec { + let mut buf = vec![]; + buf.extend(WAL_MAGIC.to_le_bytes()); + buf.extend(vec![0; 3]); + buf.push(WAL_VERSION); + buf.extend(segment_id.to_le_bytes()); + buf.extend(get_checksum::(&buf)); + buf + } + + /// Parse the header from the given buffer + #[allow( + clippy::unwrap_used, // Unwraps are used to convert slice to const length and is safe + clippy::arithmetic_side_effects, // Arithmetics cannot overflow + clippy::indexing_slicing // Index slicings are checked + )] + fn parse_header(src: &[u8]) -> Result { + let mut offset = 0; + let mut next_field = |len: usize| { + offset += len; + &src[(offset - len)..offset] + }; + let parse_error = Err(WALError::Corrupted(CorruptType::Codec( + "Segment file header parsing has failed".to_owned(), + ))); + if src.len() != WAL_HEADER_SIZE + || next_field(4) != WAL_MAGIC.to_le_bytes() + || next_field(3) != [0; 3] + || next_field(1) != [WAL_VERSION] + { + return parse_error; + } + let segment_id = parse_u64(next_field(8)); + let checksum = next_field(32); + + if !validate_data::(&src[0..24], checksum) { + return parse_error; + } + + Ok(segment_id) + } +} + +impl Segment { + /// Writes an item to the segment + pub(super) fn write_sync(&mut self, item: Item) -> io::Result<()> + where + Codec: Encoder, + { + let encoded = self.codec.encode(item)?; + self.file.write_all(&encoded)?; + self.size = self.size.overflow_add(encoded.len().numeric_cast()); + self.file.flush()?; + self.file.sync_data()?; + + Ok(()) + } + + /// Gets all items from the segment + pub(super) fn get_all(&mut self) -> Result, Err> + where + Err: From, + Codec: Decoder, + { + let mut buf = Vec::new(); + let _ignore = self.file.read_to_end(&mut buf)?; + let mut pos = 0; + let mut entries = Vec::new(); + while pos < buf.len() { + let (item, n) = self.codec.decode(&buf[pos..])?; + entries.push(item); + pos += n; + } + Ok(entries) + } + + /// Recover all entries of this segment + fn recover(&mut self) -> Result, WALError> + where + Codec: Decoder>, Error = WALError>, + { + let frames: Vec<_> = self.get_all()?.into_iter().flatten().collect(); + for frame in frames { + match frame { + DataFrame::Insert { propose_id, cmd } => todo!(), + DataFrame::Remove(_) => todo!(), + } + } + } +} + +struct Insert {} + +impl SegmentAttr for Insert { + fn ext() -> String { + ".inswal".to_string() + } +} From efac803435235b6c1657694016d5f55c94fc7abc Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 9 May 2024 14:52:16 +0800 Subject: [PATCH 05/11] feat: implement sp wal storage Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/sp_storage/codec.rs | 25 +- crates/curp/src/server/sp_storage/config.rs | 19 +- crates/curp/src/server/sp_storage/error.rs | 12 + crates/curp/src/server/sp_storage/mod.rs | 320 ++++++++++++++++++- crates/curp/src/server/sp_storage/segment.rs | 
117 ++++++- 5 files changed, 454 insertions(+), 39 deletions(-) diff --git a/crates/curp/src/server/sp_storage/codec.rs b/crates/curp/src/server/sp_storage/codec.rs index dab106637..5d575c949 100644 --- a/crates/curp/src/server/sp_storage/codec.rs +++ b/crates/curp/src/server/sp_storage/codec.rs @@ -2,7 +2,7 @@ use std::{io, marker::PhantomData, sync::Arc}; use clippy_utilities::NumericCast; use serde::{de::DeserializeOwned, Serialize}; -use sha2::{digest::Reset, Digest, Sha256}; +use sha2::{digest::Reset, Digest}; use utils::wal::{ framed::{Decoder, Encoder}, get_checksum, @@ -36,7 +36,7 @@ trait FrameEncoder { /// The WAL codec #[allow(clippy::upper_case_acronyms)] // The WAL needs to be all upper cases #[derive(Debug)] -pub(super) struct WAL { +pub(super) struct WAL { /// Frames stored in decoding frames: Vec>, /// The hasher state for decoding @@ -45,7 +45,7 @@ pub(super) struct WAL { /// Union type of WAL frames #[derive(Debug)] -enum WALFrame { +enum WALFrame { /// Data frame type Data(DataFrame), /// Commit frame type @@ -64,6 +64,16 @@ pub(crate) enum DataFrame { Remove(ProposeId), } +impl DataFrame { + /// Gets the propose id encoded in this frame + pub(crate) fn propose_id(&self) -> ProposeId { + match *self { + DataFrame::Insert { propose_id, .. } => propose_id, + DataFrame::Remove(propose_id) => propose_id, + } + } +} + /// The commit frame /// /// This frames contains a SHA256 checksum of all previous frames since last commit @@ -75,12 +85,15 @@ struct CommitFrame { phantom: PhantomData, } -impl WAL { +impl WAL +where + H: Digest, +{ /// Creates a new WAL codec pub(super) fn new() -> Self { Self { frames: Vec::new(), - hasher: Sha256::new(), + hasher: H::new(), } } } @@ -117,7 +130,7 @@ where fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error> { let mut current = 0; loop { - let Some((frame, len)) = WALFrame::::decode(&src[current..])? else { + let Some((frame, len)) = WALFrame::::decode(&src[current..])? 
else { return Err(WALError::UnexpectedEof); }; let decoded_bytes = &src[current..current + len]; diff --git a/crates/curp/src/server/sp_storage/config.rs b/crates/curp/src/server/sp_storage/config.rs index 2372e6444..e57312d07 100644 --- a/crates/curp/src/server/sp_storage/config.rs +++ b/crates/curp/src/server/sp_storage/config.rs @@ -9,8 +9,10 @@ const DEFAULT_REMOVE_SEGMENT_SIZE: u64 = 1024 * 1024; /// The config for WAL #[derive(Debug, Clone)] pub(crate) struct WALConfig { - /// The path of this config - pub(super) dir: PathBuf, + /// The path of insert wal directory + pub(super) insert_dir: PathBuf, + /// The path of remove wal directory + pub(super) remove_dir: PathBuf, /// The maximum size of this segment /// /// NOTE: This is a soft limit, the actual size may larger than this @@ -24,8 +26,13 @@ pub(crate) struct WALConfig { impl WALConfig { /// Creates a new `WALConfig` pub(crate) fn new(dir: impl AsRef) -> Self { + let mut insert_dir: PathBuf = dir.as_ref().into(); + let mut remove_dir: PathBuf = dir.as_ref().into(); + insert_dir.push("insert"); + remove_dir.push("remove"); Self { - dir: dir.as_ref().into(), + insert_dir, + remove_dir, max_insert_segment_size: DEFAULT_INSERT_SEGMENT_SIZE, max_remove_segment_size: DEFAULT_REMOVE_SEGMENT_SIZE, } @@ -34,7 +41,8 @@ impl WALConfig { /// Sets the `max_insert_segment_size` pub(crate) fn with_max_insert_segment_size(self, size: u64) -> Self { Self { - dir: self.dir, + insert_dir: self.insert_dir, + remove_dir: self.remove_dir, max_remove_segment_size: self.max_remove_segment_size, max_insert_segment_size: size, } @@ -43,7 +51,8 @@ impl WALConfig { /// Sets the `max_remove_segment_size` pub(crate) fn with_max_remove_segment_size(self, size: u64) -> Self { Self { - dir: self.dir, + insert_dir: self.insert_dir, + remove_dir: self.remove_dir, max_insert_segment_size: self.max_insert_segment_size, max_remove_segment_size: size, } diff --git a/crates/curp/src/server/sp_storage/error.rs b/crates/curp/src/server/sp_storage/error.rs index 99b330e3c..3e4f899ba 100644 --- a/crates/curp/src/server/sp_storage/error.rs +++ b/crates/curp/src/server/sp_storage/error.rs @@ -38,3 +38,15 @@ impl WALError { } } } + +impl From for io::Error { + fn from(err: WALError) -> Self { + match err { + WALError::UnexpectedEof => { + io::Error::new(io::ErrorKind::UnexpectedEof, err.to_string()) + } + WALError::Corrupted(_) => io::Error::new(io::ErrorKind::InvalidData, err.to_string()), + WALError::IO(e) => e, + } + } +} diff --git a/crates/curp/src/server/sp_storage/mod.rs b/crates/curp/src/server/sp_storage/mod.rs index c4b676e43..ee3955d21 100644 --- a/crates/curp/src/server/sp_storage/mod.rs +++ b/crates/curp/src/server/sp_storage/mod.rs @@ -1,10 +1,27 @@ -use std::{fs::File, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + io, + path::Path, + sync::Arc, +}; +use clippy_utilities::OverflowArithmetic; use curp_external_api::cmd::Command; +use itertools::Itertools; +use parking_lot::Mutex; +use serde::{de::DeserializeOwned, Serialize}; +use sha2::Sha256; +use tracing::{debug, error}; +use utils::wal::{get_file_paths_with_ext, pipeline::FilePipeline, LockedFile}; -use crate::rpc::ProposeId; +use crate::rpc::{PoolEntry, ProposeId}; -use self::{config::WALConfig, error::WALError}; +use self::{ + codec::DataFrame, + config::WALConfig, + error::WALError, + segment::{Segment, SegmentAttr, ToDrop}, +}; /// WAL codec mod codec; @@ -18,43 +35,312 @@ mod config; /// WAL segment mod segment; +/// WAL Result +type Result = std::result::Result; + +/// Codec of this WAL 
+type WALCodec = codec::WAL; + /// Operations of speculative pool WAL pub(crate) trait PoolWALOps { /// Insert a command to WAL - fn insert(&self, propose_id: ProposeId, cmd: Arc) -> Result<(), WALError>; + fn insert(&self, entries: Vec>) -> io::Result<()>; /// Removes a command from WAL - fn remove(&self, propose_id: ProposeId) -> Result<(), WALError>; + fn remove(&self, propose_ids: Vec) -> io::Result<()>; /// Recover all commands stored in WAL - fn recover(&self) -> Result, WALError>; + fn recover(&self) -> io::Result>>; + + /// Try GC by propose ids + /// + /// The `check_fn` should filter out obsolete propose ids + fn gc(&self, check_fn: F) -> io::Result<()> + where + F: Fn(&[ProposeId]) -> &[ProposeId]; } /// WAL of speculative pool -struct SpeculativePoolWAL { +struct SpeculativePoolWAL { /// WAL config config: WALConfig, + /// Insert WAL + insert: Mutex>, + /// Remove WAL + remove: Mutex>, + /// Drop tx + drop_tx: Option>>>, + /// Drop task handle + drop_task_handle: Option>, +} + +impl SpeculativePoolWAL +where + C: Serialize + DeserializeOwned + Send + Sync + 'static, +{ + #[allow(unused)] + fn new(config: WALConfig) -> io::Result { + if !config.insert_dir.try_exists()? { + std::fs::create_dir_all(&config.insert_dir)?; + } + if !config.remove_dir.try_exists()? { + std::fs::create_dir_all(&config.insert_dir)?; + } + let (drop_tx, drop_rx) = flume::unbounded(); + let handle = Self::spawn_dropping_task(drop_rx); + + Ok(Self { + insert: Mutex::new(WAL::new( + &config.insert_dir, + config.max_insert_segment_size, + )?), + remove: Mutex::new(WAL::new( + &config.remove_dir, + config.max_remove_segment_size, + )?), + config, + drop_tx: Some(drop_tx), + drop_task_handle: Some(handle), + }) + } + + fn spawn_dropping_task( + drop_rx: flume::Receiver>>, + ) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || { + while let Ok(segment) = drop_rx.recv() { + match segment { + ToDrop::Insert(_) => debug!("Removing insert segment file"), + ToDrop::Remove(_) => debug!("Removing remove segment file"), + } + // The segment will be removed on drop + drop(segment); + } + }) + } } -impl PoolWALOps for SpeculativePoolWAL { - fn insert(&self, propose_id: ProposeId, cmd: Arc) -> Result<(), WALError> { - todo!() +impl PoolWALOps for SpeculativePoolWAL { + fn insert(&self, entries: Vec>) -> io::Result<()> { + self.insert.lock().insert(entries) + } + + fn remove(&self, propose_ids: Vec) -> io::Result<()> { + self.remove.lock().remove(propose_ids) + } + + fn recover(&self) -> io::Result>> { + let mut insert_l = self.insert.lock(); + let mut remove_l = self.remove.lock(); + let mut cmds = insert_l.recover(&self.config.insert_dir)?; + let removed = remove_l.recover(&self.config.remove_dir)?; + let mut to_invalidate = Vec::new(); + for id in removed { + if cmds.remove(&id).is_none() { + to_invalidate.push(id); + } + } + Ok(cmds + .into_iter() + .map(|(id, cmd)| PoolEntry::new(id, cmd)) + .collect()) } - fn remove(&self, propose_id: ProposeId) -> Result<(), WALError> { - todo!() + fn gc(&self, check_fn: F) -> io::Result<()> + where + F: Fn(&[ProposeId]) -> &[ProposeId], + { + let mut insert_l = self.insert.lock(); + let mut remove_l = self.remove.lock(); + let all_ids: Vec<_> = insert_l + .all_ids() + .into_iter() + .chain(remove_l.all_ids().into_iter()) + .unique() + .collect(); + let obsolete_ids = check_fn(&all_ids); + for segment in insert_l.invalidates_propose_ids(&obsolete_ids) { + if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Insert(segment)) { + error!("Failed to send segment to 
dropping task: {e}"); + } + } + for segment in remove_l.invalidates_propose_ids(&obsolete_ids) { + if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Remove(segment)) { + error!("Failed to send segment to dropping task: {e}"); + } + } + Ok(()) } +} - fn recover(&self) -> Result, WALError> { - todo!() +impl Drop for SpeculativePoolWAL { + #[allow(clippy::unwrap_used)] + fn drop(&mut self) { + // The task will exit after `drop_tx` is dropped + let _drop = self.drop_tx.take(); + if let Err(err) = self.drop_task_handle.take().unwrap().join() { + error!("Failed to join segment dropping task: {err:?}"); + } } } -struct InsertWAL { +struct WAL { /// WAL segments - segments: Vec, + segments: Vec>>, + /// The pipeline that pre-allocates files + // TODO: Fix conflict + pipeline: FilePipeline, /// Next segment id next_segment_id: u64, + /// The maximum size of this segment + max_segment_size: u64, } -struct RemoveWAL {} +impl WAL +where + T: SegmentAttr, + C: Serialize + DeserializeOwned, +{ + fn new(dir: impl AsRef, max_segment_size: u64) -> io::Result { + Ok(Self { + segments: Vec::new(), + pipeline: FilePipeline::new(dir.as_ref().into(), max_segment_size)?, + next_segment_id: 0, + max_segment_size, + }) + } + + fn write_frames(&mut self, item: Vec>) -> io::Result<()> { + let last_segment = self + .segments + .last_mut() + .unwrap_or_else(|| unreachable!("there should be at least on segment")); + last_segment.write_sync(item)?; + + if last_segment.is_full() { + self.open_new_segment()?; + } + + Ok(()) + } + + fn recover_frames(&mut self, dir: impl AsRef) -> Result>> { + let paths = get_file_paths_with_ext(dir, &segment::Insert::ext())?; + let lfiles: Vec<_> = paths + .into_iter() + .map(LockedFile::open_rw) + .collect::>()?; + let mut segments: Vec<_> = lfiles + .into_iter() + .map(|f| Segment::open(f, self.max_segment_size, WALCodec::new(), T::r#type())) + .collect::>()?; + + let logs: Vec<_> = segments + .iter_mut() + .map(Segment::recover::) + .map(|result| result.map_err(Into::into)) + .collect::>()?; + + segments.sort_unstable(); + self.next_segment_id = segments.last().map(Segment::segment_id).unwrap_or(0); + self.segments = segments; + + Ok(logs.into_iter().flatten().collect()) + } + + /// Opens a new WAL segment + fn open_new_segment(&mut self) -> io::Result<()> { + let lfile = self + .pipeline + .next() + .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??; + + let segment = Segment::create( + lfile, + self.next_segment_id, + self.max_segment_size, + WALCodec::new(), + T::r#type(), + )?; + + self.segments.push(segment); + self.next_segment_id = self.next_segment_id.overflow_add(1); + + Ok(()) + } + + /// Gets all propose ids stored in this WAL + fn all_ids(&self) -> Vec { + self.segments + .iter() + .map(Segment::propose_ids) + .flatten() + .collect() + } + + /// Invalidates propose ids + fn invalidates_propose_ids( + &mut self, + propose_ids: &[ProposeId], + ) -> Vec>> { + let mut to_remove = Vec::new(); + for (pos, segment) in &mut self.segments.iter_mut().enumerate() { + if segment.invalidate_propose_ids(&propose_ids) { + to_remove.push(pos); + } + } + to_remove + .into_iter() + .map(|pos| self.segments.remove(pos)) + .collect() + } +} + +impl WAL +where + C: Serialize + DeserializeOwned, +{ + fn insert(&mut self, entries: Vec>) -> io::Result<()> { + self.write_frames(entries.into_iter().map(Into::into).collect()) + } + + fn recover(&mut self, dir: impl AsRef) -> Result>> { + Ok(self + .recover_frames(dir)? 
+ .into_iter() + .filter_map(|frame| match frame { + DataFrame::Insert { propose_id, cmd } => Some((propose_id, cmd)), + DataFrame::Remove(_) => None, + }) + .collect()) + } +} + +impl WAL +where + C: Serialize + DeserializeOwned, +{ + fn remove(&mut self, ids: Vec) -> io::Result<()> { + self.write_frames(ids.into_iter().map(DataFrame::Remove).collect()) + } + + fn recover(&mut self, dir: impl AsRef) -> Result> { + Ok(self + .recover_frames(dir)? + .into_iter() + .filter_map(|frame| match frame { + DataFrame::Insert { .. } => None, + DataFrame::Remove(propose_id) => Some(propose_id), + }) + .collect()) + } +} + +impl From> for DataFrame { + fn from(entry: PoolEntry) -> Self { + DataFrame::Insert { + propose_id: entry.id, + cmd: entry.cmd, + } + } +} diff --git a/crates/curp/src/server/sp_storage/segment.rs b/crates/curp/src/server/sp_storage/segment.rs index 83de5c887..4d4e49351 100644 --- a/crates/curp/src/server/sp_storage/segment.rs +++ b/crates/curp/src/server/sp_storage/segment.rs @@ -1,10 +1,14 @@ use std::{ + collections::HashSet, fs::File, io::{self, Read, Write}, + path::PathBuf, }; use clippy_utilities::{NumericCast, OverflowArithmetic}; +use itertools::Itertools; use sha2::Sha256; +use tracing::error; use utils::wal::{ framed::{Decoder, Encoder}, get_checksum, parse_u64, validate_data, LockedFile, @@ -29,12 +33,19 @@ const WAL_HEADER_SIZE: usize = 48; pub trait SegmentAttr { /// Segment file extension fn ext() -> String; + /// The type of this segment + fn r#type() -> Self; } // TODO: merge reusable wal code -struct Segment { +/// WAL segment +/// +/// The underlying file of this segment will be removed on drop. +pub(super) struct Segment { /// The opened file of this segment file: File, + /// The path of the segment file, + path: PathBuf, /// The id of this segment segment_id: u64, /// The soft size limit of this segment @@ -42,7 +53,7 @@ struct Segment { /// The file size of the segment size: u64, /// Propose ids of this segment - propose_ids: Vec, + propose_ids: HashSet, /// Codec of this segment codec: Codec, /// Type of this Segment @@ -63,6 +74,7 @@ where ) -> io::Result { let segment_name = Self::segment_name(segment_id); let lfile = tmp_file.rename(segment_name)?; + let path = lfile.path(); let mut file = lfile.into_std(); file.write_all(&Self::gen_header(segment_id))?; file.flush()?; @@ -70,10 +82,11 @@ where Ok(Self { file, + path, segment_id, size_limit, size: 0, - propose_ids: Vec::new(), + propose_ids: HashSet::new(), codec, r#type, }) @@ -86,6 +99,7 @@ where codec: Codec, r#type: T, ) -> Result { + let path = lfile.path(); let mut file = lfile.into_std(); let size = file.metadata()?.len(); let mut buf = vec![0; WAL_HEADER_SIZE]; @@ -94,10 +108,11 @@ where Ok(Self { file, + path, segment_id, size_limit, size, - propose_ids: Vec::new(), + propose_ids: HashSet::new(), codec, r#type, }) @@ -198,24 +213,104 @@ impl Segment { } /// Recover all entries of this segment - fn recover(&mut self) -> Result, WALError> + pub(super) fn recover(&mut self) -> Result>, WALError> where Codec: Decoder>, Error = WALError>, { let frames: Vec<_> = self.get_all()?.into_iter().flatten().collect(); - for frame in frames { - match frame { - DataFrame::Insert { propose_id, cmd } => todo!(), - DataFrame::Remove(_) => todo!(), - } + assert!( + frames.iter().map(std::mem::discriminant).all_equal(), + "Recovered frames containing different variants" + ); + self.propose_ids = frames.iter().map(DataFrame::propose_id).collect(); + + Ok(frames) + } + + /// Gets all propose ids stored in this WAL + 
pub(super) fn propose_ids(&self) -> Vec { + self.propose_ids.clone().into_iter().collect() + } + + /// Remove invalid propose ids + /// + /// Returns `true` if this segment is obsolete and can be removed + pub(super) fn invalidate_propose_ids(&mut self, propose_ids: &[ProposeId]) -> bool { + for id in propose_ids { + let _ignore = self.propose_ids.remove(id); } + self.is_obsolete() + } + + /// Returns `true` if this segment is obsolete and can be removed + pub(super) fn is_obsolete(&self) -> bool { + self.propose_ids.is_empty() + } + + /// Checks if the segment is full + pub(super) fn is_full(&self) -> bool { + self.size >= self.size_limit + } + + /// Gets the segment id + pub(super) fn segment_id(&self) -> u64 { + self.segment_id + } +} + +impl Drop for Segment { + fn drop(&mut self) { + if let Err(err) = std::fs::remove_file(&self.path) { + error!("Failed to remove segment file: {err}"); + } + } +} + +impl PartialEq for Segment { + fn eq(&self, other: &Self) -> bool { + self.segment_id.eq(&other.segment_id) } } -struct Insert {} +impl Eq for Segment {} + +impl PartialOrd for Segment { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.segment_id.cmp(&other.segment_id)) + } +} + +impl Ord for Segment { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.segment_id.cmp(&other.segment_id) + } +} + +pub(super) struct Insert; impl SegmentAttr for Insert { fn ext() -> String { ".inswal".to_string() } + + fn r#type() -> Insert { + Insert + } +} + +pub(super) struct Remove; + +impl SegmentAttr for Remove { + fn ext() -> String { + ".rmwal".to_string() + } + + fn r#type() -> Remove { + Remove + } +} + +pub(super) enum ToDrop { + Insert(Segment), + Remove(Segment), } From 6f8c1001b5b668e803a02af768ccbe0e96328b20 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Sat, 11 May 2024 14:26:33 +0800 Subject: [PATCH 06/11] test: add sp wal tests Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/sp_storage/mod.rs | 4 ++++ crates/curp/src/server/sp_storage/tests.rs | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+) create mode 100644 crates/curp/src/server/sp_storage/tests.rs diff --git a/crates/curp/src/server/sp_storage/mod.rs b/crates/curp/src/server/sp_storage/mod.rs index ee3955d21..84936bb36 100644 --- a/crates/curp/src/server/sp_storage/mod.rs +++ b/crates/curp/src/server/sp_storage/mod.rs @@ -35,6 +35,10 @@ mod config; /// WAL segment mod segment; +/// WAL tests +#[cfg(test)] +mod tests; + /// WAL Result type Result = std::result::Result; diff --git a/crates/curp/src/server/sp_storage/tests.rs b/crates/curp/src/server/sp_storage/tests.rs new file mode 100644 index 000000000..d29fa327c --- /dev/null +++ b/crates/curp/src/server/sp_storage/tests.rs @@ -0,0 +1,19 @@ +use std::sync::Arc; + +use curp_test_utils::test_cmd::TestCommand; + +use crate::rpc::{PoolEntry, ProposeId}; + +use super::{config::WALConfig, PoolWALOps, SpeculativePoolWAL}; + +#[test] +fn it_works() { + let dir = tempfile::tempdir().unwrap(); + let config = WALConfig::new(dir) + .with_max_insert_segment_size(512) + .with_max_remove_segment_size(32); + let wal = SpeculativePoolWAL::::new(config).unwrap(); + let cmd = TestCommand::new_put(vec![1], 0); + let entry = PoolEntry::new(ProposeId(0, 1), Arc::new(cmd)); + wal.insert(vec![entry]).unwrap(); +} From 994edf4c90685831d9ac235bdb68ea5497ad460a Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Sat, 11 May 2024 14:42:41 +0800 Subject: [PATCH 07/11] 
chore: fix curp wal imports Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/storage/wal/codec.rs | 10 +++++----- crates/curp/src/server/storage/wal/mod.rs | 9 --------- crates/curp/src/server/storage/wal/segment.rs | 11 +++++++---- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/crates/curp/src/server/storage/wal/codec.rs b/crates/curp/src/server/storage/wal/codec.rs index b891ccbd1..285b31074 100644 --- a/crates/curp/src/server/storage/wal/codec.rs +++ b/crates/curp/src/server/storage/wal/codec.rs @@ -5,12 +5,12 @@ use curp_external_api::LogIndex; use serde::{de::DeserializeOwned, Serialize}; use sha2::{Digest, Sha256}; use thiserror::Error; - -use super::{ - error::{CorruptType, WALError}, +use utils::wal::{ framed::{Decoder, Encoder}, - util::{get_checksum, validate_data}, + get_checksum, }; + +use super::error::{CorruptType, WALError}; use crate::log_entry::LogEntry; /// Invalid frame type @@ -280,7 +280,7 @@ impl CommitFrame { /// Creates a commit frame of data fn new_from_data(data: &[u8]) -> Self { Self { - checksum: get_checksum(data).to_vec(), + checksum: get_checksum::(data).to_vec(), } } diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs index 4a97f2195..a8065b9b4 100644 --- a/crates/curp/src/server/storage/wal/mod.rs +++ b/crates/curp/src/server/storage/wal/mod.rs @@ -6,18 +6,9 @@ mod codec; /// WAL errors mod error; -/// File pipeline -mod pipeline; - /// WAL segment mod segment; -/// File utils -mod util; - -/// Framed traits -mod framed; - /// The magic of the WAL file const WAL_MAGIC: u32 = 0xd86e_0be2; diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index 217d1f066..ac459901c 100644 --- a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -11,17 +11,20 @@ use clippy_utilities::{NumericCast, OverflowArithmetic}; use curp_external_api::LogIndex; use futures::{ready, FutureExt, SinkExt}; use serde::{de::DeserializeOwned, Serialize}; +use sha2::Sha256; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}, sync::Mutex, }; use tokio_stream::StreamExt; +use utils::wal::{ + framed::{Decoder, Encoder}, + get_checksum, parse_u64, validate_data, LockedFile, +}; use super::{ codec::{DataFrame, WAL}, error::{CorruptType, WALError}, - framed::{Decoder, Encoder}, - util::{get_checksum, parse_u64, validate_data, LockedFile}, WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION, }; use crate::log_entry::LogEntry; @@ -242,7 +245,7 @@ impl WALSegment { buf.push(WAL_VERSION); buf.extend(base_index.to_le_bytes()); buf.extend(segment_id.to_le_bytes()); - buf.extend(get_checksum(&buf)); + buf.extend(get_checksum::(&buf)); buf } @@ -272,7 +275,7 @@ impl WALSegment { let segment_id = parse_u64(next_field(8)); let checksum = next_field(32); - if !validate_data(&src[0..24], checksum) { + if !validate_data::(&src[0..24], checksum) { return parse_error; } From de4ab231c1c6a6b53941dd4ba16b9c3b235da5ba Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Sat, 11 May 2024 14:54:00 +0800 Subject: [PATCH 08/11] chore: fix pool entry after rebase Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/sp_storage/mod.rs | 12 ++++++++---- crates/curp/src/server/sp_storage/segment.rs | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/curp/src/server/sp_storage/mod.rs 
b/crates/curp/src/server/sp_storage/mod.rs index 84936bb36..ebab13e70 100644 --- a/crates/curp/src/server/sp_storage/mod.rs +++ b/crates/curp/src/server/sp_storage/mod.rs @@ -1,3 +1,4 @@ +#![allow(unused)] // TODO: remove this use std::{ collections::{HashMap, HashSet}, io, @@ -14,7 +15,7 @@ use sha2::Sha256; use tracing::{debug, error}; use utils::wal::{get_file_paths_with_ext, pipeline::FilePipeline, LockedFile}; -use crate::rpc::{PoolEntry, ProposeId}; +use crate::rpc::{PoolEntry, PoolEntryInner, ProposeId}; use self::{ codec::DataFrame, @@ -342,9 +343,12 @@ where impl From> for DataFrame { fn from(entry: PoolEntry) -> Self { - DataFrame::Insert { - propose_id: entry.id, - cmd: entry.cmd, + match entry.inner { + PoolEntryInner::Command(cmd) => DataFrame::Insert { + propose_id: entry.id, + cmd, + }, + PoolEntryInner::ConfChange(_) => unreachable!("should not insert conf change entry"), } } } diff --git a/crates/curp/src/server/sp_storage/segment.rs b/crates/curp/src/server/sp_storage/segment.rs index 4d4e49351..07786c1e5 100644 --- a/crates/curp/src/server/sp_storage/segment.rs +++ b/crates/curp/src/server/sp_storage/segment.rs @@ -30,7 +30,7 @@ const WAL_VERSION: u8 = 0x00; /// The size of wal file header in bytes const WAL_HEADER_SIZE: usize = 48; -pub trait SegmentAttr { +pub(super) trait SegmentAttr { /// Segment file extension fn ext() -> String; /// The type of this segment From 076de85bb6e521c16bd6637fb99913433c93db66 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 13 May 2024 09:38:46 +0800 Subject: [PATCH 09/11] WIP: sp wal Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/mod.rs | 4 +- crates/curp/src/server/sp_storage/tests.rs | 19 -- .../server/{sp_storage => sp_wal}/codec.rs | 19 +- .../server/{sp_storage => sp_wal}/config.rs | 0 .../server/{sp_storage => sp_wal}/error.rs | 12 +- .../src/server/{sp_storage => sp_wal}/mod.rs | 80 ++++---- .../server/{sp_storage => sp_wal}/segment.rs | 41 ++-- crates/curp/src/server/sp_wal/tests.rs | 183 ++++++++++++++++++ 8 files changed, 276 insertions(+), 82 deletions(-) delete mode 100644 crates/curp/src/server/sp_storage/tests.rs rename crates/curp/src/server/{sp_storage => sp_wal}/codec.rs (95%) rename crates/curp/src/server/{sp_storage => sp_wal}/config.rs (100%) rename crates/curp/src/server/{sp_storage => sp_wal}/error.rs (73%) rename crates/curp/src/server/{sp_storage => sp_wal}/mod.rs (84%) rename crates/curp/src/server/{sp_storage => sp_wal}/segment.rs (88%) create mode 100644 crates/curp/src/server/sp_wal/tests.rs diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index bc9957c9e..fdd6b73aa 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -54,8 +54,8 @@ mod lease_manager; /// Curp metrics mod metrics; -/// Speculative pool storage -mod sp_storage; +/// Speculative pool WAL +mod sp_wal; pub use storage::{db::DB, StorageApi, StorageError}; diff --git a/crates/curp/src/server/sp_storage/tests.rs b/crates/curp/src/server/sp_storage/tests.rs deleted file mode 100644 index d29fa327c..000000000 --- a/crates/curp/src/server/sp_storage/tests.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::sync::Arc; - -use curp_test_utils::test_cmd::TestCommand; - -use crate::rpc::{PoolEntry, ProposeId}; - -use super::{config::WALConfig, PoolWALOps, SpeculativePoolWAL}; - -#[test] -fn it_works() { - let dir = tempfile::tempdir().unwrap(); - let config = WALConfig::new(dir) - .with_max_insert_segment_size(512) - 
.with_max_remove_segment_size(32); - let wal = SpeculativePoolWAL::::new(config).unwrap(); - let cmd = TestCommand::new_put(vec![1], 0); - let entry = PoolEntry::new(ProposeId(0, 1), Arc::new(cmd)); - wal.insert(vec![entry]).unwrap(); -} diff --git a/crates/curp/src/server/sp_storage/codec.rs b/crates/curp/src/server/sp_wal/codec.rs similarity index 95% rename from crates/curp/src/server/sp_storage/codec.rs rename to crates/curp/src/server/sp_wal/codec.rs index 5d575c949..34fece6f3 100644 --- a/crates/curp/src/server/sp_storage/codec.rs +++ b/crates/curp/src/server/sp_wal/codec.rs @@ -128,13 +128,15 @@ where type Error = WALError; fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error> { - let mut current = 0; - loop { - let Some((frame, len)) = WALFrame::::decode(&src[current..])? else { - return Err(WALError::UnexpectedEof); + let mut cursor = 0; + while cursor < src.len() { + let next = src.get(cursor..).ok_or(WALError::MaybeEnded)?; + let Some((frame, len)) = WALFrame::::decode(next)? else { + return Err(WALError::MaybeEnded); }; - let decoded_bytes = &src[current..current + len]; - current += len; + let decoded_bytes = src.get(cursor..cursor + len).ok_or(WALError::MaybeEnded)?; + cursor += len; + match frame { WALFrame::Data(data) => { self.frames.push(data); @@ -144,12 +146,13 @@ where let checksum = self.hasher.clone().finalize(); Digest::reset(&mut self.hasher); if commit.validate(&checksum) { - return Ok((self.frames.drain(..).collect(), current)); + return Ok((self.frames.drain(..).collect(), cursor)); } return Err(WALError::Corrupted(CorruptType::Checksum)); } } } + Err(WALError::MaybeEnded) } } @@ -188,7 +191,7 @@ where fn decode(src: &[u8]) -> Result, WALError> { let frame_type = src[0]; match frame_type { - INVALID => Err(WALError::UnexpectedEof), + INVALID => Err(WALError::MaybeEnded), INSERT => Self::decode_insert(&src), REMOVE => Self::decode_remove(&src), COMMIT => Self::decode_commit(&src), diff --git a/crates/curp/src/server/sp_storage/config.rs b/crates/curp/src/server/sp_wal/config.rs similarity index 100% rename from crates/curp/src/server/sp_storage/config.rs rename to crates/curp/src/server/sp_wal/config.rs diff --git a/crates/curp/src/server/sp_storage/error.rs b/crates/curp/src/server/sp_wal/error.rs similarity index 73% rename from crates/curp/src/server/sp_storage/error.rs rename to crates/curp/src/server/sp_wal/error.rs index 3e4f899ba..840a61505 100644 --- a/crates/curp/src/server/sp_storage/error.rs +++ b/crates/curp/src/server/sp_wal/error.rs @@ -5,8 +5,12 @@ use thiserror::Error; /// Errors of the `WALStorage` #[derive(Debug, Error)] pub(crate) enum WALError { + /// The WAL segment might reach on end + /// + /// NOTE: This exists because we cannot tell the difference between a corrupted WAL + /// and a normally ended WAL, as the segment files are all preallocated with zeros #[error("WAL ended")] - UnexpectedEof, + MaybeEnded, /// The WAL corrupt error #[error("WAL corrupted: {0}")] Corrupted(CorruptType), @@ -34,7 +38,7 @@ impl WALError { match self { WALError::Corrupted(e) => Ok(e), WALError::IO(e) => return Err(e), - WALError::UnexpectedEof => unreachable!("Should not call on WALError::MaybeEnded"), + WALError::MaybeEnded => unreachable!("Should not call on WALError::MaybeEnded"), } } } @@ -42,9 +46,7 @@ impl WALError { impl From for io::Error { fn from(err: WALError) -> Self { match err { - WALError::UnexpectedEof => { - io::Error::new(io::ErrorKind::UnexpectedEof, err.to_string()) - } + WALError::MaybeEnded => unreachable!("Should 
not call on WALError::MaybeEnded"), WALError::Corrupted(_) => io::Error::new(io::ErrorKind::InvalidData, err.to_string()), WALError::IO(e) => e, } diff --git a/crates/curp/src/server/sp_storage/mod.rs b/crates/curp/src/server/sp_wal/mod.rs similarity index 84% rename from crates/curp/src/server/sp_storage/mod.rs rename to crates/curp/src/server/sp_wal/mod.rs index ebab13e70..2504ea055 100644 --- a/crates/curp/src/server/sp_storage/mod.rs +++ b/crates/curp/src/server/sp_wal/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use clippy_utilities::OverflowArithmetic; -use curp_external_api::cmd::Command; +use curp_external_api::cmd::{Command, ConflictCheck}; use itertools::Itertools; use parking_lot::Mutex; use serde::{de::DeserializeOwned, Serialize}; @@ -81,7 +81,7 @@ struct SpeculativePoolWAL { impl SpeculativePoolWAL where - C: Serialize + DeserializeOwned + Send + Sync + 'static, + C: Command, { #[allow(unused)] fn new(config: WALConfig) -> io::Result { @@ -89,7 +89,7 @@ where std::fs::create_dir_all(&config.insert_dir)?; } if !config.remove_dir.try_exists()? { - std::fs::create_dir_all(&config.insert_dir)?; + std::fs::create_dir_all(&config.remove_dir)?; } let (drop_tx, drop_rx) = flume::unbounded(); let handle = Self::spawn_dropping_task(drop_rx); @@ -118,11 +118,27 @@ where ToDrop::Insert(_) => debug!("Removing insert segment file"), ToDrop::Remove(_) => debug!("Removing remove segment file"), } - // The segment will be removed on drop - drop(segment); + if let Err(err) = std::fs::remove_file(segment.path()) { + error!("Failed to remove segment file: {err}"); + } } }) } + + /// Keeps only commute commands + fn keep_commute_cmds(mut entries: Vec>) -> Vec> { + let commute = |entry: &PoolEntry, others: &[PoolEntry]| { + !others.iter().any(|e| e.is_conflict(&entry)) + }; + // start from last element + entries.reverse(); + let keep = entries + .iter() + .enumerate() + .take_while(|(i, ref e)| commute(*e, &entries[..*i])) + .count(); + entries.drain(..keep).collect() + } } impl PoolWALOps for SpeculativePoolWAL { @@ -139,16 +155,11 @@ impl PoolWALOps for SpeculativePoolWAL { let mut remove_l = self.remove.lock(); let mut cmds = insert_l.recover(&self.config.insert_dir)?; let removed = remove_l.recover(&self.config.remove_dir)?; - let mut to_invalidate = Vec::new(); - for id in removed { - if cmds.remove(&id).is_none() { - to_invalidate.push(id); - } - } - Ok(cmds + let entries = cmds .into_iter() - .map(|(id, cmd)| PoolEntry::new(id, cmd)) - .collect()) + .filter_map(|(id, cmd)| (!removed.contains(&id)).then_some(PoolEntry::new(id, cmd))) + .collect(); + Ok(Self::keep_commute_cmds(entries)) } fn gc(&self, check_fn: F) -> io::Result<()> @@ -157,19 +168,12 @@ impl PoolWALOps for SpeculativePoolWAL { { let mut insert_l = self.insert.lock(); let mut remove_l = self.remove.lock(); - let all_ids: Vec<_> = insert_l - .all_ids() - .into_iter() - .chain(remove_l.all_ids().into_iter()) - .unique() - .collect(); - let obsolete_ids = check_fn(&all_ids); - for segment in insert_l.invalidates_propose_ids(&obsolete_ids) { + for segment in insert_l.gc(&check_fn) { if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Insert(segment)) { error!("Failed to send segment to dropping task: {e}"); } } - for segment in remove_l.invalidates_propose_ids(&obsolete_ids) { + for segment in remove_l.gc(&check_fn) { if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Remove(segment)) { error!("Failed to send segment to dropping task: {e}"); } @@ -182,7 +186,7 @@ impl Drop for SpeculativePoolWAL { 
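Note on the Drop change in this hunk: `let _drop = self.drop_tx.take();` keeps the taken sender alive until the end of the destructor, past the join, whereas `drop(self.drop_tx.take())` closes the channel before the thread handle is joined, so the background dropping task can actually exit. A minimal, self-contained sketch of that shutdown pattern follows; it uses std::sync::mpsc and an invented `Worker` type rather than the flume channel and `SpeculativePoolWAL` from the patch.

    use std::sync::mpsc;
    use std::thread;

    /// Stand-in for a type that owns a background worker thread.
    struct Worker {
        tx: Option<mpsc::Sender<String>>,
        handle: Option<thread::JoinHandle<()>>,
    }

    impl Worker {
        fn new() -> Self {
            let (tx, rx) = mpsc::channel::<String>();
            let handle = thread::spawn(move || {
                // `recv` returns `Err` once every sender is gone, ending the loop.
                while let Ok(msg) = rx.recv() {
                    println!("processing {msg}");
                }
            });
            Self { tx: Some(tx), handle: Some(handle) }
        }

        fn send(&self, msg: &str) {
            if let Some(tx) = self.tx.as_ref() {
                let _ = tx.send(msg.to_owned());
            }
        }
    }

    impl Drop for Worker {
        fn drop(&mut self) {
            // Close the channel first so the worker can observe shutdown...
            drop(self.tx.take());
            // ...then wait for it to finish.
            if let Some(handle) = self.handle.take() {
                let _ = handle.join();
            }
        }
    }

    fn main() {
        let w = Worker::new();
        w.send("segment-1");
    }

Joining before the sender is dropped would block forever, because the worker loop only ends once every sender has been dropped.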
#[allow(clippy::unwrap_used)] fn drop(&mut self) { // The task will exit after `drop_tx` is dropped - let _drop = self.drop_tx.take(); + drop(self.drop_tx.take()); if let Err(err) = self.drop_task_handle.take().unwrap().join() { error!("Failed to join segment dropping task: {err:?}"); } @@ -230,7 +234,7 @@ where } fn recover_frames(&mut self, dir: impl AsRef) -> Result>> { - let paths = get_file_paths_with_ext(dir, &segment::Insert::ext())?; + let paths = get_file_paths_with_ext(dir, &T::ext())?; let lfiles: Vec<_> = paths .into_iter() .map(LockedFile::open_rw) @@ -239,16 +243,15 @@ where .into_iter() .map(|f| Segment::open(f, self.max_segment_size, WALCodec::new(), T::r#type())) .collect::>()?; - + segments.sort_unstable(); let logs: Vec<_> = segments .iter_mut() .map(Segment::recover::) - .map(|result| result.map_err(Into::into)) - .collect::>()?; + .collect::>()?; - segments.sort_unstable(); self.next_segment_id = segments.last().map(Segment::segment_id).unwrap_or(0); self.segments = segments; + self.open_new_segment(); Ok(logs.into_iter().flatten().collect()) } @@ -283,14 +286,19 @@ where .collect() } - /// Invalidates propose ids - fn invalidates_propose_ids( - &mut self, - propose_ids: &[ProposeId], - ) -> Vec>> { + /// Try GC by propose ids + /// + /// The `check_fn` should filter out obsolete propose ids + fn gc(&mut self, check_fn: &F) -> Vec>> + where + F: Fn(&[ProposeId]) -> &[ProposeId], + { + let all_ids = self.all_ids(); + let obsolete_ids = check_fn(&all_ids); + let mut to_remove = Vec::new(); for (pos, segment) in &mut self.segments.iter_mut().enumerate() { - if segment.invalidate_propose_ids(&propose_ids) { + if segment.invalidate_propose_ids(&obsolete_ids) { to_remove.push(pos); } } @@ -309,7 +317,7 @@ where self.write_frames(entries.into_iter().map(Into::into).collect()) } - fn recover(&mut self, dir: impl AsRef) -> Result>> { + fn recover(&mut self, dir: impl AsRef) -> Result)>> { Ok(self .recover_frames(dir)? 
.into_iter() diff --git a/crates/curp/src/server/sp_storage/segment.rs b/crates/curp/src/server/sp_wal/segment.rs similarity index 88% rename from crates/curp/src/server/sp_storage/segment.rs rename to crates/curp/src/server/sp_wal/segment.rs index 07786c1e5..a26d7e013 100644 --- a/crates/curp/src/server/sp_storage/segment.rs +++ b/crates/curp/src/server/sp_wal/segment.rs @@ -2,7 +2,7 @@ use std::{ collections::HashSet, fs::File, io::{self, Read, Write}, - path::PathBuf, + path::{Path, PathBuf}, }; use clippy_utilities::{NumericCast, OverflowArithmetic}; @@ -171,7 +171,7 @@ where let segment_id = parse_u64(next_field(8)); let checksum = next_field(32); - if !validate_data::(&src[0..24], checksum) { + if !validate_data::(&src[0..16], checksum) { return parse_error; } @@ -195,17 +195,27 @@ impl Segment { } /// Gets all items from the segment - pub(super) fn get_all(&mut self) -> Result, Err> + pub(super) fn get_all(&mut self) -> Result, WALError> where - Err: From, - Codec: Decoder, + Codec: Decoder, { let mut buf = Vec::new(); let _ignore = self.file.read_to_end(&mut buf)?; let mut pos = 0; let mut entries = Vec::new(); while pos < buf.len() { - let (item, n) = self.codec.decode(&buf[pos..])?; + let (item, n) = match self.codec.decode(&buf[pos..]) { + Ok(d) => d, + Err(WALError::MaybeEnded) => { + if !buf[pos..].iter().all(|b| *b == 0) { + return Err(WALError::Corrupted(CorruptType::Codec( + "Read zero".to_owned(), + ))); + } + return Ok(entries); + } + Err(e) => return Err(e), + }; entries.push(item); pos += n; } @@ -256,13 +266,10 @@ impl Segment { pub(super) fn segment_id(&self) -> u64 { self.segment_id } -} -impl Drop for Segment { - fn drop(&mut self) { - if let Err(err) = std::fs::remove_file(&self.path) { - error!("Failed to remove segment file: {err}"); - } + /// Gets the file path of this segment + pub(super) fn path(&self) -> &Path { + self.path.as_path() } } @@ -314,3 +321,13 @@ pub(super) enum ToDrop { Insert(Segment), Remove(Segment), } + +impl ToDrop { + /// Gets the segment path + pub(super) fn path(&self) -> &Path { + match *self { + ToDrop::Insert(ref seg) => seg.path(), + ToDrop::Remove(ref seg) => seg.path(), + } + } +} diff --git a/crates/curp/src/server/sp_wal/tests.rs b/crates/curp/src/server/sp_wal/tests.rs new file mode 100644 index 000000000..46e999285 --- /dev/null +++ b/crates/curp/src/server/sp_wal/tests.rs @@ -0,0 +1,183 @@ +use std::{path::Path, sync::Arc}; + +use curp_test_utils::test_cmd::TestCommand; +use tempfile::TempDir; + +use crate::rpc::{PoolEntry, ProposeId}; + +use super::{config::WALConfig, PoolWALOps, SpeculativePoolWAL}; + +#[test] +fn wal_insert_should_work() { + let tmp_dir = tempfile::tempdir().unwrap(); + let wal = init_wal(&tmp_dir, &[]); + + let mut gen = EntryGenerator::new(); + for i in 0..50 { + wal.insert(gen.take(i)).unwrap(); + } +} + +#[test] +fn wal_insert_remove_should_work() { + const NUM_ENTRIES: usize = 1000; + const INSERT_CHUNK_SIZE: usize = 10; + const REMOVE_CHUNK_SIZE: usize = 5; + let tmp_dir = tempfile::tempdir().unwrap(); + let mut gen = EntryGenerator::new(); + let wal = init_wal(&tmp_dir, &[]); + + let entries = gen.take(NUM_ENTRIES); + for chunk in entries.chunks_exact(INSERT_CHUNK_SIZE) { + wal.insert(chunk.to_vec()).unwrap(); + } + for chunk in entries.chunks_exact(REMOVE_CHUNK_SIZE) { + wal.remove(chunk.into_iter().map(|e| e.id).collect()) + .unwrap(); + } +} + +#[test] +fn wal_insert_remove_in_different_thread_is_ok() { + const NUM_ENTRIES: usize = 1000; + let tmp_dir = tempfile::tempdir().unwrap(); + let wal = 
Arc::new(init_wal(&tmp_dir, &[])); + let wal_c = Arc::clone(&wal); + + let handle = std::thread::spawn(move || { + let mut gen = EntryGenerator::new(); + for e in gen.take(NUM_ENTRIES) { + wal.insert(vec![e]); + } + }); + + let mut gen = EntryGenerator::new(); + for e in gen.take(NUM_ENTRIES) { + wal_c.remove(vec![e.id]); + } + handle.join().unwrap(); +} + +#[test] +fn wal_insert_recover_is_ok() { + const NUM_ENTRIES: usize = 1000; + const INSERT_CHUNK_SIZE: usize = 10; + let temp_dir = tempfile::tempdir().unwrap(); + let mut gen = EntryGenerator::new(); + let wal = init_wal(&temp_dir, &[]); + + let entries = gen.take(NUM_ENTRIES); + for chunk in entries.chunks_exact(INSERT_CHUNK_SIZE) { + wal.insert(chunk.to_vec()).unwrap(); + } + drop(wal); + let _wal = init_wal(&temp_dir, &entries); +} + +#[test] +fn wal_should_recover_commute_cmds() { + const NUM_ENTRIES: usize = 1000; + const INSERT_CHUNK_SIZE: usize = 10; + let temp_dir = tempfile::tempdir().unwrap(); + let mut gen = EntryGenerator::new(); + let wal = init_wal(&temp_dir, &[]); + let cmds = vec![ + TestCommand::new_put(vec![0], 0), + TestCommand::new_put(vec![1], 0), + TestCommand::new_put(vec![2], 0), + TestCommand::new_put(vec![3], 0), + TestCommand::new_put(vec![4], 0), + TestCommand::new_put(vec![1], 0), + TestCommand::new_put(vec![2], 0), + ]; + let entries: Vec<_> = cmds + .into_iter() + .enumerate() + .map(|(i, c)| PoolEntry::new(ProposeId(i as u64, 0), Arc::new(c))) + .collect(); + for e in entries.clone() { + wal.insert(vec![e]).unwrap(); + } + drop(wal); + let _wal = init_wal(&temp_dir, &entries[3..7]); +} + +#[test] +fn wal_insert_remove_then_recover_is_ok() { + const NUM_ENTRIES: usize = 1000; + const INSERT_CHUNK_SIZE: usize = 10; + const REMOVE_CHUNK_SIZE: usize = 5; + let temp_dir = tempfile::tempdir().unwrap(); + let mut gen = EntryGenerator::new(); + let wal = init_wal(&temp_dir, &[]); + + let entries = gen.take(NUM_ENTRIES); + for chunk in entries.chunks_exact(INSERT_CHUNK_SIZE) { + wal.insert(chunk.to_vec()).unwrap(); + } + for chunk in entries[0..200].chunks_exact(REMOVE_CHUNK_SIZE) { + wal.remove(chunk.into_iter().map(|e| e.id).collect()) + .unwrap(); + } + drop(wal); + let _wal = init_wal(&temp_dir, &entries[200..]); +} + +#[test] +fn wal_gc_is_ok() {} + +fn init_wal( + dir: impl AsRef, + expect: &[PoolEntry], +) -> SpeculativePoolWAL { + let config = WALConfig::new(dir) + .with_max_insert_segment_size(512) + .with_max_remove_segment_size(32); + let wal = SpeculativePoolWAL::::new(config).unwrap(); + let mut recovered = wal.recover().unwrap(); + recovered.sort_unstable(); + let mut expect_sorted = expect.to_vec(); + expect_sorted.sort_unstable(); + assert_eq!( + &recovered, + &expect_sorted, + "recovered: {}, expect: {}", + recovered.len(), + expect.len() + ); + wal +} + +struct EntryGenerator { + next: u32, +} + +impl EntryGenerator { + fn new() -> Self { + Self { next: 0 } + } + + fn next(&mut self) -> PoolEntry { + self.next += 1; + let cmd = TestCommand::new_put(vec![self.next], 0); + PoolEntry::new(ProposeId(self.next as u64, 1), Arc::new(cmd)) + } + + fn take(&mut self, n: usize) -> Vec> { + std::iter::repeat_with(|| self.next()).take(n).collect() + } +} + +impl Eq for PoolEntry {} + +impl PartialOrd for PoolEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.id.cmp(&other.id)) + } +} + +impl Ord for PoolEntry { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.id.cmp(&other.id) + } +} From e6303bfdc9a5f98722c80cc1ad079cb935c5f7c2 Mon Sep 17 00:00:00 2001 From: bsbds 
<69835502+bsbds@users.noreply.github.com> Date: Mon, 13 May 2024 16:26:22 +0800 Subject: [PATCH 10/11] fix: do not return error in wal pipeline when not exist Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/sp_wal/codec.rs | 70 ++++++++++++++ crates/curp/src/server/sp_wal/config.rs | 1 + crates/curp/src/server/sp_wal/error.rs | 13 --- crates/curp/src/server/sp_wal/mod.rs | 116 +++++++++++++++-------- crates/curp/src/server/sp_wal/segment.rs | 84 +++++++++++++--- crates/curp/src/server/sp_wal/tests.rs | 28 ++++-- crates/utils/src/wal/pipeline.rs | 17 +++- 7 files changed, 256 insertions(+), 73 deletions(-) diff --git a/crates/curp/src/server/sp_wal/codec.rs b/crates/curp/src/server/sp_wal/codec.rs index 34fece6f3..a6fa5516a 100644 --- a/crates/curp/src/server/sp_wal/codec.rs +++ b/crates/curp/src/server/sp_wal/codec.rs @@ -340,3 +340,73 @@ impl FrameEncoder for CommitFrame { bytes } } + +#[cfg(test)] +mod tests { + use sha2::Sha256; + + use super::*; + use crate::rpc::ProposeId; + + #[tokio::test] + async fn frame_encode_decode_is_ok() { + let mut codec = WAL::::new(); + let insert_frame = DataFrame::Insert { + propose_id: ProposeId(1, 2), + cmd: Arc::new(1), + }; + + let remove_frame = DataFrame::Remove(ProposeId(3, 4)); + let mut encoded = codec.encode(vec![insert_frame]).unwrap(); + encoded.extend_from_slice(&codec.encode(vec![remove_frame]).unwrap()); + + let (insert_frame_get, len) = codec.decode(&encoded).unwrap(); + let (remove_frame_get, _) = codec.decode(&encoded[len..]).unwrap(); + let DataFrame::Insert { + propose_id, + ref cmd, + } = insert_frame_get[0] + else { + panic!("frame should be type: DataFrame::Insert"); + }; + + let DataFrame::Remove(propose_id_remove) = remove_frame_get[0] else { + panic!("frame should be type: DataFrame::Remove"); + }; + + assert_eq!(propose_id, ProposeId(1, 2)); + assert_eq!(*cmd.as_ref(), 1); + assert_eq!(propose_id_remove, ProposeId(3, 4)); + } + + #[tokio::test] + async fn frame_zero_write_will_be_detected() { + let mut codec = WAL::::new(); + let insert_frame = DataFrame::Insert { + propose_id: ProposeId(1, 2), + cmd: Arc::new(1), + }; + let mut encoded = codec.encode(vec![insert_frame]).unwrap(); + encoded[0] = 0; + + let err = codec.decode(&encoded).unwrap_err(); + assert!(matches!(err, WALError::MaybeEnded), "error {err} not match"); + } + + #[tokio::test] + async fn frame_corrupt_will_be_detected() { + let mut codec = WAL::::new(); + let insert_frame = DataFrame::Insert { + propose_id: ProposeId(1, 2), + cmd: Arc::new(1), + }; + let mut encoded = codec.encode(vec![insert_frame]).unwrap(); + encoded[1] = 0; + + let err = codec.decode(&encoded).unwrap_err(); + assert!( + matches!(err, WALError::Corrupted(_)), + "error {err} not match" + ); + } +} diff --git a/crates/curp/src/server/sp_wal/config.rs b/crates/curp/src/server/sp_wal/config.rs index e57312d07..8c0f4b3c8 100644 --- a/crates/curp/src/server/sp_wal/config.rs +++ b/crates/curp/src/server/sp_wal/config.rs @@ -24,6 +24,7 @@ pub(crate) struct WALConfig { } impl WALConfig { + #[allow(unused)] /// Creates a new `WALConfig` pub(crate) fn new(dir: impl AsRef) -> Self { let mut insert_dir: PathBuf = dir.as_ref().into(); diff --git a/crates/curp/src/server/sp_wal/error.rs b/crates/curp/src/server/sp_wal/error.rs index 840a61505..ff8bcd3fe 100644 --- a/crates/curp/src/server/sp_wal/error.rs +++ b/crates/curp/src/server/sp_wal/error.rs @@ -28,19 +28,6 @@ pub(crate) enum CorruptType { /// Corrupt because of checksum failure #[error("Checksumming for the 
file has failed")] Checksum, - /// Corrupt because of some logs is missing - #[error("The recovered logs are not continue")] - LogNotContinue, -} - -impl WALError { - pub(super) fn io_or_corrupt(self) -> io::Result { - match self { - WALError::Corrupted(e) => Ok(e), - WALError::IO(e) => return Err(e), - WALError::MaybeEnded => unreachable!("Should not call on WALError::MaybeEnded"), - } - } } impl From for io::Error { diff --git a/crates/curp/src/server/sp_wal/mod.rs b/crates/curp/src/server/sp_wal/mod.rs index 2504ea055..ddf8ce7b5 100644 --- a/crates/curp/src/server/sp_wal/mod.rs +++ b/crates/curp/src/server/sp_wal/mod.rs @@ -1,14 +1,7 @@ -#![allow(unused)] // TODO: remove this -use std::{ - collections::{HashMap, HashSet}, - io, - path::Path, - sync::Arc, -}; +use std::{collections::HashSet, io, path::Path, sync::Arc}; use clippy_utilities::OverflowArithmetic; use curp_external_api::cmd::{Command, ConflictCheck}; -use itertools::Itertools; use parking_lot::Mutex; use serde::{de::DeserializeOwned, Serialize}; use sha2::Sha256; @@ -59,10 +52,10 @@ pub(crate) trait PoolWALOps { /// Try GC by propose ids /// - /// The `check_fn` should filter out obsolete propose ids + /// The `check_fn` should returns `true` if the propose id is obsolete fn gc(&self, check_fn: F) -> io::Result<()> where - F: Fn(&[ProposeId]) -> &[ProposeId]; + F: Fn(&ProposeId) -> bool; } /// WAL of speculative pool @@ -84,6 +77,7 @@ where C: Command, { #[allow(unused)] + /// Creates a new `SpeculativePoolWAL` fn new(config: WALConfig) -> io::Result { if !config.insert_dir.try_exists()? { std::fs::create_dir_all(&config.insert_dir)?; @@ -109,6 +103,7 @@ where }) } + /// Spawns the segment file removal task fn spawn_dropping_task( drop_rx: flume::Receiver>>, ) -> std::thread::JoinHandle<()> { @@ -147,13 +142,28 @@ impl PoolWALOps for SpeculativePoolWAL { } fn remove(&self, propose_ids: Vec) -> io::Result<()> { - self.remove.lock().remove(propose_ids) + let removed = self.insert.lock().remove_propose_ids(&propose_ids)?; + let removed_ids: Vec<_> = removed.iter().map(Segment::propose_ids).flatten().collect(); + for segment in removed { + if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Insert(segment)) { + error!("Failed to send segment to dropping task: {e}"); + } + } + + let mut remove_l = self.remove.lock(); + let removed = remove_l.remove_propose_ids(&removed_ids)?; + for segment in removed { + if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Remove(segment)) { + error!("Failed to send segment to dropping task: {e}"); + } + } + remove_l.remove(propose_ids) } fn recover(&self) -> io::Result>> { let mut insert_l = self.insert.lock(); let mut remove_l = self.remove.lock(); - let mut cmds = insert_l.recover(&self.config.insert_dir)?; + let cmds = insert_l.recover(&self.config.insert_dir)?; let removed = remove_l.recover(&self.config.remove_dir)?; let entries = cmds .into_iter() @@ -164,16 +174,16 @@ impl PoolWALOps for SpeculativePoolWAL { fn gc(&self, check_fn: F) -> io::Result<()> where - F: Fn(&[ProposeId]) -> &[ProposeId], + F: Fn(&ProposeId) -> bool, { let mut insert_l = self.insert.lock(); let mut remove_l = self.remove.lock(); - for segment in insert_l.gc(&check_fn) { + for segment in insert_l.gc(&check_fn)? { if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Insert(segment)) { error!("Failed to send segment to dropping task: {e}"); } } - for segment in remove_l.gc(&check_fn) { + for segment in remove_l.gc(&check_fn)? 
{ if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Remove(segment)) { error!("Failed to send segment to dropping task: {e}"); } @@ -193,6 +203,22 @@ impl Drop for SpeculativePoolWAL { } } +#[cfg(test)] +impl SpeculativePoolWAL +where + C: Command, +{ + /// Propose ids of the `Insert` segment + fn insert_segment_ids(&self) -> Vec> { + self.insert.lock().segment_ids() + } + + /// Propose ids of the `Remove` segment + fn remove_segment_ids(&self) -> Vec> { + self.remove.lock().segment_ids() + } +} + struct WAL { /// WAL segments segments: Vec>>, @@ -210,6 +236,7 @@ where T: SegmentAttr, C: Serialize + DeserializeOwned, { + /// Creates a new `WAL` fn new(dir: impl AsRef, max_segment_size: u64) -> io::Result { Ok(Self { segments: Vec::new(), @@ -219,6 +246,7 @@ where }) } + /// Writes frames to the log fn write_frames(&mut self, item: Vec>) -> io::Result<()> { let last_segment = self .segments @@ -233,6 +261,7 @@ where Ok(()) } + /// Recovers all frames fn recover_frames(&mut self, dir: impl AsRef) -> Result>> { let paths = get_file_paths_with_ext(dir, &T::ext())?; let lfiles: Vec<_> = paths @@ -251,7 +280,7 @@ where self.next_segment_id = segments.last().map(Segment::segment_id).unwrap_or(0); self.segments = segments; - self.open_new_segment(); + self.open_new_segment()?; Ok(logs.into_iter().flatten().collect()) } @@ -277,35 +306,42 @@ where Ok(()) } - /// Gets all propose ids stored in this WAL - fn all_ids(&self) -> Vec { - self.segments - .iter() - .map(Segment::propose_ids) - .flatten() - .collect() - } - /// Try GC by propose ids /// /// The `check_fn` should filter out obsolete propose ids - fn gc(&mut self, check_fn: &F) -> Vec>> + fn gc(&mut self, check_fn: &F) -> Result>>> where - F: Fn(&[ProposeId]) -> &[ProposeId], + F: Fn(&ProposeId) -> bool, { - let all_ids = self.all_ids(); - let obsolete_ids = check_fn(&all_ids); + for segment in &mut self.segments { + segment.gc(check_fn); + } + self.remove_obsoletes() + } - let mut to_remove = Vec::new(); - for (pos, segment) in &mut self.segments.iter_mut().enumerate() { - if segment.invalidate_propose_ids(&obsolete_ids) { - to_remove.push(pos); - } + /// Removes invalid propose ids + fn remove_propose_ids(&mut self, ids: &[ProposeId]) -> Result>>> { + for segment in &mut self.segments { + segment.remove_propose_ids(ids); } - to_remove - .into_iter() - .map(|pos| self.segments.remove(pos)) - .collect() + self.remove_obsoletes() + } + + /// Remove obsolete segments + fn remove_obsoletes(&mut self) -> Result>>> { + let (to_remove, segments): (Vec<_>, Vec<_>) = + self.segments.drain(..).partition(Segment::is_obsolete); + self.segments = segments; + if self.segments.is_empty() { + self.open_new_segment()?; + } + Ok(to_remove) + } + + /// Returns the segment ids of this [`WAL`]. + #[cfg(test)] + fn segment_ids(&self) -> Vec> { + self.segments.iter().map(Segment::propose_ids).collect() } } @@ -313,10 +349,12 @@ impl WAL where C: Serialize + DeserializeOwned, { + /// Removes entries from this segment fn insert(&mut self, entries: Vec>) -> io::Result<()> { self.write_frames(entries.into_iter().map(Into::into).collect()) } + /// Recovers all entries fn recover(&mut self, dir: impl AsRef) -> Result)>> { Ok(self .recover_frames(dir)? 
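Note on the GC path in the hunks above: each segment first retains only the propose ids the caller still considers live, and segments left with no ids are partitioned out so their files can be handed to the dropping task. A simplified, self-contained sketch of that two-step shape is below; `Segment`, `ProposeId`, and `gc_segments` here are stand-ins invented for illustration, not the patch's generic types.

    use std::collections::HashSet;

    /// Simplified propose id, two u64s like the real one.
    type ProposeId = (u64, u64);

    /// Simplified WAL segment that only tracks which propose ids it holds.
    struct Segment {
        ids: HashSet<ProposeId>,
    }

    impl Segment {
        /// Drop the ids the caller reports as obsolete.
        fn gc<F: Fn(&ProposeId) -> bool>(&mut self, is_obsolete: &F) {
            self.ids.retain(|id| !is_obsolete(id));
        }

        /// A segment with no live ids can be removed from disk.
        fn is_obsolete(&self) -> bool {
            self.ids.is_empty()
        }
    }

    /// Filter ids inside every segment, then split off the segments that
    /// became empty so the caller can delete their files.
    fn gc_segments<F>(segments: &mut Vec<Segment>, is_obsolete: F) -> Vec<Segment>
    where
        F: Fn(&ProposeId) -> bool,
    {
        for segment in segments.iter_mut() {
            segment.gc(&is_obsolete);
        }
        let (to_remove, keep): (Vec<_>, Vec<_>) =
            segments.drain(..).partition(Segment::is_obsolete);
        *segments = keep;
        to_remove
    }

    fn main() {
        let mut segments = vec![
            Segment { ids: [(1, 0), (2, 0)].into_iter().collect() },
            Segment { ids: [(3, 0)].into_iter().collect() },
        ];
        let removed = gc_segments(&mut segments, |id| id.0 < 3);
        assert_eq!(removed.len(), 1);
        assert_eq!(segments.len(), 1);
    }

Partitioning out of a drain keeps the surviving segments in place and returns the emptied ones by value, which is what lets the caller move them into the removal channel.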
@@ -333,10 +371,12 @@ impl WAL where C: Serialize + DeserializeOwned, { + /// Removes propose ids from this segment fn remove(&mut self, ids: Vec) -> io::Result<()> { self.write_frames(ids.into_iter().map(DataFrame::Remove).collect()) } + /// Recovers all propose ids fn recover(&mut self, dir: impl AsRef) -> Result> { Ok(self .recover_frames(dir)? diff --git a/crates/curp/src/server/sp_wal/segment.rs b/crates/curp/src/server/sp_wal/segment.rs index a26d7e013..da5a3813c 100644 --- a/crates/curp/src/server/sp_wal/segment.rs +++ b/crates/curp/src/server/sp_wal/segment.rs @@ -8,7 +8,6 @@ use std::{ use clippy_utilities::{NumericCast, OverflowArithmetic}; use itertools::Itertools; use sha2::Sha256; -use tracing::error; use utils::wal::{ framed::{Decoder, Encoder}, get_checksum, parse_u64, validate_data, LockedFile, @@ -57,7 +56,7 @@ pub(super) struct Segment { /// Codec of this segment codec: Codec, /// Type of this Segment - r#type: T, + _type: T, } impl Segment @@ -88,7 +87,7 @@ where size: 0, propose_ids: HashSet::new(), codec, - r#type, + _type: r#type, }) } @@ -114,7 +113,7 @@ where size, propose_ids: HashSet::new(), codec, - r#type, + _type: r#type, }) } @@ -237,19 +236,19 @@ impl Segment { Ok(frames) } - /// Gets all propose ids stored in this WAL - pub(super) fn propose_ids(&self) -> Vec { - self.propose_ids.clone().into_iter().collect() + /// GC invalid propose ids + pub(super) fn gc(&mut self, check_fn: F) + where + F: Fn(&ProposeId) -> bool, + { + self.propose_ids.retain(|id| !check_fn(id)); } - /// Remove invalid propose ids - /// - /// Returns `true` if this segment is obsolete and can be removed - pub(super) fn invalidate_propose_ids(&mut self, propose_ids: &[ProposeId]) -> bool { + /// Removes invalid propose ids + pub(super) fn remove_propose_ids(&mut self, propose_ids: &[ProposeId]) { for id in propose_ids { let _ignore = self.propose_ids.remove(id); } - self.is_obsolete() } /// Returns `true` if this segment is obsolete and can be removed @@ -271,6 +270,11 @@ impl Segment { pub(super) fn path(&self) -> &Path { self.path.as_path() } + + /// Gets all propose ids stored in this WAL + pub(super) fn propose_ids(&self) -> Vec { + self.propose_ids.clone().into_iter().collect() + } } impl PartialEq for Segment { @@ -331,3 +335,59 @@ impl ToDrop { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::server::sp_wal::codec; + + type TestSeg = Segment>; + + #[test] + fn gen_parse_header_is_correct() { + fn corrupt(mut header: Vec, pos: usize) -> Vec { + header[pos] ^= 1; + header + } + for id in 0..100 { + let header = TestSeg::gen_header(id); + let id_parsed = TestSeg::parse_header(&header).unwrap(); + assert_eq!(id, id_parsed); + for pos in 0..8 * 2 { + assert!(TestSeg::parse_header(&corrupt(header.clone(), pos)).is_err()); + } + } + } + + #[tokio::test] + async fn segment_log_recovery_is_ok() { + const SEGMENT_ID: u64 = 1; + const SIZE_LIMIT: u64 = 5; + let dir = tempfile::tempdir().unwrap(); + let mut tmp_path = dir.path().to_path_buf(); + tmp_path.push("test.tmp"); + let segment_name = TestSeg::segment_name(SEGMENT_ID); + let mut wal_path = dir.path().to_path_buf(); + wal_path.push(segment_name); + let file = LockedFile::open_rw(&tmp_path).unwrap(); + let mut segment = + TestSeg::create(file, SEGMENT_ID, SIZE_LIMIT, codec::WAL::new(), Insert).unwrap(); + + let frames: Vec<_> = (0..100) + .map(|i| DataFrame::Insert { + propose_id: ProposeId(i, 2), + cmd: Arc::new(i as i32), + }) + .collect(); + segment.write_sync(frames.clone()).unwrap(); + + 
drop(segment); + + let file = LockedFile::open_rw(wal_path).unwrap(); + let mut segment = TestSeg::open(file, SIZE_LIMIT, codec::WAL::new(), Insert).unwrap(); + let recovered: Vec<_> = segment.recover().unwrap(); + assert_eq!(frames, recovered); + } +} diff --git a/crates/curp/src/server/sp_wal/tests.rs b/crates/curp/src/server/sp_wal/tests.rs index 46e999285..276b29a78 100644 --- a/crates/curp/src/server/sp_wal/tests.rs +++ b/crates/curp/src/server/sp_wal/tests.rs @@ -1,7 +1,6 @@ -use std::{path::Path, sync::Arc}; +use std::{collections::HashSet, path::Path, sync::Arc}; use curp_test_utils::test_cmd::TestCommand; -use tempfile::TempDir; use crate::rpc::{PoolEntry, ProposeId}; @@ -47,13 +46,13 @@ fn wal_insert_remove_in_different_thread_is_ok() { let handle = std::thread::spawn(move || { let mut gen = EntryGenerator::new(); for e in gen.take(NUM_ENTRIES) { - wal.insert(vec![e]); + wal.insert(vec![e]).unwrap(); } }); let mut gen = EntryGenerator::new(); for e in gen.take(NUM_ENTRIES) { - wal_c.remove(vec![e.id]); + wal_c.remove(vec![e.id]).unwrap(); } handle.join().unwrap(); } @@ -79,7 +78,6 @@ fn wal_should_recover_commute_cmds() { const NUM_ENTRIES: usize = 1000; const INSERT_CHUNK_SIZE: usize = 10; let temp_dir = tempfile::tempdir().unwrap(); - let mut gen = EntryGenerator::new(); let wal = init_wal(&temp_dir, &[]); let cmds = vec![ TestCommand::new_put(vec![0], 0), @@ -124,7 +122,25 @@ fn wal_insert_remove_then_recover_is_ok() { } #[test] -fn wal_gc_is_ok() {} +fn wal_gc_is_ok() { + const NUM_ENTRIES: usize = 1000; + const INSERT_CHUNK_SIZE: usize = 10; + let tmp_dir = tempfile::tempdir().unwrap(); + let mut gen = EntryGenerator::new(); + let wal = init_wal(&tmp_dir, &[]); + + let entries = gen.take(NUM_ENTRIES); + for chunk in entries.chunks_exact(INSERT_CHUNK_SIZE) { + wal.insert(chunk.to_vec()).unwrap(); + } + let ids = wal.insert_segment_ids(); + + let to_gc: HashSet<_> = ids.iter().take(10).flatten().collect(); + let num = to_gc.len(); + wal.gc(|id| to_gc.contains(id)).unwrap(); + drop(wal); + let _wal = init_wal(&tmp_dir, &entries[num..]); +} fn init_wal( dir: impl AsRef, diff --git a/crates/utils/src/wal/pipeline.rs b/crates/utils/src/wal/pipeline.rs index 6cf2507ca..57acb49de 100644 --- a/crates/utils/src/wal/pipeline.rs +++ b/crates/utils/src/wal/pipeline.rs @@ -115,9 +115,18 @@ impl FilePipeline { fn clean_up(dir: &PathBuf) -> io::Result<()> { for result in std::fs::read_dir(dir)? 
{ let file = result?; - if let Some(filename) = file.file_name().to_str() { - if filename.ends_with(TEMP_FILE_EXT) { - std::fs::remove_file(file.path())?; + if file + .file_name() + .to_str() + .map(|fname| fname.ends_with(TEMP_FILE_EXT)) + .unwrap_or(false) + { + if let Err(err) = std::fs::remove_file(file.path()) { + // The file has already been deleted, continue + if matches!(err.kind(), io::ErrorKind::NotFound) { + continue; + } + return Err(err); } } } @@ -161,7 +170,7 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size).unwrap(); - let check_size = |mut file: LockedFile| { + let check_size = |file: LockedFile| { let file = file.into_std(); assert_eq!(file.metadata().unwrap().len(), file_size,); }; From efa63a3006131a794e13fc9ccbaefc8ca22bdd52 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 14 May 2024 10:17:53 +0800 Subject: [PATCH 11/11] chore: fix clippy Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- Cargo.lock | 6 --- crates/curp/Cargo.toml | 1 - crates/curp/src/server/sp_wal/codec.rs | 36 +++++++++++----- crates/curp/src/server/sp_wal/error.rs | 1 + crates/curp/src/server/sp_wal/mod.rs | 27 ++++++++---- crates/curp/src/server/sp_wal/segment.rs | 25 +++++++---- crates/utils/Cargo.toml | 2 - crates/utils/src/wal/framed.rs | 8 ++++ crates/utils/src/wal/mod.rs | 53 +++++++++++++++++++++--- crates/utils/src/wal/pipeline.rs | 15 ++++++- workspace-hack/Cargo.toml | 3 -- 11 files changed, 130 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd15cd73a..73d860c98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -617,7 +617,6 @@ dependencies = [ "engine", "event-listener", "flume", - "fs2", "futures", "indexmap 2.2.6", "itertools", @@ -3303,10 +3302,8 @@ dependencies = [ "clippy-utilities", "dashmap", "derive_builder", - "event-listener", "flume", "fs2", - "futures", "getset", "madsim-tokio", "madsim-tonic", @@ -3665,8 +3662,6 @@ dependencies = [ "bytes", "cc", "clap", - "crypto-common", - "digest", "either", "futures-channel", "futures-util", @@ -3683,7 +3678,6 @@ dependencies = [ "predicates", "serde", "serde_json", - "sha2", "syn 1.0.109", "syn 2.0.63", "time", diff --git a/crates/curp/Cargo.toml b/crates/curp/Cargo.toml index f4ee05b1d..69be99d26 100644 --- a/crates/curp/Cargo.toml +++ b/crates/curp/Cargo.toml @@ -24,7 +24,6 @@ derive_builder = "0.20.0" engine = { path = "../engine" } event-listener = "5.3.0" flume = "0.11.0" -fs2 = "0.4.3" futures = "0.3.21" indexmap = "2.2.6" itertools = "0.12" diff --git a/crates/curp/src/server/sp_wal/codec.rs b/crates/curp/src/server/sp_wal/codec.rs index a6fa5516a..495faf951 100644 --- a/crates/curp/src/server/sp_wal/codec.rs +++ b/crates/curp/src/server/sp_wal/codec.rs @@ -1,6 +1,6 @@ use std::{io, marker::PhantomData, sync::Arc}; -use clippy_utilities::NumericCast; +use clippy_utilities::{NumericCast, OverflowArithmetic}; use serde::{de::DeserializeOwned, Serialize}; use sha2::{digest::Reset, Digest}; use utils::wal::{ @@ -59,7 +59,12 @@ enum WALFrame { #[cfg_attr(test, derive(PartialEq))] pub(crate) enum DataFrame { /// A Frame containing a Insert entry - Insert { propose_id: ProposeId, cmd: Arc }, + Insert { + /// Propose Id + propose_id: ProposeId, + /// Command + cmd: Arc, + }, /// A Frame containing the Remove entry Remove(ProposeId), } @@ -68,8 +73,7 @@ impl DataFrame { /// Gets the propose id encoded in this frame pub(crate) fn propose_id(&self) -> ProposeId { match *self { - 
DataFrame::Insert { propose_id, .. } => propose_id, - DataFrame::Remove(propose_id) => propose_id, + DataFrame::Remove(propose_id) | DataFrame::Insert { propose_id, .. } => propose_id, } } } @@ -134,8 +138,10 @@ where let Some((frame, len)) = WALFrame::::decode(next)? else { return Err(WALError::MaybeEnded); }; - let decoded_bytes = src.get(cursor..cursor + len).ok_or(WALError::MaybeEnded)?; - cursor += len; + let decoded_bytes = src + .get(cursor..cursor.overflow_add(len)) + .ok_or(WALError::MaybeEnded)?; + cursor = cursor.overflow_add(len); match frame { WALFrame::Data(data) => { @@ -192,9 +198,9 @@ where let frame_type = src[0]; match frame_type { INVALID => Err(WALError::MaybeEnded), - INSERT => Self::decode_insert(&src), - REMOVE => Self::decode_remove(&src), - COMMIT => Self::decode_commit(&src), + INSERT => Self::decode_insert(src), + REMOVE => Self::decode_remove(src), + COMMIT => Self::decode_commit(src), _ => Err(WALError::Corrupted(CorruptType::Codec( "Unexpected frame type".to_owned(), ))), @@ -204,8 +210,9 @@ where /// Decodes an entry frame from source #[allow(clippy::unwrap_used)] fn decode_insert(mut src: &[u8]) -> Result, WALError> { + /// Size of the length encoded bytes const LEN_SIZE: usize = 8; - let Some(propose_id) = Self::decode_propose_id(&src) else { + let Some(propose_id) = Self::decode_propose_id(src) else { return Ok(None); }; src = &src[PROPOSE_ID_SIZE..]; @@ -232,12 +239,18 @@ where /// Decodes an seal index frame from source fn decode_remove(src: &[u8]) -> Result, WALError> { - Ok(Self::decode_propose_id(&src) + Ok(Self::decode_propose_id(src) .map(|id| WALFrame::Data(DataFrame::Remove(id))) .map(|frame| (frame, PROPOSE_ID_SIZE))) } /// Decodes data frame header + #[allow( + clippy::unwrap_used, + clippy::unwrap_in_result, + clippy::indexing_slicing, + clippy::missing_asserts_for_indexing + )] // The operations are checked fn decode_propose_id(src: &[u8]) -> Option { if src.len() < PROPOSE_ID_SIZE { return None; @@ -332,6 +345,7 @@ impl FrameType for CommitFrame { } impl FrameEncoder for CommitFrame { + #[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)] // Won't overflow fn encode(&self) -> Vec { let mut bytes = Vec::with_capacity(8 + self.checksum.len()); bytes.extend_from_slice(&[0; 8]); diff --git a/crates/curp/src/server/sp_wal/error.rs b/crates/curp/src/server/sp_wal/error.rs index ff8bcd3fe..3dbe17af0 100644 --- a/crates/curp/src/server/sp_wal/error.rs +++ b/crates/curp/src/server/sp_wal/error.rs @@ -31,6 +31,7 @@ pub(crate) enum CorruptType { } impl From for io::Error { + #[inline] fn from(err: WALError) -> Self { match err { WALError::MaybeEnded => unreachable!("Should not call on WALError::MaybeEnded"), diff --git a/crates/curp/src/server/sp_wal/mod.rs b/crates/curp/src/server/sp_wal/mod.rs index ddf8ce7b5..3c8b95d8d 100644 --- a/crates/curp/src/server/sp_wal/mod.rs +++ b/crates/curp/src/server/sp_wal/mod.rs @@ -121,16 +121,20 @@ where } /// Keeps only commute commands + #[allow( + clippy::indexing_slicing, // Slicings are checked + clippy::pattern_type_mismatch, // Can't be fixed + )] fn keep_commute_cmds(mut entries: Vec>) -> Vec> { let commute = |entry: &PoolEntry, others: &[PoolEntry]| { - !others.iter().any(|e| e.is_conflict(&entry)) + !others.iter().any(|e| e.is_conflict(entry)) }; // start from last element entries.reverse(); let keep = entries .iter() .enumerate() - .take_while(|(i, ref e)| commute(*e, &entries[..*i])) + .take_while(|(i, e)| commute(e, &entries[..*i])) .count(); entries.drain(..keep).collect() } @@ -141,18 
+145,22 @@ impl PoolWALOps for SpeculativePoolWAL { self.insert.lock().insert(entries) } + #[allow(clippy::unwrap_used, clippy::unwrap_in_result)] fn remove(&self, propose_ids: Vec) -> io::Result<()> { - let removed = self.insert.lock().remove_propose_ids(&propose_ids)?; - let removed_ids: Vec<_> = removed.iter().map(Segment::propose_ids).flatten().collect(); - for segment in removed { + let removed_insert_ids = self.insert.lock().remove_propose_ids(&propose_ids)?; + let removed_ids: Vec<_> = removed_insert_ids + .iter() + .flat_map(Segment::propose_ids) + .collect(); + for segment in removed_insert_ids { if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Insert(segment)) { error!("Failed to send segment to dropping task: {e}"); } } let mut remove_l = self.remove.lock(); - let removed = remove_l.remove_propose_ids(&removed_ids)?; - for segment in removed { + let removed_remove_ids = remove_l.remove_propose_ids(&removed_ids)?; + for segment in removed_remove_ids { if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Remove(segment)) { error!("Failed to send segment to dropping task: {e}"); } @@ -172,6 +180,7 @@ impl PoolWALOps for SpeculativePoolWAL { Ok(Self::keep_commute_cmds(entries)) } + #[allow(clippy::unwrap_used, clippy::unwrap_in_result)] fn gc(&self, check_fn: F) -> io::Result<()> where F: Fn(&ProposeId) -> bool, @@ -219,6 +228,8 @@ where } } +/// The WAL type +#[allow(clippy::upper_case_acronyms)] struct WAL { /// WAL segments segments: Vec>>, @@ -278,7 +289,7 @@ where .map(Segment::recover::) .collect::>()?; - self.next_segment_id = segments.last().map(Segment::segment_id).unwrap_or(0); + self.next_segment_id = segments.last().map_or(0, Segment::segment_id); self.segments = segments; self.open_new_segment()?; diff --git a/crates/curp/src/server/sp_wal/segment.rs b/crates/curp/src/server/sp_wal/segment.rs index da5a3813c..2bc95c31d 100644 --- a/crates/curp/src/server/sp_wal/segment.rs +++ b/crates/curp/src/server/sp_wal/segment.rs @@ -29,6 +29,7 @@ const WAL_VERSION: u8 = 0x00; /// The size of wal file header in bytes const WAL_HEADER_SIZE: usize = 48; +/// Segment attributes pub(super) trait SegmentAttr { /// Segment file extension fn ext() -> String; @@ -72,9 +73,9 @@ where r#type: T, ) -> io::Result { let segment_name = Self::segment_name(segment_id); - let lfile = tmp_file.rename(segment_name)?; - let path = lfile.path(); - let mut file = lfile.into_std(); + let locked_file = tmp_file.rename(segment_name)?; + let path = locked_file.path(); + let mut file = locked_file.into_std(); file.write_all(&Self::gen_header(segment_id))?; file.flush()?; file.sync_data()?; @@ -93,16 +94,16 @@ where /// Open an existing WAL segment file pub(super) fn open( - lfile: LockedFile, + locked_file: LockedFile, size_limit: u64, codec: Codec, r#type: T, ) -> Result { - let path = lfile.path(); - let mut file = lfile.into_std(); + let path = locked_file.path(); + let mut file = locked_file.into_std(); let size = file.metadata()?.len(); let mut buf = vec![0; WAL_HEADER_SIZE]; - let _ignore = file.read_exact(&mut buf)?; + file.read_exact(&mut buf)?; let segment_id = Self::parse_header(&buf)?; Ok(Self { @@ -194,6 +195,7 @@ impl Segment { } /// Gets all items from the segment + #[allow(clippy::indexing_slicing, clippy::arithmetic_side_effects)] // Operations are checked pub(super) fn get_all(&mut self) -> Result, WALError> where Codec: Decoder, @@ -297,11 +299,12 @@ impl Ord for Segment { } } +/// Insert Segment type pub(super) struct Insert; impl SegmentAttr for Insert { fn ext() -> 
String { - ".inswal".to_string() + ".inswal".to_owned() } fn r#type() -> Insert { @@ -309,11 +312,12 @@ impl SegmentAttr for Insert { } } +/// Remove Segment type pub(super) struct Remove; impl SegmentAttr for Remove { fn ext() -> String { - ".rmwal".to_string() + ".rmwal".to_owned() } fn r#type() -> Remove { @@ -321,8 +325,11 @@ impl SegmentAttr for Remove { } } +/// `Insert` or `Remove` segment send to dropping task pub(super) enum ToDrop { + /// Insert Insert(Segment), + /// Remove Remove(Segment), } diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index efc808360..ae76f6fca 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -22,10 +22,8 @@ async-trait = { version = "0.1.80", optional = true } clippy-utilities = "0.2.0" dashmap = "5.5.3" derive_builder = "0.20.0" -event-listener = "5.3.0" flume = "0.11.0" fs2 = "0.4.3" -futures = "0.3.30" getset = "0.1" opentelemetry = { version = "0.22.0", features = ["trace"] } opentelemetry_sdk = { version = "0.22.1", features = ["trace"] } diff --git a/crates/utils/src/wal/framed.rs b/crates/utils/src/wal/framed.rs index 96d3c5e90..55a5cf1f2 100644 --- a/crates/utils/src/wal/framed.rs +++ b/crates/utils/src/wal/framed.rs @@ -9,6 +9,10 @@ pub trait Decoder { type Error: From; /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// # Errors + /// + /// This function will return an error if decoding has failed. fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error>; } @@ -18,5 +22,9 @@ pub trait Encoder { type Error: From; /// Encodes a frame + /// + /// # Errors + /// + /// This function will return an error if encoding has failed. fn encode(&mut self, item: Item) -> Result, Self::Error>; } diff --git a/crates/utils/src/wal/mod.rs b/crates/utils/src/wal/mod.rs index 1ba4a7b34..d0e869737 100644 --- a/crates/utils/src/wal/mod.rs +++ b/crates/utils/src/wal/mod.rs @@ -24,6 +24,11 @@ pub struct LockedFile { impl LockedFile { /// Opens the file in read and append mode + /// + /// # Errors + /// + /// This function will return an error if file operations fail. + #[inline] pub fn open_rw(path: impl AsRef) -> io::Result { let file = OpenOptions::new() .create(true) @@ -39,6 +44,11 @@ impl LockedFile { } /// Pre-allocates the file + /// + /// # Errors + /// + /// This function will return an error if file operations fail. + #[inline] pub fn preallocate(&mut self, size: u64) -> io::Result<()> { if size == 0 { return Ok(()); @@ -48,6 +58,8 @@ impl LockedFile { } /// Gets the path of this file + #[inline] + #[must_use] pub fn path(&self) -> PathBuf { self.path.clone() } @@ -55,6 +67,11 @@ impl LockedFile { /// Renames the current file /// /// We will discard this file if the rename has failed + /// + /// # Errors + /// + /// This function will return an error if file operations fail. 
+ #[inline] pub fn rename(mut self, new_name: impl AsRef) -> io::Result { let mut new_path = parent_dir(&self.path); new_path.push(new_name.as_ref()); @@ -68,6 +85,8 @@ impl LockedFile { } /// Converts self to std file + #[inline] + #[must_use] pub fn into_std(self) -> StdFile { let mut this = std::mem::ManuallyDrop::new(self); this.file @@ -84,6 +103,7 @@ impl LockedFile { } impl Drop for LockedFile { + #[inline] fn drop(&mut self) { if self.file.is_some() && is_exist(self.path()) { let _ignore = std::fs::remove_file(self.path()); @@ -91,8 +111,13 @@ impl Drop for LockedFile { } } -/// Gets the all files with the extension under the given folder -pub fn get_file_paths_with_ext(dir: impl AsRef, ext: &str) -> io::Result> { +/// Gets the all files with the extension under the given folder. +/// +/// # Errors +/// +/// This function will return an error if failed to read the given directory. +#[inline] +pub fn get_file_paths_with_ext>(dir: A, ext: &str) -> io::Result> { let mut files = vec![]; for result in std::fs::read_dir(dir)? { let file = result?; @@ -106,14 +131,20 @@ pub fn get_file_paths_with_ext(dir: impl AsRef, ext: &str) -> io::Result) -> PathBuf { +#[inline] +pub fn parent_dir>(dir: A) -> PathBuf { let mut parent = PathBuf::from(dir.as_ref()); let _ignore = parent.pop(); parent } /// Fsyncs the parent directory -pub fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { +/// +/// # Errors +/// +/// This function will return an error if directory operations fail. +#[inline] +pub fn sync_parent_dir>(dir: A) -> io::Result<()> { let parent_dir = parent_dir(&dir); let parent = std::fs::File::open(parent_dir)?; parent.sync_all()?; @@ -122,6 +153,8 @@ pub fn sync_parent_dir(dir: impl AsRef) -> io::Result<()> { } /// Gets the checksum of the slice, we use Sha256 as the hash function +#[inline] +#[must_use] pub fn get_checksum(data: &[u8]) -> Output { let mut hasher = H::new(); hasher.update(data); @@ -129,16 +162,26 @@ pub fn get_checksum(data: &[u8]) -> Output { } /// Validates the the data with the given checksum +#[inline] +#[must_use] pub fn validate_data(data: &[u8], checksum: &[u8]) -> bool { AsRef::<[u8]>::as_ref(&get_checksum::(data)) == checksum } /// Checks whether the file exist -pub fn is_exist(path: impl AsRef) -> bool { +#[inline] +#[must_use] +pub fn is_exist>(path: A) -> bool { std::fs::metadata(path).is_ok() } /// Parses a u64 from u8 slice +/// +/// # Panics +/// +/// Panics if `bytes_le` is not exactly 8 bytes long +#[inline] +#[must_use] pub fn parse_u64(bytes_le: &[u8]) -> u64 { assert_eq!(bytes_le.len(), 8, "The slice passed should be 8 bytes long"); u64::from_le_bytes( diff --git a/crates/utils/src/wal/pipeline.rs b/crates/utils/src/wal/pipeline.rs index 57acb49de..c5126f30e 100644 --- a/crates/utils/src/wal/pipeline.rs +++ b/crates/utils/src/wal/pipeline.rs @@ -15,6 +15,7 @@ use super::LockedFile; const TEMP_FILE_EXT: &str = ".tmp"; /// The file pipeline, used for pipelining the creation of temp file +#[allow(clippy::module_name_repetitions)] pub struct FilePipeline { /// The directory where the temp files are created dir: PathBuf, @@ -31,6 +32,11 @@ pub struct FilePipeline { impl FilePipeline { /// Creates a new `FilePipeline` + /// + /// # Errors + /// + /// This function will return an error if failed to clean up the given directory. 
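`FilePipeline` is consumed as an iterator: each item is an `io::Result` wrapping a pre-allocated, exclusively locked temp file, and dropping the pipeline stops the background allocation task via `stop()`. A small usage sketch follows, with an invented function name and an arbitrary 64 KiB file size.

    use std::io;
    use utils::wal::pipeline::FilePipeline;

    /// Hypothetical consumer that takes two pre-allocated temp files.
    fn take_two_files(dir: std::path::PathBuf) -> io::Result<()> {
        let mut pipeline = FilePipeline::new(dir, 64 * 1024)?;
        for _ in 0..2 {
            // Each item is an already pre-allocated, exclusively locked temp file.
            let _file = pipeline
                .next()
                .expect("pipeline yields files until stopped")?;
        }
        // Dropping the pipeline stops the background allocation task.
        Ok(())
    }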
+ #[inline] pub fn new(dir: PathBuf, file_size: u64) -> io::Result { Self::clean_up(&dir)?; @@ -98,6 +104,7 @@ impl FilePipeline { } /// Stops the pipeline + #[inline] pub fn stop(&mut self) { self.stopped.store(true, Ordering::Relaxed); } @@ -118,8 +125,7 @@ impl FilePipeline { if file .file_name() .to_str() - .map(|fname| fname.ends_with(TEMP_FILE_EXT)) - .unwrap_or(false) + .is_some_and(|fname| fname.ends_with(TEMP_FILE_EXT)) { if let Err(err) = std::fs::remove_file(file.path()) { // The file has already been deleted, continue @@ -135,6 +141,7 @@ impl FilePipeline { } impl Drop for FilePipeline { + #[inline] fn drop(&mut self) { self.stop(); } @@ -143,6 +150,7 @@ impl Drop for FilePipeline { impl Iterator for FilePipeline { type Item = io::Result; + #[inline] fn next(&mut self) -> Option { if self.stopped.load(Ordering::Relaxed) { return None; @@ -151,11 +159,14 @@ impl Iterator for FilePipeline { } } +#[allow(clippy::missing_fields_in_debug)] // `flume::IntoIter` does not implement `Debug` impl std::fmt::Debug for FilePipeline { + #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FilePipeline") .field("dir", &self.dir) .field("file_size", &self.file_size) + .field("stopped", &self.stopped) .finish() } } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index b78fe3aac..8fe8b3a57 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -16,8 +16,6 @@ publish = false axum = { version = "0.6" } bytes = { version = "1" } clap = { version = "4", features = ["derive"] } -crypto-common = { version = "0.1", default-features = false, features = ["std"] } -digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1", default-features = false, features = ["use_std"] } futures-channel = { version = "0.3", features = ["sink"] } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } @@ -34,7 +32,6 @@ petgraph = { version = "0.6" } predicates = { version = "3", default-features = false, features = ["diff"] } serde = { version = "1", features = ["derive", "rc"] } serde_json = { version = "1", features = ["raw_value"] } -sha2 = { version = "0.10" } time = { version = "0.3", features = ["formatting", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "sync", "time"] } tokio-util = { version = "0.7", features = ["codec", "io"] }
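The recovery filter added in this series (`keep_commute_cmds`, exercised by `wal_should_recover_commute_cmds`, which expects only `entries[3..7]` back) keeps the newest suffix of the log whose commands are pairwise non-conflicting: it scans from the latest entry backwards and drops everything at and before the first entry that conflicts with a newer kept one. The sketch below models the same behaviour with a simplified `Entry` whose commands conflict when they share a key; it is a stand-in for `PoolEntry` and `ConflictCheck`, not the code from the patch.

    /// Simplified pool entry: just a key; two entries conflict when keys match.
    struct Entry {
        key: u32,
    }

    impl Entry {
        fn is_conflict(&self, other: &Self) -> bool {
            self.key == other.key
        }
    }

    /// Keep only the trailing run of mutually non-conflicting entries,
    /// returned newest-first, mirroring the recovery filter's shape.
    fn keep_commute_cmds(mut entries: Vec<Entry>) -> Vec<Entry> {
        // Walk from the newest entry backwards and stop at the first conflict.
        entries.reverse();
        let keep = entries
            .iter()
            .enumerate()
            .take_while(|&(i, e)| !entries[..i].iter().any(|other| other.is_conflict(e)))
            .count();
        entries.drain(..keep).collect()
    }

    fn main() {
        // Same key sequence as the test: puts on 0, 1, 2, 3, 4, 1, 2.
        let log: Vec<Entry> = [0, 1, 2, 3, 4, 1, 2]
            .into_iter()
            .map(|key| Entry { key })
            .collect();
        let kept = keep_commute_cmds(log);
        let keys: Vec<u32> = kept.iter().map(|e| e.key).collect();
        // Newest-first: the trailing puts on 3, 4, 1, 2 survive.
        assert_eq!(keys, vec![2, 1, 4, 3]);
    }

This mirrors the test's expectation: the trailing puts on keys 3, 4, 1, 2 commute, while the older puts on keys 1 and 2, and everything behind them (the put on key 0), are discarded.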