Skip to content

Commit

Permalink
feat(engine-util): reorg interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Aug 7, 2024
1 parent cea8b7a commit fc4cf05
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 15 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 16 additions & 6 deletions crates/engine/util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,41 @@ workspace = true

[dependencies]
# reth
reth-primitives.workspace = true
reth-errors.workspace = true
reth-fs-util.workspace = true
reth-rpc.workspace = true
reth-rpc-types.workspace = true
reth-rpc-types-compat.workspace = true
reth-engine-primitives.workspace = true
reth-beacon-consensus.workspace = true
reth-payload-validator.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true
reth-provider.workspace = true
reth-ethereum-forks.workspace = true
revm-primitives.workspace = true

# async
tokio = { workspace = true, default-features = false }
tokio-util.workspace = true
pin-project.workspace = true

# misc
eyre.workspace = true
futures.workspace = true

# io
serde.workspace = true
serde_json.workspace = true

# misc
eyre.workspace = true
itertools.workspace = true

# tracing
tracing.workspace = true

# async
futures.workspace = true

[features]
optimism = [
"reth-rpc/optimism",
"reth-beacon-consensus/optimism",
"reth-ethereum-forks/optimism"
]
6 changes: 3 additions & 3 deletions crates/engine/util/src/engine_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ impl<S> EngineStoreStream<S> {
}
}

impl<Engine, S> Stream for EngineStoreStream<S>
impl<S, Engine> Stream for EngineStoreStream<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
Engine: EngineTypes,
{
type Item = S::Item;

Expand All @@ -146,7 +146,7 @@ where
let next = ready!(this.stream.poll_next_unpin(cx));
if let Some(msg) = &next {
if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
error!(target: "engine::stream::store", ?msg, %error, "Error handling Engine API message");
}
}
Poll::Ready(next)
Expand Down
43 changes: 43 additions & 0 deletions crates/engine/util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use futures::Stream;
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_payload_validator::ExecutionPayloadValidator;
use std::path::PathBuf;
use tokio_util::either::Either;

Expand All @@ -15,6 +16,9 @@ use skip_fcu::EngineSkipFcu;
pub mod skip_new_payload;
use skip_new_payload::EngineSkipNewPayload;

pub mod reorg;
use reorg::EngineReorg;

/// The collection of stream extensions for engine API message stream.
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
Stream<Item = BeaconEngineMessage<Engine>>
Expand Down Expand Up @@ -89,6 +93,45 @@ pub trait EngineMessageStreamExt<Engine: EngineTypes>:
Either::Right(self)
}
}

/// Creates reorgs with specified frequency.
fn reorg<Provider, Evm>(
self,
provider: Provider,
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: usize,
) -> EngineReorg<Self, Engine, Provider, Evm>
where
Self: Sized,
{
EngineReorg::new(self, provider, evm_config, payload_validator, frequency)
}

/// If frequency is [Some], returns the stream that creates reorgs with
/// specified frequency. Otherwise, returns `Self`.
fn maybe_reorg<Provider, Evm>(
self,
provider: Provider,
evm_config: Evm,
payload_validator: ExecutionPayloadValidator,
frequency: Option<usize>,
) -> Either<EngineReorg<Self, Engine, Provider, Evm>, Self>
where
Self: Sized,
{
if let Some(frequency) = frequency {
Either::Left(reorg::EngineReorg::new(
self,
provider,
evm_config,
payload_validator,
frequency,
))
} else {
Either::Right(self)
}
}
}

impl<Engine, T> EngineMessageStreamExt<Engine> for T
Expand Down
Loading

0 comments on commit fc4cf05

Please sign in to comment.