Skip to content

Commit

Permalink
chore: fix clippy
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds committed May 14, 2024
1 parent e6303bf commit 42f058c
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 35 deletions.
36 changes: 25 additions & 11 deletions crates/curp/src/server/sp_wal/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{io, marker::PhantomData, sync::Arc};

use clippy_utilities::NumericCast;
use clippy_utilities::{NumericCast, OverflowArithmetic};
use serde::{de::DeserializeOwned, Serialize};
use sha2::{digest::Reset, Digest};
use utils::wal::{
Expand Down Expand Up @@ -59,7 +59,12 @@ enum WALFrame<C, H> {
#[cfg_attr(test, derive(PartialEq))]
pub(crate) enum DataFrame<C> {
/// A Frame containing a Insert entry
Insert { propose_id: ProposeId, cmd: Arc<C> },
Insert {
/// Propose Id
propose_id: ProposeId,
/// Command
cmd: Arc<C>,
},
/// A Frame containing the Remove entry
Remove(ProposeId),
}
Expand All @@ -68,8 +73,7 @@ impl<C> DataFrame<C> {
/// Gets the propose id encoded in this frame
pub(crate) fn propose_id(&self) -> ProposeId {
match *self {
DataFrame::Insert { propose_id, .. } => propose_id,
DataFrame::Remove(propose_id) => propose_id,
DataFrame::Remove(propose_id) | DataFrame::Insert { propose_id, .. } => propose_id,
}
}
}
Expand Down Expand Up @@ -134,8 +138,10 @@ where
let Some((frame, len)) = WALFrame::<C, H>::decode(next)? else {
return Err(WALError::MaybeEnded);
};
let decoded_bytes = src.get(cursor..cursor + len).ok_or(WALError::MaybeEnded)?;
cursor += len;
let decoded_bytes = src
.get(cursor..cursor.overflow_add(len))
.ok_or(WALError::MaybeEnded)?;
cursor = cursor.overflow_add(len);

match frame {
WALFrame::Data(data) => {
Expand Down Expand Up @@ -192,9 +198,9 @@ where
let frame_type = src[0];
match frame_type {
INVALID => Err(WALError::MaybeEnded),
INSERT => Self::decode_insert(&src),
REMOVE => Self::decode_remove(&src),
COMMIT => Self::decode_commit(&src),
INSERT => Self::decode_insert(src),
REMOVE => Self::decode_remove(src),
COMMIT => Self::decode_commit(src),
_ => Err(WALError::Corrupted(CorruptType::Codec(
"Unexpected frame type".to_owned(),
))),
Expand All @@ -204,8 +210,9 @@ where
/// Decodes an entry frame from source
#[allow(clippy::unwrap_used)]
fn decode_insert(mut src: &[u8]) -> Result<Option<(Self, usize)>, WALError> {
/// Size of the length encoded bytes
const LEN_SIZE: usize = 8;
let Some(propose_id) = Self::decode_propose_id(&src) else {
let Some(propose_id) = Self::decode_propose_id(src) else {
return Ok(None);
};
src = &src[PROPOSE_ID_SIZE..];
Expand All @@ -232,12 +239,18 @@ where

/// Decodes an seal index frame from source
fn decode_remove(src: &[u8]) -> Result<Option<(Self, usize)>, WALError> {
Ok(Self::decode_propose_id(&src)
Ok(Self::decode_propose_id(src)
.map(|id| WALFrame::Data(DataFrame::Remove(id)))
.map(|frame| (frame, PROPOSE_ID_SIZE)))
}

/// Decodes data frame header
#[allow(
clippy::unwrap_used,
clippy::unwrap_in_result,
clippy::indexing_slicing,
clippy::missing_asserts_for_indexing
)] // The operations are checked
fn decode_propose_id(src: &[u8]) -> Option<ProposeId> {
if src.len() < PROPOSE_ID_SIZE {
return None;
Expand Down Expand Up @@ -332,6 +345,7 @@ impl<H> FrameType for CommitFrame<H> {
}

impl<H> FrameEncoder for CommitFrame<H> {
#[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)] // Won't overflow
fn encode(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(8 + self.checksum.len());
bytes.extend_from_slice(&[0; 8]);
Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/server/sp_wal/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub(crate) enum CorruptType {
}

impl From<WALError> for io::Error {
#[inline]
fn from(err: WALError) -> Self {
match err {
WALError::MaybeEnded => unreachable!("Should not call on WALError::MaybeEnded"),
Expand Down
27 changes: 19 additions & 8 deletions crates/curp/src/server/sp_wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,20 @@ where
}

/// Keeps only commute commands
#[allow(
clippy::indexing_slicing, // Slicings are checked
clippy::pattern_type_mismatch, // Can't be fixed
)]
fn keep_commute_cmds(mut entries: Vec<PoolEntry<C>>) -> Vec<PoolEntry<C>> {
let commute = |entry: &PoolEntry<C>, others: &[PoolEntry<C>]| {
!others.iter().any(|e| e.is_conflict(&entry))
!others.iter().any(|e| e.is_conflict(entry))
};
// start from last element
entries.reverse();
let keep = entries
.iter()
.enumerate()
.take_while(|(i, ref e)| commute(*e, &entries[..*i]))
.take_while(|(i, e)| commute(e, &entries[..*i]))
.count();
entries.drain(..keep).collect()
}
Expand All @@ -141,18 +145,22 @@ impl<C: Command> PoolWALOps<C> for SpeculativePoolWAL<C> {
self.insert.lock().insert(entries)
}

#[allow(clippy::unwrap_used, clippy::unwrap_in_result)]
fn remove(&self, propose_ids: Vec<ProposeId>) -> io::Result<()> {
let removed = self.insert.lock().remove_propose_ids(&propose_ids)?;
let removed_ids: Vec<_> = removed.iter().map(Segment::propose_ids).flatten().collect();
for segment in removed {
let removed_insert_ids = self.insert.lock().remove_propose_ids(&propose_ids)?;
let removed_ids: Vec<_> = removed_insert_ids
.iter()
.flat_map(Segment::propose_ids)
.collect();
for segment in removed_insert_ids {
if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Insert(segment)) {
error!("Failed to send segment to dropping task: {e}");
}
}

let mut remove_l = self.remove.lock();
let removed = remove_l.remove_propose_ids(&removed_ids)?;
for segment in removed {
let removed_remove_ids = remove_l.remove_propose_ids(&removed_ids)?;
for segment in removed_remove_ids {
if let Err(e) = self.drop_tx.as_ref().unwrap().send(ToDrop::Remove(segment)) {
error!("Failed to send segment to dropping task: {e}");
}
Expand All @@ -172,6 +180,7 @@ impl<C: Command> PoolWALOps<C> for SpeculativePoolWAL<C> {
Ok(Self::keep_commute_cmds(entries))
}

#[allow(clippy::unwrap_used, clippy::unwrap_in_result)]
fn gc<F>(&self, check_fn: F) -> io::Result<()>
where
F: Fn(&ProposeId) -> bool,
Expand Down Expand Up @@ -219,6 +228,8 @@ where
}
}

/// The WAL type
#[allow(clippy::upper_case_acronyms)]
struct WAL<T, C> {
/// WAL segments
segments: Vec<Segment<T, WALCodec<C>>>,
Expand Down Expand Up @@ -278,7 +289,7 @@ where
.map(Segment::recover::<C>)
.collect::<Result<_>>()?;

self.next_segment_id = segments.last().map(Segment::segment_id).unwrap_or(0);
self.next_segment_id = segments.last().map_or(0, Segment::segment_id);
self.segments = segments;
self.open_new_segment()?;

Expand Down
25 changes: 16 additions & 9 deletions crates/curp/src/server/sp_wal/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const WAL_VERSION: u8 = 0x00;
/// The size of wal file header in bytes
const WAL_HEADER_SIZE: usize = 48;

/// Segment attributes
pub(super) trait SegmentAttr {
/// Segment file extension
fn ext() -> String;
Expand Down Expand Up @@ -72,9 +73,9 @@ where
r#type: T,
) -> io::Result<Self> {
let segment_name = Self::segment_name(segment_id);
let lfile = tmp_file.rename(segment_name)?;
let path = lfile.path();
let mut file = lfile.into_std();
let locked_file = tmp_file.rename(segment_name)?;
let path = locked_file.path();
let mut file = locked_file.into_std();
file.write_all(&Self::gen_header(segment_id))?;
file.flush()?;
file.sync_data()?;
Expand All @@ -93,16 +94,16 @@ where

/// Open an existing WAL segment file
pub(super) fn open(
lfile: LockedFile,
locked_file: LockedFile,
size_limit: u64,
codec: Codec,
r#type: T,
) -> Result<Self, WALError> {
let path = lfile.path();
let mut file = lfile.into_std();
let path = locked_file.path();
let mut file = locked_file.into_std();
let size = file.metadata()?.len();
let mut buf = vec![0; WAL_HEADER_SIZE];
let _ignore = file.read_exact(&mut buf)?;
file.read_exact(&mut buf)?;
let segment_id = Self::parse_header(&buf)?;

Ok(Self {
Expand Down Expand Up @@ -194,6 +195,7 @@ impl<T, Codec> Segment<T, Codec> {
}

/// Gets all items from the segment
#[allow(clippy::indexing_slicing, clippy::arithmetic_side_effects)] // Operations are checked
pub(super) fn get_all<Item>(&mut self) -> Result<Vec<Item>, WALError>
where
Codec: Decoder<Item = Item, Error = WALError>,
Expand Down Expand Up @@ -297,32 +299,37 @@ impl<T, Codec> Ord for Segment<T, Codec> {
}
}

/// Insert Segment type
pub(super) struct Insert;

impl SegmentAttr for Insert {
fn ext() -> String {
".inswal".to_string()
".inswal".to_owned()
}

fn r#type() -> Insert {
Insert
}
}

/// Remove Segment type
pub(super) struct Remove;

impl SegmentAttr for Remove {
fn ext() -> String {
".rmwal".to_string()
".rmwal".to_owned()
}

fn r#type() -> Remove {
Remove
}
}

/// `Insert` or `Remove` segment send to dropping task
pub(super) enum ToDrop<Codec> {
/// Insert
Insert(Segment<Insert, Codec>),
/// Remove
Remove(Segment<Remove, Codec>),
}

Expand Down
8 changes: 8 additions & 0 deletions crates/utils/src/wal/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub trait Decoder {
type Error: From<io::Error>;

/// Attempts to decode a frame from the provided buffer of bytes.
///
/// # Errors
///
/// This function will return an error if decoding has failed.
fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error>;
}

Expand All @@ -18,5 +22,9 @@ pub trait Encoder<Item> {
type Error: From<io::Error>;

/// Encodes a frame
///
/// # Errors
///
/// This function will return an error if encoding has failed.
fn encode(&mut self, item: Item) -> Result<Vec<u8>, Self::Error>;
}
Loading

0 comments on commit 42f058c

Please sign in to comment.