Skip to content

Commit

Permalink
Merge pull request #66 from ethereum-optimism/refcell/channel-reader
Browse files Browse the repository at this point in the history
feat(derive): Channel Reader Implementation
  • Loading branch information
refcell authored Mar 29, 2024
2 parents 2dd1baf + f6c4786 commit f777b0e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ alloy-sol-types = { version = "0.6.3", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }

# Optional
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ where
let mut total_size = self.channels.iter().fold(0, |acc, (_, c)| acc + c.size());
// Prune until it is reasonable again. The high-priority channel failed to be read,
// so we prune from there.
// TODO: make max channel bank size configurable via config
// TODO: debug pruning until max channel bank size
while total_size > MAX_CHANNEL_BANK_SIZE {
let id = self
.channel_queue
Expand All @@ -84,8 +86,8 @@ where
pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> {
let origin = *self.origin().ok_or(anyhow!("No origin"))?;

// Get the channel for the frame, or create a new one if it doesn't exist.
let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
// Create a new channel
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
channel
Expand Down
111 changes: 111 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
@@ -1 +1,112 @@
//! This module contains the `ChannelReader` struct.
use super::channel_bank::ChannelBank;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider},
types::{Batch, BlockInfo, StageError, StageResult},
};
use alloc::vec::Vec;
use alloy_rlp::Decodable;
use anyhow::anyhow;
use core::fmt::Debug;
use miniz_oxide::inflate::decompress_to_vec;

/// [ChannelReader] is a stateful stage that does the following:
#[derive(Debug)]
pub struct ChannelReader<DAP, CP>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
{
/// The previous stage of the derivation pipeline.
prev: ChannelBank<DAP, CP>,
/// The batch reader.
next_batch: Option<BatchReader>,
}

impl<DAP, CP> ChannelReader<DAP, CP>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
{
/// Create a new [ChannelReader] stage.
pub fn new(prev: ChannelBank<DAP, CP>) -> Self {
Self {
prev,
next_batch: None,
}
}

/// Pulls out the next Batch from the available channel.
pub async fn next_batch(&mut self) -> StageResult<Batch> {
if let Err(e) = self.set_batch_reader().await {
self.next_channel();
return Err(e);
}
match self
.next_batch
.as_mut()
.unwrap()
.next_batch()
.ok_or(anyhow!("no batch"))
{
Ok(batch) => Ok(batch),
Err(e) => {
self.next_channel();
Err(StageError::Custom(e))
}
}
}

/// Creates the batch reader from available channel data.
async fn set_batch_reader(&mut self) -> StageResult<()> {
if self.next_batch.is_none() {
let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?;
self.next_batch = Some(BatchReader::from(&channel[..]));
}
Ok(())
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Forces the read to continue with the next channel, resetting any
/// decoding / decompression state to a fresh start.
pub fn next_channel(&mut self) {
self.next_batch = None;
}
}

/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
/// The caller of the batch-reader should filter the results.
#[derive(Debug)]
pub(crate) struct BatchReader {
/// The raw data to decode.
data: Option<Vec<u8>>,
/// Decompressed data.
decompressed: Vec<u8>,
}

impl BatchReader {
/// Pulls out the next batch from the reader.
pub(crate) fn next_batch(&mut self) -> Option<Batch> {
if let Some(data) = self.data.take() {
self.decompressed = decompress_to_vec(&data).ok()?;
}
let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?;
Some(batch)
}
}

impl From<&[u8]> for BatchReader {
fn from(data: &[u8]) -> Self {
Self {
data: Some(data.to_vec()),
decompressed: Vec::new(),
}
}
}
4 changes: 3 additions & 1 deletion crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ pub use frame_queue::FrameQueue;
mod channel_bank;
pub use channel_bank::ChannelBank;

mod batch_queue;
mod channel_reader;
pub use channel_reader::ChannelReader;

mod batch_queue;
mod engine_queue;
mod payload_derivation;

0 comments on commit f777b0e

Please sign in to comment.