Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby committed Sep 30, 2024
1 parent b53f75b commit 3ceade2
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub trait BatchStreamProvider {
pub struct BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
BF: L2ChainProvider + Debug,
{
/// The previous stage in the derivation pipeline.
prev: P,
Expand All @@ -54,6 +55,7 @@ where
impl<P, BF> BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
BF: L2ChainProvider + Debug,
{
/// Create a new [BatchStream] stage.
pub const fn new(prev: P, config: Arc<RollupConfig>, fetcher: BF) -> Self {
Expand Down Expand Up @@ -101,6 +103,7 @@ where
impl<P, BF> BatchQueueProvider for BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
fn flush(&mut self) {
if self.is_active().unwrap_or(false) {
Expand Down Expand Up @@ -169,6 +172,7 @@ where
impl<P, BF> OriginAdvancer for BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
async fn advance_origin(&mut self) -> PipelineResult<()> {
self.prev.advance_origin().await
Expand All @@ -178,6 +182,7 @@ where
impl<P, BF> OriginProvider for BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
BF: L2ChainProvider + Debug,
{
fn origin(&self) -> Option<BlockInfo> {
self.prev.origin()
Expand All @@ -188,6 +193,7 @@ where
impl<P, BF> ResettableStage for BatchStream<P, BF>
where
P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send,
BF: L2ChainProvider + Send + Debug,
{
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> {
self.prev.reset(base, cfg).await?;
Expand All @@ -203,6 +209,7 @@ mod test {
use crate::{
batch::{SingleBatch, SpanBatchElement},
stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage},
traits::test_utils::TestL2ChainProvider,
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -215,7 +222,7 @@ mod test {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());
let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default());

// The stage should not be active.
assert!(!stream.is_active().unwrap());
Expand Down Expand Up @@ -244,7 +251,7 @@ mod test {
let data = vec![Ok(Batch::Span(mock_batch.clone()))];
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());
let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default());

Check warning on line 254 in crates/derive/src/stages/batch_stream.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/batch_stream.rs#L254

Added line #L254 was not covered by tests

// The stage should be active.
assert!(stream.is_active().unwrap());
Expand Down Expand Up @@ -302,7 +309,7 @@ mod test {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());
let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default());

// The stage should be active.
assert!(stream.is_active().unwrap());
Expand Down

0 comments on commit 3ceade2

Please sign in to comment.