Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/pipelines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ pub use {
Behavior::{Loop, Once},
events::system_events::*,
limits::*,
step::{ControlFlow, InitContext, PayloadBuilderError, Step, StepContext},
step::{
ControlFlow,
InitContext,
PayloadBuilderError,
Step,
StepContext,
composite,
},
};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -122,7 +129,7 @@ impl<P: Platform> Pipeline<P> {
/// Sets payload building limits for the pipeline.
///
/// Here we can either use an instance of `LimitsFactory` that generates
/// limits dynamically according to a user-defined logic, or we can use a
/// limits dynamically, according to a user-defined logic, or we can use a
/// fixed `Limits` instance.
#[must_use]
pub fn with_limits<T, L: IntoScopedLimits<P, T>>(self, limits: L) -> Self {
Expand Down
102 changes: 102 additions & 0 deletions src/pipelines/step/composite/atomic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use {super::*, std::sync::Arc};

pub struct AtomicMode;

impl<P: Platform> CompositeStepMode<P> for AtomicMode {
async fn steps(
&self,
steps: &[Arc<StepInstance<P>>],
payload: Checkpoint<P>,
ctx: StepContext<P>,
) -> ControlFlow<P> {
let initial = payload.clone();
let mut current = payload;

for step in steps {
if ctx.deadline_reached() {
return ControlFlow::Break(initial);
}

match step.step(current, ctx.clone()).await {
ControlFlow::Ok(next) => current = next,
ControlFlow::Break(_) => return ControlFlow::Break(initial),
ControlFlow::Fail(error) => return ControlFlow::Fail(error),
Comment on lines +22 to +23
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intuition is that those two variants should return ControlFlow::Ok(initial). Though this is a matter of the intention behind this step.

}
}

if ctx.deadline_reached() {
ControlFlow::Break(initial)
} else {
ControlFlow::Ok(current)
}
}
}

#[macro_export]
macro_rules! atomic {
($($step:expr),+ $(,)?) => {{
let mut composite =
$crate::prelude::composite::CompositeStep::new(
$crate::prelude::composite::atomic::AtomicMode,
);
$(
composite.append_step($step);
)+
composite
}};
}

#[cfg(test)]
mod tests {
use {super::*, crate::test_utils::*};

// TODO: improve tests here

#[rblib_test(Ethereum)]
async fn atomic_mode_break_reverts<P: TestablePlatform>() -> eyre::Result<()>
{
let mut composite = CompositeStep::<P, _>::new(AtomicMode);
composite.append_step(AlwaysOkStep);
composite.append_step(AlwaysBreakStep); // break here
composite.append_step(AlwaysOkStep);

let result = OneStep::<P>::new(composite).run().await?;
assert!(matches!(result, ControlFlow::Break(_)));

Ok(())
}

#[rblib_test(Ethereum)]
async fn atomic_mode_fail_propagates<P: TestablePlatform>() -> eyre::Result<()>
{
let mut composite = CompositeStep::<P, _>::new(AtomicMode);
composite.append_step(AlwaysOkStep);
composite.append_step(AlwaysFailStep);
composite.append_step(AlwaysOkStep);

let result = OneStep::<P>::new(composite).run().await?;
assert!(matches!(result, ControlFlow::Fail(_)));
Comment on lines +77 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should ControlFlow::Fail in Atomic when one of its sub-steps fail or should we return ControlFlow::Ok(_) of the initial payload?

My thoughts are that if Atomic propagates ControlFlow::Fail and ControlFlow::Break then it is no different than pipeline.step(S1).step(S2).step(S3).

As a user of this API I would imagine Atomic enabling behavior that is not available out of the box in pipeline syntax and behave somehow similar to CAS semantics in atomic operations:

  • Either all sub-steps succeed with ControlFlow::Ok(_), in which case the resulting payload would be equivalent to fold.
  • Or if any of the sub-steps returns non-ControlFlow::Ok(_), then the original input payload is returned as ControlFlow::Ok(_).
  • I think that people using atomic!() already expect some of the steps to fail but they don't necessarily want those steps failures to propagate to pipeline failures.

What are your thoughts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this makes sense, we should go with what you described. I didn't think through the whole logic of Atomic too much at first


Ok(())
}

#[rblib_test(Ethereum)]
async fn atomic_macro_basic<P: TestablePlatform>() -> eyre::Result<()> {
let composite = atomic!(AlwaysOkStep, AlwaysOkStep);

let result = OneStep::<P>::new(composite).run().await?;
assert!(matches!(result, ControlFlow::Ok(_)));

Ok(())
}

#[rblib_test(Ethereum)]
async fn atomic_in_pipeline<P: TestablePlatform>() -> eyre::Result<()> {
let pipeline =
Pipeline::<P>::default().with_step(atomic!(AlwaysOkStep, AlwaysOkStep));

P::create_test_node(pipeline).await?.next_block().await?;

Ok(())
}
}
150 changes: 150 additions & 0 deletions src/pipelines/step/composite/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! This module defines composite steps that can be used to compose multiple
//! steps into a single step.

use {
crate::{
pipelines::step::StepInstance,
platform::types::BuiltPayload,
prelude::*,
},
std::sync::Arc,
};

pub mod atomic;

/// A composite step with a list of steps.
/// The associated mode defines the specific behavior of executing steps
pub struct CompositeStep<P: Platform, M> {
steps: Vec<Arc<StepInstance<P>>>,
mode: M,
}

impl<P: Platform, M> CompositeStep<P, M> {
pub fn new(mode: M) -> Self {
Self {
steps: Vec::new(),
mode,
}
}

pub fn append_step(&mut self, step: impl Step<P>) {
self.steps.push(Arc::new(StepInstance::new(step)));
}
}

/// A composite step mode defines the specific behavior of executing steps
/// It takes the list of steps, initial payload and context
trait CompositeStepMode<P: Platform>: Send + Sync {
fn steps(
&self,
steps: &[Arc<StepInstance<P>>],
payload: Checkpoint<P>,
ctx: StepContext<P>,
) -> impl Future<Output = ControlFlow<P>> + Send;
}

/// `CompositeStep` will execute all before/after/setup functions of each step
/// in order The step implementation is delegated to the composite mode
impl<P, M> Step<P> for CompositeStep<P, M>
where
P: Platform,
M: CompositeStepMode<P> + Send + 'static,
{
async fn step(
self: Arc<Self>,
payload: Checkpoint<P>,
ctx: StepContext<P>,
) -> ControlFlow<P> {
self.mode.steps(&self.steps, payload, ctx).await
}

async fn before_job(
self: Arc<Self>,
ctx: StepContext<P>,
) -> Result<(), PayloadBuilderError> {
for step in &self.steps {
step.before_job(ctx.clone()).await?;
}
Ok(())
}

async fn after_job(
self: Arc<Self>,
ctx: StepContext<P>,
result: Arc<Result<BuiltPayload<P>, PayloadBuilderError>>,
) -> Result<(), PayloadBuilderError> {
for step in &self.steps {
step.after_job(ctx.clone(), result.clone()).await?;
}
Ok(())
}

async fn setup(
&mut self,
init: InitContext<P>,
) -> Result<(), PayloadBuilderError> {
for step in &self.steps {
step.setup(init.clone()).await?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use {super::*, crate::test_utils::*};

// Composite step mode that tries to execute all steps and ignore all failures
// and breaks Will return the latest successful result
struct TryAllMode;
impl<P: Platform> CompositeStepMode<P> for TryAllMode {
async fn steps(
&self,
steps: &[Arc<StepInstance<P>>],
payload: Checkpoint<P>,
ctx: StepContext<P>,
) -> ControlFlow<P> {
let mut current = payload.clone();
for step in steps {
if let ControlFlow::Ok(res) =
step.step(current.clone(), ctx.clone()).await
{
current = res;
}
}
ControlFlow::Ok(current)
}
}

#[rblib_test(Ethereum)]
async fn composite_empty_steps<P: TestablePlatform>() -> eyre::Result<()> {
let composite = CompositeStep::<P, _>::new(TryAllMode);
let result = OneStep::<P>::new(composite).run().await?;
assert!(matches!(result, ControlFlow::Ok(_)));
Ok(())
}

#[rblib_test(Ethereum)]
async fn composite_single_step_ok<P: TestablePlatform>() -> eyre::Result<()> {
let mut composite = CompositeStep::<P, _>::new(TryAllMode);
composite.append_step(AlwaysOkStep);

let result = OneStep::<P>::new(composite).run().await?;
assert!(matches!(result, ControlFlow::Ok(_)));
Ok(())
}

#[rblib_test(Ethereum)]
async fn composite_multiple_steps_ok<P: TestablePlatform>() -> eyre::Result<()>
{
let mut composite = CompositeStep::<P, _>::new(TryAllMode);
composite.append_step(AlwaysOkStep);
composite.append_step(AlwaysOkStep);
composite.append_step(AlwaysOkStep);

let result = OneStep::<P>::new(composite).run().await?;
assert!(matches!(result, ControlFlow::Ok(_)));

Ok(())
}
}
5 changes: 3 additions & 2 deletions src/pipelines/step/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use {

/// Carries information specific to the step that is currently being executed.
///
/// An instance of this type is passed to `Step::step` method during pipeline
/// execution of steps.
/// An instance of this type is passed to the [`Step::step`] method during the
/// pipeline execution of steps.
#[derive(Debug, Clone)]
pub struct StepContext<P: Platform> {
block: BlockContext<P>,
limits: Limits,
Expand Down
30 changes: 16 additions & 14 deletions src/pipelines/step/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
std::sync::Arc,
};

pub mod composite;
mod context;
mod instance;
mod metrics;
Expand All @@ -30,7 +31,7 @@ pub use {context::StepContext, reth::payload::builder::PayloadBuilderError};
/// can be generic over the platform they run on or specialized for a specific
/// platform.
///
/// The instance of the step is long-lived, and it's lifetime is equal to the
/// The instance of the step is long-lived, and its lifetime is equal to the
/// lifetime of the pipeline it is part of. All invocations of the step will
/// repeatedly call into the `step` async function on the same instance.
///
Expand All @@ -45,9 +46,9 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
/// failure that will terminate the pipeline execution.
fn step(
self: Arc<Self>,
payload: Checkpoint<P>,
ctx: StepContext<P>,
) -> impl Future<Output = ControlFlow<P>> + Send + Sync;
_payload: Checkpoint<P>,
_ctx: StepContext<P>,
) -> impl Future<Output = ControlFlow<P>> + Send;

/// This function is called once per new payload job before any steps are
/// executed. It can be used by steps to perform any optional initialization
Expand All @@ -57,8 +58,8 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
/// terminated immediately and no steps will be executed.
fn before_job(
self: Arc<Self>,
_: StepContext<P>,
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
_ctx: StepContext<P>,
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
async { Ok(()) }
}

Expand All @@ -70,18 +71,18 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
/// and will not produce a valid payload.
fn after_job(
self: Arc<Self>,
_: StepContext<P>,
_: Arc<Result<types::BuiltPayload<P>, PayloadBuilderError>>,
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
_ctx: StepContext<P>,
_result: Arc<Result<types::BuiltPayload<P>, PayloadBuilderError>>,
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
async { Ok(()) }
}

/// Optional setup function called exactly once when a pipeline is
/// instantiated as a payload builder service, before any payload jobs run.
fn setup(
&mut self,
_: InitContext<P>,
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
_init: InitContext<P>,
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
async { Ok(()) }
}
}
Expand All @@ -103,10 +104,10 @@ pub enum ControlFlow<P: Platform> {
/// Stops the pipeline execution that contains the step with a payload.
///
/// If the step is inside a `Loop` sub-pipeline, it will stop the loop,
/// run its epilogue (if it exists) and progress to next steps in the parent
/// pipeline.
/// run its epilogue (if it exists) and progress to the next steps in the
/// parent pipeline.
///
/// Breaking out of a prologue step will not invoke any step in the pipeline,
/// Breaking out of a prologue step will not invoke any step in the pipeline
/// and jump straight to the epilogue.
///
/// Breaking out of an epilogue has the same effect as returning Ok from it,
Expand Down Expand Up @@ -156,6 +157,7 @@ impl<P: Platform> ControlFlow<P> {
}

/// Context for the optional setup function of a step.
#[derive(Clone)]
pub struct InitContext<P: Platform> {
metrics_scope: String,
provider: Arc<dyn StateProviderFactory>,
Expand Down
Loading