From 81ffa207ce2718523cca4d3232846917bd1af652 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 26 Mar 2024 15:44:23 -0700 Subject: [PATCH 1/8] feat(derive): batch type for the channel reader --- crates/derive/src/types/batch.rs | 56 +++++++++++++++++++++++ crates/derive/src/types/batch_type.rs | 66 +++++++++++++++++++++++++++ crates/derive/src/types/mod.rs | 6 +++ 3 files changed, 128 insertions(+) create mode 100644 crates/derive/src/types/batch.rs create mode 100644 crates/derive/src/types/batch_type.rs diff --git a/crates/derive/src/types/batch.rs b/crates/derive/src/types/batch.rs new file mode 100644 index 000000000..d2daea5ff --- /dev/null +++ b/crates/derive/src/types/batch.rs @@ -0,0 +1,56 @@ +//! This module contains the enumerable [Batch]. + +use super::batch_type::BatchType; +use super::single_batch::SingleBatch; + +use alloy_rlp::{Decodable, Encodable}; + +// TODO: replace this with a span batch +/// Span Batch. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SpanBatch {} + +/// A Batch. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Batch { + /// A single batch + Single(SingleBatch), + /// Span Batches + Span(SpanBatch), +} + +impl Decodable for Batch { + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + // The buffer must have at least one identifier byte. + if buf.is_empty() { + return Err(alloy_rlp::Error::Custom("Empty buffer")); + } + match BatchType::from(buf[0]) { + BatchType::Single => { + let single_batch = SingleBatch::decode(buf)?; + Ok(Batch::Single(single_batch)) + } + BatchType::Span => { + // TODO: implement span batch decoding + unimplemented!() + } + } + } +} + +impl Encodable for Batch { + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + match self { + Batch::Single(single_batch) => { + BatchType::Single.encode(out); + single_batch.encode(out); + } + Batch::Span(_) => { + // TODO: implement span batch encoding + unimplemented!() + } + } + } +} + + diff --git a/crates/derive/src/types/batch_type.rs b/crates/derive/src/types/batch_type.rs new file mode 100644 index 000000000..ac973c04b --- /dev/null +++ b/crates/derive/src/types/batch_type.rs @@ -0,0 +1,66 @@ +//! Contains the [BatchType] and its encodings. + +use alloy_rlp::{Decodable, Encodable}; + +/// The single batch type identifier. +pub(crate) const SINGLE_BATCH_TYPE: u8 = 0x01; + +/// The span batch type identifier. +pub(crate) const SPAN_BATCH_TYPE: u8 = 0x02; + +/// The Batch Type. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BatchType { + /// Single Batch. + Single = SINGLE_BATCH_TYPE as isize, + /// Span Batch. + Span = SPAN_BATCH_TYPE as isize, +} + +impl From for BatchType { + fn from(val: u8) -> Self { + match val { + SINGLE_BATCH_TYPE => BatchType::Single, + SPAN_BATCH_TYPE => BatchType::Span, + _ => panic!("Invalid batch type"), + } + } +} + +impl From<&[u8]> for BatchType { + fn from(buf: &[u8]) -> Self { + BatchType::from(buf[0]) + } +} + +impl Encodable for BatchType { + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + let val = match self { + BatchType::Single => SINGLE_BATCH_TYPE, + BatchType::Span => SPAN_BATCH_TYPE, + }; + val.encode(out); + } +} + +impl Decodable for BatchType { + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + let val = u8::decode(buf)?; + Ok(BatchType::from(val)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use alloc::vec::Vec; + + #[test] + fn test_batch_type() { + let batch_type = BatchType::Single; + let mut buf = Vec::new(); + batch_type.encode(&mut buf); + let decoded = BatchType::decode(&mut buf.as_slice()).unwrap(); + assert_eq!(batch_type, decoded); + } +} diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 91b7725cb..3207b6540 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -3,6 +3,12 @@ use alloc::vec::Vec; use alloy_rlp::{Decodable, Encodable}; +mod batch; +pub use batch::Batch; + +mod batch_type; +pub use batch_type::BatchType; + mod system_config; pub use system_config::{ SystemAccounts, SystemConfig, SystemConfigUpdateType, CONFIG_UPDATE_EVENT_VERSION_0, From 6f666ead4b35f751c7e44dfc06973ef75169cddc Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 26 Mar 2024 15:47:46 -0700 Subject: [PATCH 2/8] fix(derive): batch type lints --- crates/derive/src/types/batch.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/derive/src/types/batch.rs b/crates/derive/src/types/batch.rs index d2daea5ff..cb09ec1e1 100644 --- a/crates/derive/src/types/batch.rs +++ b/crates/derive/src/types/batch.rs @@ -34,7 +34,7 @@ impl Decodable for Batch { // TODO: implement span batch decoding unimplemented!() } - } + } } } @@ -52,5 +52,3 @@ impl Encodable for Batch { } } } - - From e8575199268c9e1481bb6633ea45740289a171e5 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 27 Mar 2024 12:02:18 -0700 Subject: [PATCH 3/8] feat(derive): channel reader implementation with batch reader --- Cargo.lock | 1 + crates/derive/Cargo.toml | 1 + crates/derive/src/stages/channel_bank.rs | 9 +- crates/derive/src/stages/channel_reader.rs | 111 +++++++++++++++++++++ crates/derive/src/stages/mod.rs | 4 +- 5 files changed, 123 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 56ee426f4..8cca54c46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,6 +327,7 @@ dependencies = [ "anyhow", "async-trait", "hashbrown", + "miniz_oxide", "serde", "tokio", "unsigned-varint", diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml index 38e640b31..fbdb6947a 100644 --- a/crates/derive/Cargo.toml +++ b/crates/derive/Cargo.toml @@ -19,6 +19,7 @@ alloy-sol-types = { version = "0.6.3", default-features = false } async-trait = "0.1.77" hashbrown = "0.14.3" unsigned-varint = "0.8.0" +miniz_oxide = { version = "0.7.2" } # Optional serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true } diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index c872c9ce1..6d981ab8a 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -68,7 +68,12 @@ where /// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE]. /// Prunes from the high-priority channel since it failed to be read. pub fn prune(&mut self) -> StageResult<()> { - let mut total_size = self.size(); + // Check total size + let mut total_size = self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()); + // Prune until it is reasonable again. The high-priority channel failed to be read, + // so we prune from there. + // TODO: make max channel bank size configurable via config + // TODO: debug pruning until max channel bank size while total_size > MAX_CHANNEL_BANK_SIZE { let id = self .channel_queue @@ -87,8 +92,8 @@ where pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> { let origin = *self.origin().ok_or(anyhow!("No origin"))?; + // Get the channel for the frame, or create a new one if it doesn't exist. let current_channel = self.channels.entry(frame.id).or_insert_with(|| { - // Create a new channel let channel = Channel::new(frame.id, origin); self.channel_queue.push_back(frame.id); channel diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 8b1378917..e128a56fb 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -1 +1,112 @@ +//! This module contains the `ChannelReader` struct. +use super::channel_bank::ChannelBank; +use crate::{ + traits::{ChainProvider, DataAvailabilityProvider}, + types::{Batch, BlockInfo, StageError, StageResult}, +}; +use alloc::vec::Vec; +use alloy_rlp::Decodable; +use anyhow::anyhow; +use core::fmt::Debug; +use miniz_oxide::inflate::decompress_to_vec; + +/// [ChannelReader] is a stateful stage that does the following: +#[derive(Debug)] +pub struct ChannelReader +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, +{ + /// The previous stage of the derivation pipeline. + prev: ChannelBank, + /// The batch reader. + next_batch: Option, +} + +impl ChannelReader +where + DAP: DataAvailabilityProvider + Debug, + CP: ChainProvider + Debug, +{ + /// Create a new [ChannelReader] stage. + pub fn new(prev: ChannelBank) -> Self { + Self { + prev, + next_batch: None, + } + } + + /// Pulls out the next Batch from the available channel. + pub async fn next_batch(&mut self) -> StageResult { + if let Err(e) = self.set_batch_reader().await { + self.next_channel(); + return Err(e); + } + match self + .next_batch + .as_mut() + .unwrap() + .next_batch() + .ok_or(anyhow!("no batch")) + { + Ok(batch) => Ok(batch), + Err(e) => { + self.next_channel(); + Err(StageError::Custom(e)) + } + } + } + + /// Creates the batch reader from available channel data. + async fn set_batch_reader(&mut self) -> StageResult<()> { + if self.next_batch.is_none() { + let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?; + self.next_batch = Some(BatchReader::from(&channel[..])); + } + Ok(()) + } + + /// Returns the L1 origin [BlockInfo]. + pub fn origin(&self) -> Option<&BlockInfo> { + self.prev.origin() + } + + /// Forces the read to continue with the next channel, resetting any + /// decoding / decompression state to a fresh start. + pub fn next_channel(&mut self) { + self.next_batch = None; + } +} + +/// Batch Reader provides a function that iteratively consumes batches from the reader. +/// The L1Inclusion block is also provided at creation time. +/// Warning: the batch reader can read every batch-type. +/// The caller of the batch-reader should filter the results. +#[derive(Debug)] +pub(crate) struct BatchReader { + /// The raw data to decode. + data: Option>, + /// Decompressed data. + decompressed: Vec, +} + +impl BatchReader { + /// Pulls out the next batch from the reader. + pub(crate) fn next_batch(&mut self) -> Option { + if let Some(data) = self.data.take() { + self.decompressed = decompress_to_vec(&data).ok()?; + } + let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?; + Some(batch) + } +} + +impl From<&[u8]> for BatchReader { + fn from(data: &[u8]) -> Self { + Self { + data: Some(data.to_vec()), + decompressed: Vec::new(), + } + } +} diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index b1e079bbf..761995d9b 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -24,7 +24,9 @@ pub use frame_queue::FrameQueue; mod channel_bank; pub use channel_bank::ChannelBank; -mod batch_queue; mod channel_reader; +pub use channel_reader::ChannelReader; + +mod batch_queue; mod engine_queue; mod payload_derivation; From 8f3a81da69ee09dc4c659b22259be7fdd2c5318c Mon Sep 17 00:00:00 2001 From: refcell Date: Sat, 30 Mar 2024 17:31:17 -0400 Subject: [PATCH 4/8] fix(derive): channel bank impl --- crates/derive/src/stages/channel_bank.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 6d981ab8a..08ccdb6e3 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -68,12 +68,7 @@ where /// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE]. /// Prunes from the high-priority channel since it failed to be read. pub fn prune(&mut self) -> StageResult<()> { - // Check total size - let mut total_size = self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()); - // Prune until it is reasonable again. The high-priority channel failed to be read, - // so we prune from there. - // TODO: make max channel bank size configurable via config - // TODO: debug pruning until max channel bank size + let mut total_size = self.size(); while total_size > MAX_CHANNEL_BANK_SIZE { let id = self .channel_queue From 68148c3b2b7457c232febe3089bbfcb99d84e6a3 Mon Sep 17 00:00:00 2001 From: refcell Date: Mon, 1 Apr 2024 23:23:14 -0400 Subject: [PATCH 5/8] Update crates/derive/src/types/batch_type.rs Co-authored-by: clabby --- crates/derive/src/types/batch_type.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/derive/src/types/batch_type.rs b/crates/derive/src/types/batch_type.rs index ac973c04b..068452816 100644 --- a/crates/derive/src/types/batch_type.rs +++ b/crates/derive/src/types/batch_type.rs @@ -10,11 +10,12 @@ pub(crate) const SPAN_BATCH_TYPE: u8 = 0x02; /// The Batch Type. #[derive(Debug, Clone, PartialEq, Eq)] +#[repr(u8)] pub enum BatchType { /// Single Batch. - Single = SINGLE_BATCH_TYPE as isize, + Single = SINGLE_BATCH_TYPE, /// Span Batch. - Span = SPAN_BATCH_TYPE as isize, + Span = SPAN_BATCH_TYPE, } impl From for BatchType { From 6f86f63e9397f11dfed5d19125513a0933051713 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 2 Apr 2024 20:00:00 -0400 Subject: [PATCH 6/8] fix(derive): channel reader fixes --- crates/derive/src/stages/channel_reader.rs | 13 +++++---- crates/derive/src/types/batch.rs | 30 ++++++------------- crates/derive/src/types/errors.rs | 34 ++++++++++++++++++++++ crates/derive/src/types/mod.rs | 2 +- 4 files changed, 52 insertions(+), 27 deletions(-) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index e128a56fb..47cfe1192 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -6,7 +6,6 @@ use crate::{ types::{Batch, BlockInfo, StageError, StageResult}, }; use alloc::vec::Vec; -use alloy_rlp::Decodable; use anyhow::anyhow; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec; @@ -48,12 +47,12 @@ where .as_mut() .unwrap() .next_batch() - .ok_or(anyhow!("no batch")) + .ok_or(StageError::NotEnoughData) { Ok(batch) => Ok(batch), Err(e) => { self.next_channel(); - Err(StageError::Custom(e)) + Err(e) } } } @@ -61,7 +60,11 @@ where /// Creates the batch reader from available channel data. async fn set_batch_reader(&mut self) -> StageResult<()> { if self.next_batch.is_none() { - let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?; + let channel = match self.prev.next_data().await { + Ok(Some(channel)) => channel, + Ok(None) => return Err(anyhow!("no channel").into()), + Err(err) => return Err(err), + }; self.next_batch = Some(BatchReader::from(&channel[..])); } Ok(()) @@ -97,7 +100,7 @@ impl BatchReader { if let Some(data) = self.data.take() { self.decompressed = decompress_to_vec(&data).ok()?; } - let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?; + let batch = Batch::try_from(self.decompressed.as_ref()).ok()?; Some(batch) } } diff --git a/crates/derive/src/types/batch.rs b/crates/derive/src/types/batch.rs index cb09ec1e1..fdac3d8a9 100644 --- a/crates/derive/src/types/batch.rs +++ b/crates/derive/src/types/batch.rs @@ -2,8 +2,9 @@ use super::batch_type::BatchType; use super::single_batch::SingleBatch; +use crate::types::errors::DecodeError; -use alloy_rlp::{Decodable, Encodable}; +use alloy_rlp::Decodable; // TODO: replace this with a span batch /// Span Batch. @@ -19,15 +20,17 @@ pub enum Batch { Span(SpanBatch), } -impl Decodable for Batch { - fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { - // The buffer must have at least one identifier byte. +impl TryFrom<&[u8]> for Batch { + type Error = DecodeError; + + fn try_from(bytes: &[u8]) -> Result { + let mut buf = bytes; if buf.is_empty() { - return Err(alloy_rlp::Error::Custom("Empty buffer")); + return Err(Self::Error::EmptyBuffer); } match BatchType::from(buf[0]) { BatchType::Single => { - let single_batch = SingleBatch::decode(buf)?; + let single_batch = SingleBatch::decode(&mut buf)?; Ok(Batch::Single(single_batch)) } BatchType::Span => { @@ -37,18 +40,3 @@ impl Decodable for Batch { } } } - -impl Encodable for Batch { - fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { - match self { - Batch::Single(single_batch) => { - BatchType::Single.encode(out); - single_batch.encode(out); - } - Batch::Span(_) => { - // TODO: implement span batch encoding - unimplemented!() - } - } - } -} diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 98d3220b8..b5659922c 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -43,3 +43,37 @@ impl Display for StageError { } } } + +/// A decoding error. +#[derive(Debug)] +pub enum DecodeError { + /// The buffer is empty. + EmptyBuffer, + /// Alloy RLP Encoding Error. + AlloyRlpError(alloy_rlp::Error), +} + +impl From for DecodeError { + fn from(e: alloy_rlp::Error) -> Self { + DecodeError::AlloyRlpError(e) + } +} + +impl PartialEq for DecodeError { + fn eq(&self, other: &DecodeError) -> bool { + matches!( + (self, other), + (DecodeError::EmptyBuffer, DecodeError::EmptyBuffer) + | (DecodeError::AlloyRlpError(_), DecodeError::AlloyRlpError(_)) + ) + } +} + +impl Display for DecodeError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + DecodeError::EmptyBuffer => write!(f, "Empty buffer"), + DecodeError::AlloyRlpError(e) => write!(f, "Alloy RLP Decoding Error: {}", e), + } + } +} diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs index 3207b6540..2025ab547 100644 --- a/crates/derive/src/types/mod.rs +++ b/crates/derive/src/types/mod.rs @@ -49,7 +49,7 @@ mod channel; pub use channel::Channel; mod errors; -pub use errors::{StageError, StageResult}; +pub use errors::{DecodeError, StageError, StageResult}; mod single_batch; pub use single_batch::SingleBatch; From 5568ae2337c365185cfd1b36da8ceb0b6e3ae1b2 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 2 Apr 2024 20:01:07 -0400 Subject: [PATCH 7/8] fix(derive): revert unfurrling change --- crates/derive/src/stages/channel_reader.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 47cfe1192..e69d5118a 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -60,11 +60,7 @@ where /// Creates the batch reader from available channel data. async fn set_batch_reader(&mut self) -> StageResult<()> { if self.next_batch.is_none() { - let channel = match self.prev.next_data().await { - Ok(Some(channel)) => channel, - Ok(None) => return Err(anyhow!("no channel").into()), - Err(err) => return Err(err), - }; + let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?; self.next_batch = Some(BatchReader::from(&channel[..])); } Ok(()) From f295d49fdf057114c47c7752e65520178de07a92 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 2 Apr 2024 21:19:38 -0400 Subject: [PATCH 8/8] fix(derive): batch decoding --- crates/derive/src/stages/channel_reader.rs | 2 +- crates/derive/src/types/batch.rs | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index e69d5118a..05c2edc78 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -96,7 +96,7 @@ impl BatchReader { if let Some(data) = self.data.take() { self.decompressed = decompress_to_vec(&data).ok()?; } - let batch = Batch::try_from(self.decompressed.as_ref()).ok()?; + let batch = Batch::decode(&mut self.decompressed.as_ref()).ok()?; Some(batch) } } diff --git a/crates/derive/src/types/batch.rs b/crates/derive/src/types/batch.rs index fdac3d8a9..5b52bd8e3 100644 --- a/crates/derive/src/types/batch.rs +++ b/crates/derive/src/types/batch.rs @@ -20,17 +20,15 @@ pub enum Batch { Span(SpanBatch), } -impl TryFrom<&[u8]> for Batch { - type Error = DecodeError; - - fn try_from(bytes: &[u8]) -> Result { - let mut buf = bytes; - if buf.is_empty() { - return Err(Self::Error::EmptyBuffer); +impl Batch { + /// Attempts to decode a batch from a byte slice. + pub fn decode(r: &mut &[u8]) -> Result { + if r.is_empty() { + return Err(DecodeError::EmptyBuffer); } - match BatchType::from(buf[0]) { + match BatchType::from(r[0]) { BatchType::Single => { - let single_batch = SingleBatch::decode(&mut buf)?; + let single_batch = SingleBatch::decode(r)?; Ok(Batch::Single(single_batch)) } BatchType::Span => {