Skip to content

Commit fd3fb02

Browse files
committed
Reworked payload limits handling
1 parent 715871e commit fd3fb02

File tree

16 files changed

+827
-691
lines changed

16 files changed

+827
-691
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "2"
88

99
[workspace.package]
10-
version = "0.3.0"
10+
version = "0.3.1"
1111
edition = "2024"
1212
rust-version = "1.87"
1313
license = "MIT"

src/payload/checkpoint.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use {
1717
state::{AccountInfo, Bytecode},
1818
},
1919
},
20-
std::sync::Arc,
20+
std::{sync::Arc, time::Instant},
2121
thiserror::Error,
2222
};
2323

@@ -84,8 +84,14 @@ impl<P: Platform> Checkpoint<P> {
8484
self.inner.depth
8585
}
8686

87+
/// Returns the timestamp when this checkpoint was created.
88+
pub fn created_at(&self) -> Instant {
89+
self.inner.created_at
90+
}
91+
8792
/// The returns the payload version before the current checkpoint.
88-
/// Working with the previous checkpoint is equivalent to discarding the
93+
///
94+
/// Using the previous checkpoint is equivalent to discarding the
8995
/// state mutations made in the current checkpoint.
9096
///
9197
/// There may be multiple parallel forks of the payload under construction,
@@ -181,6 +187,7 @@ impl<P: Platform> Checkpoint<P> {
181187
.try_into_executable()?
182188
.execute(self.block(), self)?,
183189
),
190+
created_at: Instant::now(),
184191
}),
185192
})
186193
}
@@ -196,6 +203,7 @@ impl<P: Platform> Checkpoint<P> {
196203
prev: Some(Arc::clone(&self.inner)),
197204
depth: self.inner.depth + 1,
198205
mutation: Mutation::Barrier,
206+
created_at: Instant::now(),
199207
}),
200208
}
201209
}
@@ -213,6 +221,7 @@ impl<P: Platform> Checkpoint<P> {
213221
prev: None,
214222
depth: 0,
215223
mutation: Mutation::Barrier,
224+
created_at: Instant::now(),
216225
}),
217226
}
218227
}
@@ -257,6 +266,9 @@ struct CheckpointInner<P: Platform> {
257266

258267
/// The mutation
259268
mutation: Mutation<P>,
269+
270+
/// The timestamp when this checkpoint was created.
271+
created_at: Instant,
260272
}
261273

262274
/// Converts a checkpoint into a vector of transactions that were applied to

src/payload/ext/checkpoint.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use {
1313
},
1414
},
1515
itertools::Itertools,
16+
std::time::Instant,
1617
};
1718

1819
/// Quality of Life extensions for the `Checkpoint` type.
@@ -123,6 +124,12 @@ pub trait CheckpointExt<P: Platform>: super::sealed::Sealed {
123124

124125
/// Returns true if the checkpoint represents a bundle of transactions.
125126
fn is_bundle(&self) -> bool;
127+
128+
/// Returns the timestamp of initial checkpoint that was created at the very
129+
/// beginning of the payload building process. It is the timestamp if the
130+
/// first checkpoint in the history of this checkpoint that is created
131+
/// by `Checkpoint::new_at_block`.
132+
fn building_since(&self) -> Instant;
126133
}
127134

128135
impl<P: Platform> CheckpointExt<P> for Checkpoint<P> {
@@ -272,4 +279,16 @@ impl<P: Platform> CheckpointExt<P> for Checkpoint<P> {
272279
fn is_bundle(&self) -> bool {
273280
self.as_bundle().is_some()
274281
}
282+
283+
/// Returns the timestamp of initial checkpoint that was created at the very
284+
/// beginning of the payload building process. It is the timestamp if the
285+
/// first barrier checkpoint in the history of this checkpoint that is created
286+
/// by `Checkpoint::new_at_block`.
287+
fn building_since(&self) -> Instant {
288+
let mut created_at = self.created_at();
289+
while let Some(prev) = self.prev() {
290+
created_at = prev.created_at();
291+
}
292+
created_at
293+
}
275294
}

src/pipelines/exec/mod.rs

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ where
5757

5858
/// Execution scopes. This root scope represents the top-level pipeline that
5959
/// may contain nested scopes for each nested pipeline.
60-
scope: Arc<RootScope>,
60+
scope: Arc<RootScope<P>>,
6161
}
6262

6363
impl<P: Platform, Provider: traits::ProviderBounds<P>>
@@ -76,8 +76,12 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
7676
.metrics()
7777
.record_payload_job_attributes::<P>(block.attributes());
7878

79+
// Create the initial payload checkpoint, this will implicitly capture the
80+
// time we started executing the pipeline for this payload job.
81+
let checkpoint = block.start();
82+
7983
// initialize pipeline scopes
80-
let root = Arc::new(RootScope::new(&pipeline, &block));
84+
let root = Arc::new(RootScope::new(&pipeline, &checkpoint));
8185

8286
// Initially set the execution cursor to initializing state, that will call
8387
// all `before_job` methods of the steps in the pipeline.
@@ -88,19 +92,17 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
8892
let scope = Arc::clone(&root);
8993

9094
async move {
91-
// enter the scope of the root pipeline
92-
scope.enter();
93-
9495
for step in pipeline.iter_steps() {
9596
let navi = step.navigator(&pipeline).expect(
9697
"Invalid step path. This is a bug in the pipeline executor \
9798
implementation.",
9899
);
99-
let scope = scope.of(&step).expect("invalid step path");
100-
let ctx = StepContext::new(&block, &navi, scope);
100+
let limits = scope.limits_of(&step).expect("invalid step path");
101+
let ctx = StepContext::new(&block, &navi, limits, None);
101102
navi.instance().before_job(ctx).await?;
102103
}
103-
Ok(())
104+
105+
Ok(checkpoint)
104106
}
105107
.boxed()
106108
}),
@@ -133,18 +135,19 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
133135
path: &StepPath,
134136
input: Checkpoint<P>,
135137
) -> Pin<Box<dyn Future<Output = ControlFlow<P>> + Send>> {
136-
let scope = self.scope.of(path).expect(
138+
let limits = self.scope.limits_of(path).expect(
137139
"Invalid step path. This is a bug in the pipeline executor \
138140
implementation.",
139141
);
140142

141-
let step_navi = path.navigator(&self.pipeline).expect(
143+
let navi = path.navigator(&self.pipeline).expect(
142144
"Invalid step path. This is a bug in the pipeline executor \
143145
implementation.",
144146
);
145147

146-
let ctx = StepContext::new(&self.block, &step_navi, scope);
147-
let step = Arc::clone(step_navi.instance());
148+
let entered_at = self.scope.entered_at(path);
149+
let ctx = StepContext::new(&self.block, &navi, limits, entered_at);
150+
let step = Arc::clone(navi.instance());
148151
async move { step.step(input, ctx).await }.boxed()
149152
}
150153

@@ -194,13 +197,17 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
194197
// there is a next step to be executed, create a cursor that will
195198
// start running the next step with the output of the current step
196199
// as input on next executor future poll
197-
self.scope.switch_context(step.path());
200+
201+
// enter the scope of the next step
202+
self.scope.switch_context(step.path(), &input);
203+
204+
// schedule execution on next future poll
198205
Cursor::BeforeStep(step.into(), input)
199206
}
200207

201208
/// After pipeline steps are initialized, this method will identify the first
202209
/// step to execute in the pipeline and prepare the cursor to run it.
203-
fn first_step(&self) -> Cursor<P> {
210+
fn first_step(&self, checkpoint: Checkpoint<P>) -> Cursor<P> {
204211
let Some(navigator) = StepNavigator::entrypoint(&self.pipeline) else {
205212
debug!(
206213
"empty pipeline, building empty payload for attributes: {:?}",
@@ -215,7 +222,11 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
215222
);
216223
};
217224

218-
Cursor::BeforeStep(navigator.into(), self.block.start())
225+
// enter the scope of the root pipeline
226+
self.scope.enter(&checkpoint);
227+
228+
// Begin executing the first step of the pipeline in the next future poll
229+
Cursor::BeforeStep(navigator.into(), checkpoint)
219230
}
220231

221232
/// This method will walk through the pipeline steps and invoke the
@@ -239,13 +250,16 @@ impl<P: Platform, Provider: traits::ProviderBounds<P>>
239250
"Invalid step path. This is a bug in the pipeline executor \
240251
implementation.",
241252
);
242-
let scope = scope.of(&step).expect("invalid step path");
243-
let ctx = StepContext::new(&block, &navi, scope);
253+
let limits = scope.limits_of(&step).expect("invalid step path");
254+
let ctx = StepContext::new(&block, &navi, limits, None);
244255
navi.instance().after_job(ctx, output.clone()).await?;
245256
}
246257

247-
// leave the scope of the root pipeline
248-
scope.leave();
258+
// leave the scope of the root pipeline (if entered, we never enter the
259+
// root scope only in empty pipelines).
260+
if scope.is_active() {
261+
scope.leave();
262+
}
249263

250264
Arc::into_inner(output)
251265
.expect("unexpected > 1 strong reference count")
@@ -270,9 +284,9 @@ where
270284
if let Cursor::Initializing(ref mut future) = executor.cursor {
271285
if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
272286
match output {
273-
Ok(()) => {
287+
Ok(checkpoint) => {
274288
trace!("{} initialized successfully", executor.pipeline);
275-
executor.cursor = executor.first_step();
289+
executor.cursor = executor.first_step(checkpoint);
276290
}
277291
Err(error) => {
278292
trace!(
@@ -393,7 +407,11 @@ enum Cursor<P: Platform> {
393407
/// This happens once before any step is executed and it calls the
394408
/// `before_job` method of each step in the pipeline.
395409
Initializing(
396-
Pin<Box<dyn Future<Output = Result<(), PayloadBuilderError>> + Send>>,
410+
Pin<
411+
Box<
412+
dyn Future<Output = Result<Checkpoint<P>, PayloadBuilderError>> + Send,
413+
>,
414+
>,
397415
),
398416

399417
/// This state occurs after the `Completed` state is reached. It calls

0 commit comments

Comments
 (0)