Skip to content

Commit

Permalink
remove min/max id from storage
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 23, 2024
1 parent 34f49c6 commit ba279f3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 117 deletions.
50 changes: 33 additions & 17 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use cache::BlockCache;
use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash;
use reth_tracing::tracing::{debug, instrument};
use storage::{RemoveNotificationsRange, Storage};
use storage::Storage;

/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx.
///
Expand Down Expand Up @@ -44,7 +44,9 @@ impl Wal {
/// Fills the block cache with the notifications from the storage.
#[instrument(target = "exex::wal", skip(self))]
fn fill_block_cache(&mut self) -> eyre::Result<()> {
for entry in self.storage.iter_notifications(..) {
let Some(files_range) = self.storage.files_range()? else { return Ok(()) };

for entry in self.storage.iter_notifications(files_range) {
let (file_id, notification) = entry?;

let committed_chain = notification.committed_chain();
Expand All @@ -65,13 +67,14 @@ impl Wal {
}

/// Commits the notification to WAL.
#[instrument(target = "exex::wal", fields(
#[instrument(target = "exex::wal", skip_all, fields(
reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()),
committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range())
))]
pub(crate) fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
debug!("Writing notification to WAL");
let file_id = self.storage.write_notification(notification)?;
let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1);
self.storage.write_notification(file_id, notification)?;

debug!(?file_id, "Inserting notification blocks into the block cache");
self.block_cache.insert_notification_blocks_with_file_id(file_id, notification);
Expand All @@ -98,6 +101,7 @@ impl Wal {
// First, pop items from the back of the cache until we find the notification with the
// specified block. When found, save the file ID of that notification.
let mut remove_from_file_id = None;
let mut remove_to_file_id = None;
let mut lowest_removed_block = None;
while let Some((file_id, block)) = self.block_cache.pop_back() {
debug!(?file_id, ?block, "Popped back block from the block cache");
Expand Down Expand Up @@ -127,11 +131,14 @@ impl Wal {
}

remove_from_file_id = Some(file_id);
remove_to_file_id.get_or_insert(file_id);
}

// If the specified block is still not found, we can't do anything and just return. The
// cache was empty.
let Some(remove_from_file_id) = remove_from_file_id else {
let Some((remove_from_file_id, remove_to_file_id)) =
remove_from_file_id.zip(remove_to_file_id)
else {
debug!("No blocks were rolled back");
return Ok(None)
};
Expand All @@ -141,10 +148,9 @@ impl Wal {
debug!(?remove_from_file_id, "Block cache was rolled back");

// Remove notifications from the storage.
let removed_notifications = self
.storage
.take_notifications(RemoveNotificationsRange::FromFileId(remove_from_file_id))?;
debug!(removed_notifications = ?removed_notifications.len(), "WAL was rolled back");
let removed_notifications =
self.storage.take_notifications(remove_from_file_id..=remove_to_file_id)?;
debug!(removed_notifications = ?removed_notifications.len(), "Storage was rolled back");

Ok(Some((lowest_removed_block.expect("qed"), removed_notifications)))
}
Expand Down Expand Up @@ -197,19 +203,26 @@ impl Wal {

// Remove notifications from the storage from the beginning up to the unfinalized block, not
// inclusive.
let (mut file_range_start, mut file_range_end) = (None, None);
while let Some((file_id, _)) = self.block_cache.front() {
if file_id == remove_to_file_id {
break
}
self.block_cache.pop_front();

file_range_start.get_or_insert(file_id);
file_range_end = Some(file_id);
}
debug!(?remove_to_file_id, "Block cache was finalized");

// Remove notifications from the storage.
let removed_notifications = self
.storage
.remove_notifications(RemoveNotificationsRange::ToFileId(remove_to_file_id))?;
debug!(?removed_notifications, "WAL was finalized");
if let Some((file_range_start, file_range_end)) = file_range_start.zip(file_range_end) {
let removed_notifications =
self.storage.remove_notifications(file_range_start..=file_range_end)?;
debug!(?removed_notifications, "Storage was finalized");
} else {
debug!("No notifications were finalized from the storage");
}

Ok(())
}
Expand All @@ -232,7 +245,12 @@ mod tests {
};

fn read_notifications(wal: &Wal) -> eyre::Result<Vec<ExExNotification>> {
wal.storage.iter_notifications(..).map(|entry| Ok(entry?.1)).collect::<eyre::Result<_>>()
let Some(files_range) = wal.storage.files_range()? else { return Ok(Vec::new()) };

wal.storage
.iter_notifications(files_range)
.map(|entry| Ok(entry?.1))
.collect::<eyre::Result<_>>()
}

#[test]
Expand Down Expand Up @@ -349,7 +367,7 @@ mod tests {
// back to is the block with number 0.
let rollback_result = wal.rollback((blocks[1].number, blocks[1].hash()).into())?;
assert_eq!(wal.block_cache.iter().collect::<Vec<_>>(), vec![]);
assert_eq!(wal.storage.iter_notifications(..).collect::<eyre::Result<Vec<_>>>()?, vec![]);
assert_eq!(read_notifications(&wal)?, vec![]);
assert_eq!(
rollback_result,
Some((
Expand Down Expand Up @@ -467,8 +485,6 @@ mod tests {
wal.block_cache.iter().collect::<Vec<_>>(),
[committed_notification_2_cache, reorged_notification_cache].concat()
);
assert_eq!(wal.storage.min_id, Some(2));
assert_eq!(wal.storage.max_id, Some(3));
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);

Ok(())
Expand Down
138 changes: 38 additions & 100 deletions crates/exex/exex/src/wal/storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
fs::File,
io::{Read, Write},
ops::{Bound, RangeBounds, RangeInclusive},
ops::RangeInclusive,
path::{Path, PathBuf},
};

Expand All @@ -18,8 +18,6 @@ use tracing::instrument;
pub(super) struct Storage {
/// The path to the WAL file.
path: PathBuf,
pub(super) min_id: Option<u64>,
pub(super) max_id: Option<u64>,
}

impl Storage {
Expand All @@ -28,20 +26,7 @@ impl Storage {
pub(super) fn new(path: impl AsRef<Path>) -> eyre::Result<Self> {
reth_fs_util::create_dir_all(&path)?;

let (mut min_id, mut max_id) = (None, None);

for entry in reth_fs_util::read_dir(&path)? {
let entry = entry?;
let file_name = entry.file_name();
let file_id = Self::parse_filename(&file_name.to_string_lossy())?;

min_id = min_id.map_or(Some(file_id), |min_id: u64| Some(min_id.min(file_id)));
max_id = max_id.map_or(Some(file_id), |max_id: u64| Some(max_id.max(file_id)));
}

debug!(?min_id, ?max_id, "Initialized WAL storage");

Ok(Self { path: path.as_ref().to_path_buf(), min_id, max_id })
Ok(Self { path: path.as_ref().to_path_buf() })
}

fn file_path(&self, id: u64) -> PathBuf {
Expand All @@ -55,23 +40,6 @@ impl Storage {
.ok_or_eyre(format!("failed to parse file name: {filename}"))
}

fn adjust_file_range(&self, range: impl RangeBounds<u64>) -> Option<RangeInclusive<u64>> {
let (min_id, max_id) = self.min_id.zip(self.max_id)?;

let start = match range.start_bound() {
Bound::Included(start) => *start,
Bound::Excluded(start) => *start + 1,
Bound::Unbounded => min_id,
};
let end = match range.end_bound() {
Bound::Included(end) => *end,
Bound::Excluded(end) => *end - 1,
Bound::Unbounded => max_id,
};

Some(start..=end)
}

/// Removes notification for the given file ID from the storage.
#[instrument(target = "exex::wal::storage", skip(self))]
fn remove_notification(&self, file_id: u64) {
Expand All @@ -81,35 +49,36 @@ impl Storage {
}
}

/// Returns the range of file IDs in the storage.
///
/// If there are no files in the storage, returns `None`.
pub(super) fn files_range(&self) -> eyre::Result<Option<RangeInclusive<u64>>> {
let mut min_id = None;
let mut max_id = None;

for entry in reth_fs_util::read_dir(&self.path)? {
let entry = entry?;
let file_name = entry.file_name();
let file_id = Self::parse_filename(&file_name.to_string_lossy())?;

min_id = min_id.map_or(Some(file_id), |min_id: u64| Some(min_id.min(file_id)));
max_id = max_id.map_or(Some(file_id), |max_id: u64| Some(max_id.max(file_id)));
}

Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id))
}

/// Removes notifications from the storage according to the given range.
///
/// # Returns
///
/// Number of removed notifications.
pub(super) fn remove_notifications(
&mut self,
range: RemoveNotificationsRange,
) -> eyre::Result<usize> {
let adjusted_range = match range {
RemoveNotificationsRange::FromFileId(from_file_id) => {
self.adjust_file_range(from_file_id..)
}
RemoveNotificationsRange::ToFileId(to_file_id) => self.adjust_file_range(..to_file_id),
};
let Some(adjusted_range) = adjusted_range else { return Ok(0) };

for id in adjusted_range.clone() {
pub(super) fn remove_notifications(&self, range: RangeInclusive<u64>) -> eyre::Result<usize> {
for id in range.clone() {
self.remove_notification(id);
}

match range {
RemoveNotificationsRange::FromFileId(from_file_id) => {
self.max_id = from_file_id.checked_sub(1)
}
RemoveNotificationsRange::ToFileId(to_file_id) => self.min_id = Some(to_file_id),
};

Ok(adjusted_range.count())
Ok(range.count())
}

/// Removes notifications from the storage according to the given range.
Expand All @@ -118,45 +87,23 @@ impl Storage {
///
/// Notifications that were removed.
pub(super) fn take_notifications(
&mut self,
range: RemoveNotificationsRange,
&self,
range: RangeInclusive<u64>,
) -> eyre::Result<Vec<ExExNotification>> {
let adjusted_range = match range {
RemoveNotificationsRange::FromFileId(from_file_id) => {
self.adjust_file_range(from_file_id..)
}
RemoveNotificationsRange::ToFileId(to_file_id) => self.adjust_file_range(..to_file_id),
};
let Some(adjusted_range) = adjusted_range else { return Ok(Vec::new()) };

let notifications =
self.iter_notifications(adjusted_range).collect::<eyre::Result<Vec<_>>>()?;
let notifications = self.iter_notifications(range).collect::<eyre::Result<Vec<_>>>()?;

for (id, _) in &notifications {
self.remove_notification(*id);
}

match range {
RemoveNotificationsRange::FromFileId(from_file_id) => {
self.max_id = from_file_id.checked_sub(1)
}
RemoveNotificationsRange::ToFileId(to_file_id) => self.min_id = Some(to_file_id),
};

Ok(notifications.into_iter().map(|(_, notification)| notification).collect())
}

pub(super) fn iter_notifications(
&self,
range: impl RangeBounds<u64>,
) -> Box<dyn Iterator<Item = eyre::Result<(u64, ExExNotification)>> + '_> {
let Some(range) = self.adjust_file_range(range) else {
return Box::new(std::iter::empty())
};

Box::new(
range.map(move |id| self.read_notification(id).map(|notification| (id, notification))),
)
range: RangeInclusive<u64>,
) -> impl Iterator<Item = eyre::Result<(u64, ExExNotification)>> + '_ {
range.map(move |id| self.read_notification(id).map(|notification| (id, notification)))
}

/// Reads the notification from the file with the given id.
Expand All @@ -170,30 +117,20 @@ impl Storage {

/// Writes the notification to the file with the given id.
pub(super) fn write_notification(
&mut self,
&self,
file_id: u64,
notification: &ExExNotification,
) -> eyre::Result<u64> {
let file_id = self.max_id.map_or(0, |id| id + 1);
self.min_id = self.min_id.map_or(Some(file_id), |min_id| Some(min_id.min(file_id)));
self.max_id = self.max_id.map_or(Some(file_id), |max_id| Some(max_id.max(file_id)));

) -> eyre::Result<()> {
debug!(?file_id, "Writing notification to WAL");

let file_path = self.file_path(file_id);
let mut file = File::create_new(&file_path)?;
write_notification(&mut file, notification)?;

Ok(file_id)
Ok(())
}
}

pub(super) enum RemoveNotificationsRange {
/// Remove notifications starting from the given file ID, inclusive.
FromFileId(u64),
/// Remove notifications up to the given file ID, exclusive.
ToFileId(u64),
}

fn write_notification(w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> {
rmp_serde::encode::write(w, notification)?;
w.flush()?;
Expand All @@ -220,7 +157,7 @@ mod tests {
let mut rng = generators::rng();

let temp_dir = tempfile::tempdir()?;
let mut storage = Storage::new(&temp_dir)?;
let storage = Storage::new(&temp_dir)?;

let old_block = random_block(&mut rng, 0, Default::default())
.seal_with_senders()
Expand All @@ -235,8 +172,9 @@ mod tests {
};

// Do a round trip serialization and deserialization
storage.write_notification(&notification)?;
let deserialized_notification = storage.read_notification(0)?;
let file_id = 0;
storage.write_notification(file_id, &notification)?;
let deserialized_notification = storage.read_notification(file_id)?;
assert_eq!(deserialized_notification, notification);

Ok(())
Expand Down

0 comments on commit ba279f3

Please sign in to comment.