Skip to content

Commit 9023027

Browse files
committed
feat: CompositeStep
1 parent 56acf48 commit 9023027

File tree

4 files changed

+174
-18
lines changed

4 files changed

+174
-18
lines changed

src/pipelines/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,14 @@ pub use {
3030
Behavior::{Loop, Once},
3131
events::system_events::*,
3232
limits::*,
33-
step::{ControlFlow, InitContext, PayloadBuilderError, Step, StepContext},
33+
step::{
34+
ControlFlow,
35+
InitContext,
36+
PayloadBuilderError,
37+
Step,
38+
StepContext,
39+
composite,
40+
},
3441
};
3542

3643
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -122,7 +129,7 @@ impl<P: Platform> Pipeline<P> {
122129
/// Sets payload building limits for the pipeline.
123130
///
124131
/// Here we can either use an instance of `LimitsFactory` that generates
125-
/// limits dynamically according to a user-defined logic, or we can use a
132+
/// limits dynamically, according to a user-defined logic, or we can use a
126133
/// fixed `Limits` instance.
127134
#[must_use]
128135
pub fn with_limits<T, L: IntoScopedLimits<P, T>>(self, limits: L) -> Self {
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//! This module defines composite steps that can be used to compose multiple
2+
//! steps into a single step.
3+
4+
use {
5+
crate::{
6+
pipelines::step::StepInstance, platform::types::BuiltPayload, prelude::*,
7+
},
8+
std::sync::Arc,
9+
};
10+
11+
/// A composite step with a list of steps.
12+
/// The associated mode defines the specific behavior of executing steps
13+
pub struct CompositeStep<P: Platform, M> {
14+
steps: Vec<Arc<StepInstance<P>>>,
15+
mode: M,
16+
}
17+
18+
impl<P: Platform, M> CompositeStep<P, M> {
19+
pub fn new(mode: M) -> Self {
20+
Self {
21+
steps: Vec::new(),
22+
mode,
23+
}
24+
}
25+
26+
pub fn append_step(&mut self, step: impl Step<P>) {
27+
self.steps.push(Arc::new(StepInstance::new(step)));
28+
}
29+
}
30+
31+
/// A composite step mode defines the specific behavior of executing steps
32+
/// It takes the list of steps, initial payload and context
33+
trait CompositeStepMode<P: Platform>: Send + Sync {
34+
fn steps(
35+
&self,
36+
steps: &[Arc<StepInstance<P>>],
37+
payload: Checkpoint<P>,
38+
ctx: StepContext<P>,
39+
) -> impl Future<Output = ControlFlow<P>> + Send;
40+
}
41+
42+
/// `CompositeStep` will execute all before/after/setup functions of each step in
43+
/// order The step implementation is delegated to the composite mode
44+
impl<P, M> Step<P> for CompositeStep<P, M>
45+
where
46+
P: Platform,
47+
M: CompositeStepMode<P> + Send + 'static,
48+
{
49+
async fn step(
50+
self: Arc<Self>,
51+
payload: Checkpoint<P>,
52+
ctx: StepContext<P>,
53+
) -> ControlFlow<P> {
54+
self.mode.steps(&self.steps, payload, ctx).await
55+
}
56+
57+
async fn before_job(
58+
self: Arc<Self>,
59+
ctx: StepContext<P>,
60+
) -> Result<(), PayloadBuilderError> {
61+
for step in &self.steps {
62+
step.before_job(ctx.clone()).await?;
63+
}
64+
Ok(())
65+
}
66+
67+
async fn after_job(
68+
self: Arc<Self>,
69+
ctx: StepContext<P>,
70+
result: Arc<Result<BuiltPayload<P>, PayloadBuilderError>>,
71+
) -> Result<(), PayloadBuilderError> {
72+
for step in &self.steps {
73+
step.after_job(ctx.clone(), result.clone()).await?;
74+
}
75+
Ok(())
76+
}
77+
78+
async fn setup(
79+
&mut self,
80+
init: InitContext<P>,
81+
) -> Result<(), PayloadBuilderError> {
82+
for step in &self.steps {
83+
step.setup(init.clone()).await?;
84+
}
85+
Ok(())
86+
}
87+
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
use {super::*, crate::test_utils::*};
92+
93+
// Composite step mode that tries to execute all steps and ignore all failures
94+
// and breaks Will return the latest successful result
95+
struct TryAllMode;
96+
impl<P: Platform> CompositeStepMode<P> for TryAllMode {
97+
async fn steps(
98+
&self,
99+
steps: &[Arc<StepInstance<P>>],
100+
payload: Checkpoint<P>,
101+
ctx: StepContext<P>,
102+
) -> ControlFlow<P> {
103+
let mut current = payload.clone();
104+
for step in steps {
105+
if let ControlFlow::Ok(res) =
106+
step.step(current.clone(), ctx.clone()).await
107+
{
108+
current = res;
109+
}
110+
}
111+
ControlFlow::Ok(current)
112+
}
113+
}
114+
115+
#[rblib_test(Ethereum)]
116+
async fn composite_empty_steps<P: TestablePlatform>() -> eyre::Result<()> {
117+
let composite = CompositeStep::<P, _>::new(TryAllMode);
118+
let result = OneStep::<P>::new(composite).run().await?;
119+
assert!(matches!(result, ControlFlow::Ok(_)));
120+
Ok(())
121+
}
122+
123+
#[rblib_test(Ethereum)]
124+
async fn composite_single_step_ok<P: TestablePlatform>() -> eyre::Result<()> {
125+
let mut composite = CompositeStep::<P, _>::new(TryAllMode);
126+
composite.append_step(AlwaysOkStep);
127+
128+
let result = OneStep::<P>::new(composite).run().await?;
129+
assert!(matches!(result, ControlFlow::Ok(_)));
130+
Ok(())
131+
}
132+
133+
#[rblib_test(Ethereum)]
134+
async fn composite_multiple_steps_ok<P: TestablePlatform>() -> eyre::Result<()>
135+
{
136+
let mut composite = CompositeStep::<P, _>::new(TryAllMode);
137+
composite.append_step(AlwaysOkStep);
138+
composite.append_step(AlwaysOkStep);
139+
composite.append_step(AlwaysOkStep);
140+
141+
let result = OneStep::<P>::new(composite).run().await?;
142+
assert!(matches!(result, ControlFlow::Ok(_)));
143+
144+
Ok(())
145+
}
146+
}

src/pipelines/step/context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use {
1111

1212
/// Carries information specific to the step that is currently being executed.
1313
///
14-
/// An instance of this type is passed to `Step::step` method during pipeline
15-
/// execution of steps.
14+
/// An instance of this type is passed to the [`Step::step`] method during the
15+
/// pipeline execution of steps.
16+
#[derive(Debug, Clone)]
1617
pub struct StepContext<P: Platform> {
1718
block: BlockContext<P>,
1819
limits: Limits,

src/pipelines/step/mod.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use {
55
std::sync::Arc,
66
};
77

8+
pub mod composite;
89
mod context;
910
mod instance;
1011
mod metrics;
@@ -30,7 +31,7 @@ pub use {context::StepContext, reth::payload::builder::PayloadBuilderError};
3031
/// can be generic over the platform they run on or specialized for a specific
3132
/// platform.
3233
///
33-
/// The instance of the step is long-lived, and it's lifetime is equal to the
34+
/// The instance of the step is long-lived, and its lifetime is equal to the
3435
/// lifetime of the pipeline it is part of. All invocations of the step will
3536
/// repeatedly call into the `step` async function on the same instance.
3637
///
@@ -45,9 +46,9 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
4546
/// failure that will terminate the pipeline execution.
4647
fn step(
4748
self: Arc<Self>,
48-
payload: Checkpoint<P>,
49-
ctx: StepContext<P>,
50-
) -> impl Future<Output = ControlFlow<P>> + Send + Sync;
49+
_payload: Checkpoint<P>,
50+
_ctx: StepContext<P>,
51+
) -> impl Future<Output = ControlFlow<P>> + Send;
5152

5253
/// This function is called once per new payload job before any steps are
5354
/// executed. It can be used by steps to perform any optional initialization
@@ -57,8 +58,8 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
5758
/// terminated immediately and no steps will be executed.
5859
fn before_job(
5960
self: Arc<Self>,
60-
_: StepContext<P>,
61-
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
61+
_ctx: StepContext<P>,
62+
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
6263
async { Ok(()) }
6364
}
6465

@@ -70,18 +71,18 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
7071
/// and will not produce a valid payload.
7172
fn after_job(
7273
self: Arc<Self>,
73-
_: StepContext<P>,
74-
_: Arc<Result<types::BuiltPayload<P>, PayloadBuilderError>>,
75-
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
74+
_ctx: StepContext<P>,
75+
_result: Arc<Result<types::BuiltPayload<P>, PayloadBuilderError>>,
76+
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
7677
async { Ok(()) }
7778
}
7879

7980
/// Optional setup function called exactly once when a pipeline is
8081
/// instantiated as a payload builder service, before any payload jobs run.
8182
fn setup(
8283
&mut self,
83-
_: InitContext<P>,
84-
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
84+
_init: InitContext<P>,
85+
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
8586
async { Ok(()) }
8687
}
8788
}
@@ -103,10 +104,10 @@ pub enum ControlFlow<P: Platform> {
103104
/// Stops the pipeline execution that contains the step with a payload.
104105
///
105106
/// If the step is inside a `Loop` sub-pipeline, it will stop the loop,
106-
/// run its epilogue (if it exists) and progress to next steps in the parent
107-
/// pipeline.
107+
/// run its epilogue (if it exists) and progress to the next steps in the
108+
/// parent pipeline.
108109
///
109-
/// Breaking out of a prologue step will not invoke any step in the pipeline,
110+
/// Breaking out of a prologue step will not invoke any step in the pipeline
110111
/// and jump straight to the epilogue.
111112
///
112113
/// Breaking out of an epilogue has the same effect as returning Ok from it,
@@ -156,6 +157,7 @@ impl<P: Platform> ControlFlow<P> {
156157
}
157158

158159
/// Context for the optional setup function of a step.
160+
#[derive(Clone)]
159161
pub struct InitContext<P: Platform> {
160162
metrics_scope: String,
161163
provider: Arc<dyn StateProviderFactory>,

0 commit comments

Comments
 (0)