Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(derive): Channel Reader Implementation #65

Merged
merged 8 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ alloy-sol-types = { version = "0.6.3", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }

# Optional
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ where
pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> {
let origin = *self.origin().ok_or(anyhow!("No origin"))?;

// Get the channel for the frame, or create a new one if it doesn't exist.
let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
// Create a new channel
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
channel
Expand Down
110 changes: 110 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
@@ -1 +1,111 @@
//! This module contains the `ChannelReader` struct.

use super::channel_bank::ChannelBank;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider},
types::{Batch, BlockInfo, StageError, StageResult},
};
use alloc::vec::Vec;
use anyhow::anyhow;
use core::fmt::Debug;
use miniz_oxide::inflate::decompress_to_vec;

/// [ChannelReader] is a stateful stage that does the following:
#[derive(Debug)]
pub struct ChannelReader<DAP, CP>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
{
/// The previous stage of the derivation pipeline.
prev: ChannelBank<DAP, CP>,
/// The batch reader.
next_batch: Option<BatchReader>,
}

impl<DAP, CP> ChannelReader<DAP, CP>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
{
/// Create a new [ChannelReader] stage.
pub fn new(prev: ChannelBank<DAP, CP>) -> Self {
Self {
prev,
next_batch: None,
}
}

/// Pulls out the next Batch from the available channel.
pub async fn next_batch(&mut self) -> StageResult<Batch> {
if let Err(e) = self.set_batch_reader().await {
self.next_channel();
return Err(e);
}
match self
.next_batch
.as_mut()
.unwrap()
.next_batch()
.ok_or(StageError::NotEnoughData)
{
Ok(batch) => Ok(batch),
Err(e) => {
self.next_channel();
Err(e)
}
}
}

/// Creates the batch reader from available channel data.
async fn set_batch_reader(&mut self) -> StageResult<()> {
if self.next_batch.is_none() {
let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?;
clabby marked this conversation as resolved.
Show resolved Hide resolved
self.next_batch = Some(BatchReader::from(&channel[..]));
}
Ok(())
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Forces the read to continue with the next channel, resetting any
/// decoding / decompression state to a fresh start.
pub fn next_channel(&mut self) {
self.next_batch = None;
}
}

/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
/// The caller of the batch-reader should filter the results.
#[derive(Debug)]
pub(crate) struct BatchReader {
/// The raw data to decode.
data: Option<Vec<u8>>,
/// Decompressed data.
decompressed: Vec<u8>,
}

impl BatchReader {
/// Pulls out the next batch from the reader.
pub(crate) fn next_batch(&mut self) -> Option<Batch> {
if let Some(data) = self.data.take() {
self.decompressed = decompress_to_vec(&data).ok()?;
}
let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?;
Some(batch)
}
}

impl From<&[u8]> for BatchReader {
fn from(data: &[u8]) -> Self {
Self {
data: Some(data.to_vec()),
decompressed: Vec::new(),
}
}
}
4 changes: 3 additions & 1 deletion crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ pub use frame_queue::FrameQueue;
mod channel_bank;
pub use channel_bank::ChannelBank;

mod batch_queue;
mod channel_reader;
pub use channel_reader::ChannelReader;

mod batch_queue;
mod engine_queue;
mod payload_derivation;
40 changes: 40 additions & 0 deletions crates/derive/src/types/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! This module contains the enumerable [Batch].

use super::batch_type::BatchType;
use super::single_batch::SingleBatch;
use crate::types::errors::DecodeError;

use alloy_rlp::Decodable;

// TODO: replace this with a span batch
/// Span Batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpanBatch {}

/// A Batch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Batch {
/// A single batch
Single(SingleBatch),
/// Span Batches
Span(SpanBatch),
}

impl Batch {
/// Attempts to decode a batch from a byte slice.
pub fn decode(r: &mut &[u8]) -> Result<Self, DecodeError> {
if r.is_empty() {
return Err(DecodeError::EmptyBuffer);
}
match BatchType::from(r[0]) {
BatchType::Single => {
let single_batch = SingleBatch::decode(r)?;
Ok(Batch::Single(single_batch))
}
BatchType::Span => {
// TODO: implement span batch decoding
unimplemented!()
}
}
}
}
67 changes: 67 additions & 0 deletions crates/derive/src/types/batch_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Contains the [BatchType] and its encodings.

use alloy_rlp::{Decodable, Encodable};

/// The single batch type identifier.
pub(crate) const SINGLE_BATCH_TYPE: u8 = 0x01;

/// The span batch type identifier.
pub(crate) const SPAN_BATCH_TYPE: u8 = 0x02;

/// The Batch Type.
#[derive(Debug, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum BatchType {
/// Single Batch.
Single = SINGLE_BATCH_TYPE,
/// Span Batch.
Span = SPAN_BATCH_TYPE,
}

impl From<u8> for BatchType {
fn from(val: u8) -> Self {
match val {
SINGLE_BATCH_TYPE => BatchType::Single,
SPAN_BATCH_TYPE => BatchType::Span,
_ => panic!("Invalid batch type"),
}
}
}

impl From<&[u8]> for BatchType {
fn from(buf: &[u8]) -> Self {
BatchType::from(buf[0])
}
}

impl Encodable for BatchType {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
let val = match self {
BatchType::Single => SINGLE_BATCH_TYPE,
BatchType::Span => SPAN_BATCH_TYPE,
};
val.encode(out);
}
}

impl Decodable for BatchType {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let val = u8::decode(buf)?;
Ok(BatchType::from(val))
}
}

#[cfg(test)]
mod test {
use super::*;
use alloc::vec::Vec;

#[test]
fn test_batch_type() {
let batch_type = BatchType::Single;
let mut buf = Vec::new();
batch_type.encode(&mut buf);
let decoded = BatchType::decode(&mut buf.as_slice()).unwrap();
assert_eq!(batch_type, decoded);
}
}
34 changes: 34 additions & 0 deletions crates/derive/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,37 @@ impl Display for StageError {
}
}
}

/// A decoding error.
#[derive(Debug)]
pub enum DecodeError {
/// The buffer is empty.
EmptyBuffer,
/// Alloy RLP Encoding Error.
AlloyRlpError(alloy_rlp::Error),
}

impl From<alloy_rlp::Error> for DecodeError {
fn from(e: alloy_rlp::Error) -> Self {
DecodeError::AlloyRlpError(e)
}
}

impl PartialEq<DecodeError> for DecodeError {
fn eq(&self, other: &DecodeError) -> bool {
matches!(
(self, other),
(DecodeError::EmptyBuffer, DecodeError::EmptyBuffer)
| (DecodeError::AlloyRlpError(_), DecodeError::AlloyRlpError(_))
)
}
}

impl Display for DecodeError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
DecodeError::EmptyBuffer => write!(f, "Empty buffer"),
DecodeError::AlloyRlpError(e) => write!(f, "Alloy RLP Decoding Error: {}", e),
}
}
}
8 changes: 7 additions & 1 deletion crates/derive/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
use alloc::vec::Vec;
use alloy_rlp::{Decodable, Encodable};

mod batch;
pub use batch::Batch;

mod batch_type;
pub use batch_type::BatchType;

mod system_config;
pub use system_config::{
SystemAccounts, SystemConfig, SystemConfigUpdateType, CONFIG_UPDATE_EVENT_VERSION_0,
Expand Down Expand Up @@ -43,7 +49,7 @@ mod channel;
pub use channel::Channel;

mod errors;
pub use errors::{StageError, StageResult};
pub use errors::{DecodeError, StageError, StageResult};

mod single_batch;
pub use single_batch::SingleBatch;
Expand Down
Loading