Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce EngineApiRequest type #10158

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::{
download::{BlockDownloader, DownloadAction, DownloadOutcome},
};
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_chain_state::ExecutedBlock;
use reth_engine_primitives::EngineTypes;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -217,6 +219,21 @@ pub enum EngineApiKind {
OpStack,
}

/// The request variants that the engine API handler can receive.
#[derive(Debug)]
pub enum EngineApiRequest<T: EngineTypes> {
/// A request received from the consensus engine.
Beacon(BeaconEngineMessage<T>),
/// Request to insert an already executed block, e.g. via payload building.
InsertExecutedBlock(ExecutedBlock),
}

impl<T: EngineTypes> From<BeaconEngineMessage<T>> for EngineApiRequest<T> {
fn from(msg: BeaconEngineMessage<T>) -> Self {
Self::Beacon(msg)
}
}

/// Events emitted by the engine API handler.
#[derive(Debug)]
pub enum EngineApiEvent {
Expand Down
118 changes: 69 additions & 49 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use tracing::*;

mod config;
mod metrics;
use crate::tree::metrics::EngineApiMetrics;
use crate::{engine::EngineApiRequest, tree::metrics::EngineApiMetrics};
pub use config::TreeConfig;

/// Keeps track of the state of the tree.
Expand Down Expand Up @@ -373,9 +373,9 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
/// them one by one so that we can handle incoming engine API in between and don't become
/// unresponsive. This can happen during live sync transition where we're trying to close the
/// gap (up to 3 epochs of blocks in the worst case).
incoming_tx: Sender<FromEngine<BeaconEngineMessage<T>>>,
incoming_tx: Sender<FromEngine<EngineApiRequest<T>>>,
/// Incoming engine API requests.
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
incoming: Receiver<FromEngine<EngineApiRequest<T>>>,
/// Outgoing events that are emitted to the handler.
outgoing: UnboundedSender<EngineApiEvent>,
/// Channels to the persistence layer.
Expand Down Expand Up @@ -452,7 +452,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
config: TreeConfig,
) -> (Sender<FromEngine<BeaconEngineMessage<T>>>, UnboundedReceiver<EngineApiEvent>) {
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

Expand Down Expand Up @@ -488,7 +488,7 @@ where
}

/// Returns a new [`Sender`] to send messages to this type.
pub fn sender(&self) -> Sender<FromEngine<BeaconEngineMessage<T>>> {
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T>>> {
self.incoming_tx.clone()
}

Expand Down Expand Up @@ -804,7 +804,7 @@ where
/// Returns an error if the engine channel is disconnected.
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<BeaconEngineMessage<T>>>, RecvError> {
) -> Result<Option<FromEngine<EngineApiRequest<T>>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
Expand Down Expand Up @@ -868,7 +868,7 @@ where
}

/// Handles a message from the engine.
fn on_engine_message(&mut self, msg: FromEngine<BeaconEngineMessage<T>>) {
fn on_engine_message(&mut self, msg: FromEngine<EngineApiRequest<T>>) {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncStarted => {
Expand All @@ -879,43 +879,58 @@ where
self.on_backfill_sync_finished(ctrl);
}
},
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let mut output = self.on_forkchoice_updated(state, payload_attrs);

if let Ok(res) = &mut output {
// track last received forkchoice state
self.state
.forkchoice_state_tracker
.set_latest(state, res.outcome.forkchoice_status());

// emit an event about the handled FCU
self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));

// handle the event if any
self.on_maybe_tree_event(res.event.take());
FromEngine::Request(request) => {
match request {
EngineApiRequest::InsertExecutedBlock(block) => {
self.state.tree_state.insert_executed(block);
}

if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e))
})) {
error!("Failed to send event: {err:?}");
EngineApiRequest::Beacon(request) => {
match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let mut output = self.on_forkchoice_updated(state, payload_attrs);

if let Ok(res) = &mut output {
// track last received forkchoice state
self.state
.forkchoice_state_tracker
.set_latest(state, res.outcome.forkchoice_status());

// emit an event about the handled FCU
self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
res.outcome.forkchoice_status(),
));

// handle the event if any
self.on_maybe_tree_event(res.event.take());
}

if let Err(err) =
tx.send(output.map(|o| o.outcome).map_err(Into::into))
{
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(
Box::new(e),
)
})) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
// triggering this hook will record that we received a request from
// the CL
self.canonical_in_memory_state
.on_transition_configuration_exchanged();
}
}
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
// triggering this hook will record that we received a request from the CL
self.canonical_in_memory_state.on_transition_configuration_exchanged();
}
},
}
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
self.on_tree_event(event);
Expand Down Expand Up @@ -1992,7 +2007,7 @@ mod tests {

struct TestHarness {
tree: EngineApiTreeHandler<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
to_tree_tx: Sender<FromEngine<BeaconEngineMessage<EthEngineTypes>>>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes>>>,
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
blocks: Vec<ExecutedBlock>,
action_rx: Receiver<PersistenceAction>,
Expand Down Expand Up @@ -2136,7 +2151,8 @@ mod tests {
state: fcu_state,
payload_attrs: None,
tx,
},
}
.into(),
));

let response = rx.await.unwrap().unwrap().await.unwrap();
Expand Down Expand Up @@ -2367,7 +2383,8 @@ mod tests {
},
payload_attrs: None,
tx,
},
}
.into(),
));

let resp = rx.await.unwrap().unwrap().await.unwrap();
Expand Down Expand Up @@ -2424,11 +2441,14 @@ mod tests {
TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);

let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(BeaconEngineMessage::NewPayload {
payload: payload.clone().into(),
cancun_fields: None,
tx,
}));
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::NewPayload {
payload: payload.clone().into(),
cancun_fields: None,
tx,
}
.into(),
));

let resp = rx.await.unwrap().unwrap();
assert!(resp.is_syncing());
Expand Down
4 changes: 2 additions & 2 deletions crates/ethereum/engine/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use reth_db_api::database::Database;
use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
engine::{EngineApiRequestHandler, EngineHandler},
engine::{EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, TreeConfig},
};
Expand All @@ -33,7 +33,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// Alias for Ethereum chain orchestrator.
type EthServiceType<DB, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<BeaconEngineMessage<EthEngineTypes>>,
EngineApiRequestHandler<EngineApiRequest<EthEngineTypes>>,
UnboundedReceiverStream<BeaconEngineMessage<EthEngineTypes>>,
BasicBlockDownloader<Client>,
>,
Expand Down
Loading