Skip to content

Commit 012c3f9

Browse files
committed
feat: CompositeStep
1 parent 56acf48 commit 012c3f9

File tree

4 files changed

+176
-18
lines changed

4 files changed

+176
-18
lines changed

src/pipelines/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,14 @@ pub use {
3030
Behavior::{Loop, Once},
3131
events::system_events::*,
3232
limits::*,
33-
step::{ControlFlow, InitContext, PayloadBuilderError, Step, StepContext},
33+
step::{
34+
ControlFlow,
35+
InitContext,
36+
PayloadBuilderError,
37+
Step,
38+
StepContext,
39+
composite,
40+
},
3441
};
3542

3643
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -122,7 +129,7 @@ impl<P: Platform> Pipeline<P> {
122129
/// Sets payload building limits for the pipeline.
123130
///
124131
/// Here we can either use an instance of `LimitsFactory` that generates
125-
/// limits dynamically according to a user-defined logic, or we can use a
132+
/// limits dynamically, according to a user-defined logic, or we can use a
126133
/// fixed `Limits` instance.
127134
#[must_use]
128135
pub fn with_limits<T, L: IntoScopedLimits<P, T>>(self, limits: L) -> Self {
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
//! This module defines composite steps that can be used to compose multiple
2+
//! steps into a single step.
3+
4+
use {
5+
crate::{
6+
pipelines::step::StepInstance,
7+
platform::types::BuiltPayload,
8+
prelude::*,
9+
},
10+
std::sync::Arc,
11+
};
12+
13+
/// A composite step with a list of steps.
14+
/// The associated mode defines the specific behavior of executing steps
15+
pub struct CompositeStep<P: Platform, M> {
16+
steps: Vec<Arc<StepInstance<P>>>,
17+
mode: M,
18+
}
19+
20+
impl<P: Platform, M> CompositeStep<P, M> {
21+
pub fn new(mode: M) -> Self {
22+
Self {
23+
steps: Vec::new(),
24+
mode,
25+
}
26+
}
27+
28+
pub fn append_step(&mut self, step: impl Step<P>) {
29+
self.steps.push(Arc::new(StepInstance::new(step)));
30+
}
31+
}
32+
33+
/// A composite step mode defines the specific behavior of executing steps
34+
/// It takes the list of steps, initial payload and context
35+
trait CompositeStepMode<P: Platform>: Send + Sync {
36+
fn steps(
37+
&self,
38+
steps: &[Arc<StepInstance<P>>],
39+
payload: Checkpoint<P>,
40+
ctx: StepContext<P>,
41+
) -> impl Future<Output = ControlFlow<P>> + Send;
42+
}
43+
44+
/// CompositeStep will execute all before/after/setup functions of each step in
45+
/// order The step implementation is delegated to the composite mode
46+
impl<P, M> Step<P> for CompositeStep<P, M>
47+
where
48+
P: Platform,
49+
M: CompositeStepMode<P> + Send + 'static,
50+
{
51+
async fn step(
52+
self: Arc<Self>,
53+
payload: Checkpoint<P>,
54+
ctx: StepContext<P>,
55+
) -> ControlFlow<P> {
56+
self.mode.steps(&self.steps, payload, ctx).await
57+
}
58+
59+
async fn before_job(
60+
self: Arc<Self>,
61+
ctx: StepContext<P>,
62+
) -> Result<(), PayloadBuilderError> {
63+
for step in &self.steps {
64+
step.before_job(ctx.clone()).await?;
65+
}
66+
Ok(())
67+
}
68+
69+
async fn after_job(
70+
self: Arc<Self>,
71+
ctx: StepContext<P>,
72+
result: Arc<Result<BuiltPayload<P>, PayloadBuilderError>>,
73+
) -> Result<(), PayloadBuilderError> {
74+
for step in &self.steps {
75+
step.after_job(ctx.clone(), result.clone()).await?;
76+
}
77+
Ok(())
78+
}
79+
80+
async fn setup(
81+
&mut self,
82+
init: InitContext<P>,
83+
) -> Result<(), PayloadBuilderError> {
84+
for step in &self.steps {
85+
step.setup(init.clone()).await?;
86+
}
87+
Ok(())
88+
}
89+
}
90+
91+
#[cfg(test)]
92+
mod tests {
93+
use {super::*, crate::test_utils::*};
94+
95+
// Composite step mode that tries to execute all steps and ignore all failures
96+
// and breaks Will return the latest successful result
97+
struct TryAllMode;
98+
impl<P: Platform> CompositeStepMode<P> for TryAllMode {
99+
async fn steps(
100+
&self,
101+
steps: &[Arc<StepInstance<P>>],
102+
payload: Checkpoint<P>,
103+
ctx: StepContext<P>,
104+
) -> ControlFlow<P> {
105+
let mut current = payload.clone();
106+
for step in steps {
107+
if let ControlFlow::Ok(res) =
108+
step.step(current.clone(), ctx.clone()).await
109+
{
110+
current = res
111+
}
112+
}
113+
ControlFlow::Ok(current)
114+
}
115+
}
116+
117+
#[rblib_test(Ethereum)]
118+
async fn composite_empty_steps<P: TestablePlatform>() -> eyre::Result<()> {
119+
let composite = CompositeStep::<P, _>::new(TryAllMode);
120+
let result = OneStep::<P>::new(composite).run().await?;
121+
assert!(matches!(result, ControlFlow::Ok(_)));
122+
Ok(())
123+
}
124+
125+
#[rblib_test(Ethereum)]
126+
async fn composite_single_step_ok<P: TestablePlatform>() -> eyre::Result<()> {
127+
let mut composite = CompositeStep::<P, _>::new(TryAllMode);
128+
composite.append_step(AlwaysOkStep);
129+
130+
let result = OneStep::<P>::new(composite).run().await?;
131+
assert!(matches!(result, ControlFlow::Ok(_)));
132+
Ok(())
133+
}
134+
135+
#[rblib_test(Ethereum)]
136+
async fn composite_multiple_steps_ok<P: TestablePlatform>() -> eyre::Result<()>
137+
{
138+
let mut composite = CompositeStep::<P, _>::new(TryAllMode);
139+
composite.append_step(AlwaysOkStep);
140+
composite.append_step(AlwaysOkStep);
141+
composite.append_step(AlwaysOkStep);
142+
143+
let result = OneStep::<P>::new(composite).run().await?;
144+
assert!(matches!(result, ControlFlow::Ok(_)));
145+
146+
Ok(())
147+
}
148+
}

src/pipelines/step/context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use {
1111

1212
/// Carries information specific to the step that is currently being executed.
1313
///
14-
/// An instance of this type is passed to `Step::step` method during pipeline
15-
/// execution of steps.
14+
/// An instance of this type is passed to the [`Step::step`] method during the
15+
/// pipeline execution of steps.
16+
#[derive(Debug, Clone)]
1617
pub struct StepContext<P: Platform> {
1718
block: BlockContext<P>,
1819
limits: Limits,

src/pipelines/step/mod.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use {
55
std::sync::Arc,
66
};
77

8+
pub mod composite;
89
mod context;
910
mod instance;
1011
mod metrics;
@@ -30,7 +31,7 @@ pub use {context::StepContext, reth::payload::builder::PayloadBuilderError};
3031
/// can be generic over the platform they run on or specialized for a specific
3132
/// platform.
3233
///
33-
/// The instance of the step is long-lived, and it's lifetime is equal to the
34+
/// The instance of the step is long-lived, and its lifetime is equal to the
3435
/// lifetime of the pipeline it is part of. All invocations of the step will
3536
/// repeatedly call into the `step` async function on the same instance.
3637
///
@@ -45,9 +46,9 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
4546
/// failure that will terminate the pipeline execution.
4647
fn step(
4748
self: Arc<Self>,
48-
payload: Checkpoint<P>,
49-
ctx: StepContext<P>,
50-
) -> impl Future<Output = ControlFlow<P>> + Send + Sync;
49+
_payload: Checkpoint<P>,
50+
_ctx: StepContext<P>,
51+
) -> impl Future<Output = ControlFlow<P>> + Send;
5152

5253
/// This function is called once per new payload job before any steps are
5354
/// executed. It can be used by steps to perform any optional initialization
@@ -57,8 +58,8 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
5758
/// terminated immediately and no steps will be executed.
5859
fn before_job(
5960
self: Arc<Self>,
60-
_: StepContext<P>,
61-
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
61+
_ctx: StepContext<P>,
62+
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
6263
async { Ok(()) }
6364
}
6465

@@ -70,18 +71,18 @@ pub trait Step<P: Platform>: Send + Sync + 'static {
7071
/// and will not produce a valid payload.
7172
fn after_job(
7273
self: Arc<Self>,
73-
_: StepContext<P>,
74-
_: Arc<Result<types::BuiltPayload<P>, PayloadBuilderError>>,
75-
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
74+
_ctx: StepContext<P>,
75+
_result: Arc<Result<types::BuiltPayload<P>, PayloadBuilderError>>,
76+
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
7677
async { Ok(()) }
7778
}
7879

7980
/// Optional setup function called exactly once when a pipeline is
8081
/// instantiated as a payload builder service, before any payload jobs run.
8182
fn setup(
8283
&mut self,
83-
_: InitContext<P>,
84-
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send + Sync {
84+
_init: InitContext<P>,
85+
) -> impl Future<Output = Result<(), PayloadBuilderError>> + Send {
8586
async { Ok(()) }
8687
}
8788
}
@@ -103,10 +104,10 @@ pub enum ControlFlow<P: Platform> {
103104
/// Stops the pipeline execution that contains the step with a payload.
104105
///
105106
/// If the step is inside a `Loop` sub-pipeline, it will stop the loop,
106-
/// run its epilogue (if it exists) and progress to next steps in the parent
107-
/// pipeline.
107+
/// run its epilogue (if it exists) and progress to the next steps in the
108+
/// parent pipeline.
108109
///
109-
/// Breaking out of a prologue step will not invoke any step in the pipeline,
110+
/// Breaking out of a prologue step will not invoke any step in the pipeline
110111
/// and jump straight to the epilogue.
111112
///
112113
/// Breaking out of an epilogue has the same effect as returning Ok from it,
@@ -156,6 +157,7 @@ impl<P: Platform> ControlFlow<P> {
156157
}
157158

158159
/// Context for the optional setup function of a step.
160+
#[derive(Clone)]
159161
pub struct InitContext<P: Platform> {
160162
metrics_scope: String,
161163
provider: Arc<dyn StateProviderFactory>,

0 commit comments

Comments
 (0)