diff --git a/src/pipelines/mod.rs b/src/pipelines/mod.rs index d39ca5f..0fed684 100644 --- a/src/pipelines/mod.rs +++ b/src/pipelines/mod.rs @@ -30,7 +30,14 @@ pub use { Behavior::{Loop, Once}, events::system_events::*, limits::*, - step::{ControlFlow, InitContext, PayloadBuilderError, Step, StepContext}, + step::{ + ControlFlow, + InitContext, + PayloadBuilderError, + Step, + StepContext, + composite, + }, }; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -122,7 +129,7 @@ impl Pipeline

{ /// Sets payload building limits for the pipeline. /// /// Here we can either use an instance of `LimitsFactory` that generates - /// limits dynamically according to a user-defined logic, or we can use a + /// limits dynamically, according to a user-defined logic, or we can use a /// fixed `Limits` instance. #[must_use] pub fn with_limits>(self, limits: L) -> Self { diff --git a/src/pipelines/step/composite/atomic.rs b/src/pipelines/step/composite/atomic.rs new file mode 100644 index 0000000..11cc02b --- /dev/null +++ b/src/pipelines/step/composite/atomic.rs @@ -0,0 +1,102 @@ +use {super::*, std::sync::Arc}; + +pub struct AtomicMode; + +impl CompositeStepMode

for AtomicMode { + async fn steps( + &self, + steps: &[Arc>], + payload: Checkpoint

, + ctx: StepContext

, + ) -> ControlFlow

{ + let initial = payload.clone(); + let mut current = payload; + + for step in steps { + if ctx.deadline_reached() { + return ControlFlow::Break(initial); + } + + match step.step(current, ctx.clone()).await { + ControlFlow::Ok(next) => current = next, + ControlFlow::Break(_) => return ControlFlow::Break(initial), + ControlFlow::Fail(error) => return ControlFlow::Fail(error), + } + } + + if ctx.deadline_reached() { + ControlFlow::Break(initial) + } else { + ControlFlow::Ok(current) + } + } +} + +#[macro_export] +macro_rules! atomic { + ($($step:expr),+ $(,)?) => {{ + let mut composite = + $crate::prelude::composite::CompositeStep::new( + $crate::prelude::composite::atomic::AtomicMode, + ); + $( + composite.append_step($step); + )+ + composite + }}; +} + +#[cfg(test)] +mod tests { + use {super::*, crate::test_utils::*}; + + // TODO: improve tests here + + #[rblib_test(Ethereum)] + async fn atomic_mode_break_reverts() -> eyre::Result<()> + { + let mut composite = CompositeStep::::new(AtomicMode); + composite.append_step(AlwaysOkStep); + composite.append_step(AlwaysBreakStep); // break here + composite.append_step(AlwaysOkStep); + + let result = OneStep::

::new(composite).run().await?; + assert!(matches!(result, ControlFlow::Break(_))); + + Ok(()) + } + + #[rblib_test(Ethereum)] + async fn atomic_mode_fail_propagates() -> eyre::Result<()> + { + let mut composite = CompositeStep::::new(AtomicMode); + composite.append_step(AlwaysOkStep); + composite.append_step(AlwaysFailStep); + composite.append_step(AlwaysOkStep); + + let result = OneStep::

::new(composite).run().await?; + assert!(matches!(result, ControlFlow::Fail(_))); + + Ok(()) + } + + #[rblib_test(Ethereum)] + async fn atomic_macro_basic() -> eyre::Result<()> { + let composite = atomic!(AlwaysOkStep, AlwaysOkStep); + + let result = OneStep::

::new(composite).run().await?; + assert!(matches!(result, ControlFlow::Ok(_))); + + Ok(()) + } + + #[rblib_test(Ethereum)] + async fn atomic_in_pipeline() -> eyre::Result<()> { + let pipeline = + Pipeline::

::default().with_step(atomic!(AlwaysOkStep, AlwaysOkStep)); + + P::create_test_node(pipeline).await?.next_block().await?; + + Ok(()) + } +} diff --git a/src/pipelines/step/composite/mod.rs b/src/pipelines/step/composite/mod.rs new file mode 100644 index 0000000..c96bac7 --- /dev/null +++ b/src/pipelines/step/composite/mod.rs @@ -0,0 +1,150 @@ +//! This module defines composite steps that can be used to compose multiple +//! steps into a single step. + +use { + crate::{ + pipelines::step::StepInstance, + platform::types::BuiltPayload, + prelude::*, + }, + std::sync::Arc, +}; + +pub mod atomic; + +/// A composite step with a list of steps. +/// The associated mode defines the specific behavior of executing steps +pub struct CompositeStep { + steps: Vec>>, + mode: M, +} + +impl CompositeStep { + pub fn new(mode: M) -> Self { + Self { + steps: Vec::new(), + mode, + } + } + + pub fn append_step(&mut self, step: impl Step

) { + self.steps.push(Arc::new(StepInstance::new(step))); + } +} + +/// A composite step mode defines the specific behavior of executing steps +/// It takes the list of steps, initial payload and context +trait CompositeStepMode: Send + Sync { + fn steps( + &self, + steps: &[Arc>], + payload: Checkpoint

, + ctx: StepContext

, + ) -> impl Future> + Send; +} + +/// `CompositeStep` will execute all before/after/setup functions of each step +/// in order The step implementation is delegated to the composite mode +impl Step

for CompositeStep +where + P: Platform, + M: CompositeStepMode

+ Send + 'static, +{ + async fn step( + self: Arc, + payload: Checkpoint

, + ctx: StepContext

, + ) -> ControlFlow

{ + self.mode.steps(&self.steps, payload, ctx).await + } + + async fn before_job( + self: Arc, + ctx: StepContext

, + ) -> Result<(), PayloadBuilderError> { + for step in &self.steps { + step.before_job(ctx.clone()).await?; + } + Ok(()) + } + + async fn after_job( + self: Arc, + ctx: StepContext

, + result: Arc, PayloadBuilderError>>, + ) -> Result<(), PayloadBuilderError> { + for step in &self.steps { + step.after_job(ctx.clone(), result.clone()).await?; + } + Ok(()) + } + + async fn setup( + &mut self, + init: InitContext

, + ) -> Result<(), PayloadBuilderError> { + for step in &self.steps { + step.setup(init.clone()).await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use {super::*, crate::test_utils::*}; + + // Composite step mode that tries to execute all steps and ignore all failures + // and breaks Will return the latest successful result + struct TryAllMode; + impl CompositeStepMode

for TryAllMode { + async fn steps( + &self, + steps: &[Arc>], + payload: Checkpoint

, + ctx: StepContext

, + ) -> ControlFlow

{ + let mut current = payload.clone(); + for step in steps { + if let ControlFlow::Ok(res) = + step.step(current.clone(), ctx.clone()).await + { + current = res; + } + } + ControlFlow::Ok(current) + } + } + + #[rblib_test(Ethereum)] + async fn composite_empty_steps() -> eyre::Result<()> { + let composite = CompositeStep::::new(TryAllMode); + let result = OneStep::

::new(composite).run().await?; + assert!(matches!(result, ControlFlow::Ok(_))); + Ok(()) + } + + #[rblib_test(Ethereum)] + async fn composite_single_step_ok() -> eyre::Result<()> { + let mut composite = CompositeStep::::new(TryAllMode); + composite.append_step(AlwaysOkStep); + + let result = OneStep::

::new(composite).run().await?; + assert!(matches!(result, ControlFlow::Ok(_))); + Ok(()) + } + + #[rblib_test(Ethereum)] + async fn composite_multiple_steps_ok() -> eyre::Result<()> + { + let mut composite = CompositeStep::::new(TryAllMode); + composite.append_step(AlwaysOkStep); + composite.append_step(AlwaysOkStep); + composite.append_step(AlwaysOkStep); + + let result = OneStep::

::new(composite).run().await?; + assert!(matches!(result, ControlFlow::Ok(_))); + + Ok(()) + } +} diff --git a/src/pipelines/step/context.rs b/src/pipelines/step/context.rs index a7e6d9b..943283e 100644 --- a/src/pipelines/step/context.rs +++ b/src/pipelines/step/context.rs @@ -11,8 +11,9 @@ use { /// Carries information specific to the step that is currently being executed. /// -/// An instance of this type is passed to `Step::step` method during pipeline -/// execution of steps. +/// An instance of this type is passed to the [`Step::step`] method during the +/// pipeline execution of steps. +#[derive(Debug, Clone)] pub struct StepContext { block: BlockContext

, limits: Limits, diff --git a/src/pipelines/step/mod.rs b/src/pipelines/step/mod.rs index 9e3a0b0..90f7001 100644 --- a/src/pipelines/step/mod.rs +++ b/src/pipelines/step/mod.rs @@ -5,6 +5,7 @@ use { std::sync::Arc, }; +pub mod composite; mod context; mod instance; mod metrics; @@ -30,7 +31,7 @@ pub use {context::StepContext, reth::payload::builder::PayloadBuilderError}; /// can be generic over the platform they run on or specialized for a specific /// platform. /// -/// The instance of the step is long-lived, and it's lifetime is equal to the +/// The instance of the step is long-lived, and its lifetime is equal to the /// lifetime of the pipeline it is part of. All invocations of the step will /// repeatedly call into the `step` async function on the same instance. /// @@ -45,9 +46,9 @@ pub trait Step: Send + Sync + 'static { /// failure that will terminate the pipeline execution. fn step( self: Arc, - payload: Checkpoint

, - ctx: StepContext

, - ) -> impl Future> + Send + Sync; + _payload: Checkpoint

, + _ctx: StepContext

, + ) -> impl Future> + Send; /// This function is called once per new payload job before any steps are /// executed. It can be used by steps to perform any optional initialization @@ -57,8 +58,8 @@ pub trait Step: Send + Sync + 'static { /// terminated immediately and no steps will be executed. fn before_job( self: Arc, - _: StepContext

, - ) -> impl Future> + Send + Sync { + _ctx: StepContext

, + ) -> impl Future> + Send { async { Ok(()) } } @@ -70,9 +71,9 @@ pub trait Step: Send + Sync + 'static { /// and will not produce a valid payload. fn after_job( self: Arc, - _: StepContext

, - _: Arc, PayloadBuilderError>>, - ) -> impl Future> + Send + Sync { + _ctx: StepContext

, + _result: Arc, PayloadBuilderError>>, + ) -> impl Future> + Send { async { Ok(()) } } @@ -80,8 +81,8 @@ pub trait Step: Send + Sync + 'static { /// instantiated as a payload builder service, before any payload jobs run. fn setup( &mut self, - _: InitContext

, - ) -> impl Future> + Send + Sync { + _init: InitContext

, + ) -> impl Future> + Send { async { Ok(()) } } } @@ -103,10 +104,10 @@ pub enum ControlFlow { /// Stops the pipeline execution that contains the step with a payload. /// /// If the step is inside a `Loop` sub-pipeline, it will stop the loop, - /// run its epilogue (if it exists) and progress to next steps in the parent - /// pipeline. + /// run its epilogue (if it exists) and progress to the next steps in the + /// parent pipeline. /// - /// Breaking out of a prologue step will not invoke any step in the pipeline, + /// Breaking out of a prologue step will not invoke any step in the pipeline /// and jump straight to the epilogue. /// /// Breaking out of an epilogue has the same effect as returning Ok from it, @@ -156,6 +157,7 @@ impl ControlFlow

{ } /// Context for the optional setup function of a step. +#[derive(Clone)] pub struct InitContext { metrics_scope: String, provider: Arc,