Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(derive): OriginProvider Trait Usage #87

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ where
Self { cfg, prev, telemetry, is_last_in_span: false, batch: None, builder }
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Loads a [SingleBatch] from the [AttributesProvider] if needed.
pub async fn load_batch(&mut self, parent: L2BlockInfo) -> StageResult<SingleBatch> {
if self.batch.is_none() {
Expand Down Expand Up @@ -150,6 +145,17 @@ where
}
}

impl<P, T, AB> OriginProvider for AttributesQueue<P, T, AB>
where
P: AttributesProvider + OriginProvider + Debug,
T: TelemetryProvider + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<P, T, AB> ResettableStage for AttributesQueue<P, T, AB>
where
Expand Down
19 changes: 13 additions & 6 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::{
stages::channel_reader::ChannelReader,
traits::{
ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher,
ChainProvider, DataAvailabilityProvider, OriginProvider, ResettableStage, SafeBlockFetcher,
TelemetryProvider,
},
types::{
Expand Down Expand Up @@ -83,11 +83,6 @@ where
}
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Returns if the previous batch was the last in the span.
pub fn is_last_in_span(&self) -> bool {
self.next_spans.is_empty()
Expand Down Expand Up @@ -353,6 +348,18 @@ where
}
}

impl<DAP, CP, BF, T> OriginProvider for BatchQueue<DAP, CP, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, BF, T> ResettableStage for BatchQueue<DAP, CP, BF, T>
where
Expand Down
19 changes: 13 additions & 6 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::frame_queue::FrameQueue;
use crate::{
params::{ChannelID, MAX_CHANNEL_BANK_SIZE},
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider,
ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage,
TelemetryProvider,
},
types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig},
};
Expand Down Expand Up @@ -56,11 +57,6 @@ where
Self { cfg, telemetry, channels: HashMap::new(), channel_queue: VecDeque::new(), prev }
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Returns the size of the channel bank by accumulating over all channels.
pub fn size(&self) -> usize {
self.channels.iter().fold(0, |acc, (_, c)| acc + c.size())
Expand Down Expand Up @@ -202,6 +198,17 @@ where
}
}

impl<DAP, CP, T> OriginProvider for ChannelBank<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, T> ResettableStage for ChannelBank<DAP, CP, T>
where
Expand Down
20 changes: 14 additions & 6 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use super::channel_bank::ChannelBank;
use crate::{
traits::{ChainProvider, DataAvailabilityProvider, LogLevel, TelemetryProvider},
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, TelemetryProvider,
},
types::{Batch, BlockInfo, StageError, StageResult},
};

Expand Down Expand Up @@ -70,18 +72,24 @@ where
Ok(())
}

/// Returns the L1 origin [BlockInfo].
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Forces the read to continue with the next channel, resetting any
/// decoding / decompression state to a fresh start.
pub fn next_channel(&mut self) {
self.next_batch = None;
}
}

impl<DAP, CP, T> OriginProvider for ChannelReader<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

/// Batch Reader provides a function that iteratively consumes batches from the reader.
/// The L1Inclusion block is also provided at creation time.
/// Warning: the batch reader can read every batch-type.
Expand Down
19 changes: 13 additions & 6 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use core::fmt::Debug;
use super::l1_retrieval::L1Retrieval;
use crate::{
traits::{
ChainProvider, DataAvailabilityProvider, LogLevel, ResettableStage, TelemetryProvider,
ChainProvider, DataAvailabilityProvider, LogLevel, OriginProvider, ResettableStage,
TelemetryProvider,
},
types::{BlockInfo, Frame, StageError, StageResult, SystemConfig},
};
Expand Down Expand Up @@ -41,11 +42,6 @@ where
Self { prev, telemetry, queue: VecDeque::new() }
}

/// Returns the L1 [BlockInfo] origin.
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Fetches the next frame from the [FrameQueue].
pub async fn next_frame(&mut self) -> StageResult<Frame> {
if self.queue.is_empty() {
Expand Down Expand Up @@ -81,6 +77,17 @@ where
}
}

impl<DAP, CP, T> OriginProvider for FrameQueue<DAP, CP, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, T> ResettableStage for FrameQueue<DAP, CP, T>
where
Expand Down
21 changes: 13 additions & 8 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use super::L1Traversal;
use crate::{
traits::{
ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, ResettableStage,
TelemetryProvider,
ChainProvider, DataAvailabilityProvider, DataIter, LogLevel, OriginProvider,
ResettableStage, TelemetryProvider,
},
types::{BlockInfo, StageError, StageResult, SystemConfig},
};
Expand Down Expand Up @@ -47,12 +47,6 @@ where
Self { prev, telemetry, provider, data: None }
}

/// Returns the current L1 [BlockInfo] origin from the previous
/// [L1Traversal] stage.
pub fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}

/// Retrieves the next data item from the [L1Retrieval] stage.
/// Returns an error if there is no data.
pub async fn next_data(&mut self) -> StageResult<Bytes> {
Expand Down Expand Up @@ -81,6 +75,17 @@ where
}
}

impl<DAP, CP, T> OriginProvider for L1Retrieval<DAP, CP, T>
where
DAP: DataAvailabilityProvider,
CP: ChainProvider,
T: TelemetryProvider,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
}
}

#[async_trait]
impl<DAP, CP, T> ResettableStage for L1Retrieval<DAP, CP, T>
where
Expand Down
13 changes: 7 additions & 6 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the [L1Traversal] stage of the derivation pipeline.

use crate::{
traits::{ChainProvider, LogLevel, ResettableStage, TelemetryProvider},
traits::{ChainProvider, LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, sync::Arc};
Expand Down Expand Up @@ -61,11 +61,6 @@ impl<F: ChainProvider, T: TelemetryProvider> L1Traversal<F, T> {
}
}

/// Returns the current L1 [BlockInfo] in the [L1Traversal] stage, if it exists.
pub fn origin(&self) -> Option<&BlockInfo> {
self.block.as_ref()
}

/// Advances the internal state of the [L1Traversal] stage to the next L1 block.
/// This function fetches the next L1 [BlockInfo] from the data source and updates the
/// [SystemConfig] with the receipts from the block.
Expand Down Expand Up @@ -112,6 +107,12 @@ impl<F: ChainProvider, T: TelemetryProvider> L1Traversal<F, T> {
}
}

impl<F: ChainProvider, T: TelemetryProvider> OriginProvider for L1Traversal<F, T> {
fn origin(&self) -> Option<&BlockInfo> {
self.block.as_ref()
}
}

#[async_trait]
impl<F: ChainProvider + Send, T: TelemetryProvider + Send> ResettableStage for L1Traversal<F, T> {
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
Expand Down
Loading