Skip to content

Commit

Permalink
feat: Add version to BeaconEngineMessage FCU (#12089)
Browse files Browse the repository at this point in the history
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
  • Loading branch information
0xOsiris and Rjected authored Oct 28, 2024
1 parent 1b0f625 commit 0d07d27
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 33 deletions.
13 changes: 10 additions & 3 deletions bin/reth/src/commands/debug_cmd/replay_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use reth_engine_util::engine_store::{EngineMessageStore, StoredEngineApiMessage}
use reth_fs_util as fs;
use reth_network::{BlockDownloaderProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_node_api::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_api::{
EngineApiMessageVersion, NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine,
};
use reth_node_ethereum::{EthEngineTypes, EthEvmConfig, EthExecutorProvider};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_provider::{
Expand Down Expand Up @@ -166,8 +168,13 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
debug!(target: "reth::cli", filepath = %filepath.display(), ?message, "Forwarding Engine API message");
match message {
StoredEngineApiMessage::ForkchoiceUpdated { state, payload_attrs } => {
let response =
beacon_engine_handle.fork_choice_updated(state, payload_attrs).await?;
let response = beacon_engine_handle
.fork_choice_updated(
state,
payload_attrs,
EngineApiMessageVersion::default(),
)
.await?;
debug!(target: "reth::cli", ?response, "Received for forkchoice updated");
}
StoredEngineApiMessage::NewPayload { payload, sidecar } => {
Expand Down
3 changes: 2 additions & 1 deletion crates/consensus/auto-seal/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloy_rpc_types_engine::ForkchoiceState;
use futures_util::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::{BeaconEngineMessage, ForkchoiceStatus};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_evm::execute::BlockExecutorProvider;
use reth_provider::{CanonChainTracker, StateProviderFactory};
use reth_stages_api::PipelineEvent;
Expand Down Expand Up @@ -155,6 +155,7 @@ where
state,
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
});
debug!(target: "consensus::auto", ?state, "Sent fork choice update");

Expand Down
7 changes: 5 additions & 2 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use futures::TryFutureExt;
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::RethResult;
use reth_tokio_util::{EventSender, EventStream};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
Expand Down Expand Up @@ -60,9 +60,10 @@ where
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.send_fork_choice_updated(state, payload_attrs, version)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
Expand All @@ -74,12 +75,14 @@ where
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
});
rx
}
Expand Down
4 changes: 3 additions & 1 deletion crates/consensus/beacon/src/engine/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy_rpc_types_engine::{
ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
};
use futures::{future::Either, FutureExt};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::RethResult;
use reth_payload_primitives::PayloadBuilderError;
use std::{
Expand Down Expand Up @@ -156,6 +156,8 @@ pub enum BeaconEngineMessage<Engine: EngineTypes> {
state: ForkchoiceState,
/// The payload attributes for block building.
payload_attrs: Option<Engine::PayloadAttributes>,
/// The Engine API Version.
version: EngineApiMessageVersion,
/// The sender for returning forkchoice updated result.
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
},
Expand Down
17 changes: 14 additions & 3 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_blockchain_tree_api::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
};
use reth_engine_primitives::{EngineTypes, PayloadTypes};
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes, PayloadTypes};
use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
use reth_network_p2p::{
sync::{NetworkSyncUpdater, SyncState},
Expand Down Expand Up @@ -428,7 +428,12 @@ where
} else if let Some(attrs) = attrs {
// the CL requested to build a new payload on top of this new VALID head
let head = outcome.into_header().unseal();
self.process_payload_attributes(attrs, head, state)
self.process_payload_attributes(
attrs,
head,
state,
EngineApiMessageVersion::default(),
)
} else {
OnForkChoiceUpdated::valid(PayloadStatus::new(
PayloadStatusEnum::Valid,
Expand Down Expand Up @@ -1160,6 +1165,7 @@ where
attrs: <N::Engine as PayloadTypes>::PayloadAttributes,
head: Header,
state: ForkchoiceState,
_version: EngineApiMessageVersion,
) -> OnForkChoiceUpdated {
// 7. Client software MUST ensure that payloadAttributes.timestamp is greater than timestamp
// of a block referenced by forkchoiceState.headBlockHash. If this condition isn't held
Expand Down Expand Up @@ -1855,7 +1861,12 @@ where
// sensitive, hence they are polled first.
if let Poll::Ready(Some(msg)) = this.engine_message_stream.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version: _version,
} => {
this.on_forkchoice_updated(state, payload_attrs, tx);
}
BeaconEngineMessage::NewPayload { payload, sidecar, tx } => {
Expand Down
10 changes: 8 additions & 2 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_engine_primitives::EngineApiMessageVersion;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::{either::Either, test_utils::MockExecutorProvider};
use reth_evm_ethereum::execute::EthExecutorProvider;
Expand Down Expand Up @@ -93,7 +94,9 @@ impl<DB> TestEnv<DB> {
&self,
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
self.engine_handle.fork_choice_updated(state, None).await
self.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await
}

/// Sends the `ForkchoiceUpdated` message to the consensus engine and retries if the engine
Expand All @@ -103,7 +106,10 @@ impl<DB> TestEnv<DB> {
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
loop {
let result = self.engine_handle.fork_choice_updated(state, None).await?;
let result = self
.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await?;
if !result.is_syncing() {
return Ok(result)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/engine/local/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadKind, PayloadTypes,
Expand Down Expand Up @@ -167,6 +167,7 @@ where
state: self.forkchoice_state(),
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
})?;

let res = rx.await??;
Expand All @@ -193,6 +194,7 @@ where
state: self.forkchoice_state(),
payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
tx,
version: EngineApiMessageVersion::default(),
})?;

let res = rx.await??.await?;
Expand Down
23 changes: 17 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use reth_chain_state::{
};
use reth_chainspec::EthereumHardforks;
use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::BlockExecutorProvider;
use reth_payload_builder::PayloadBuilderHandle;
Expand Down Expand Up @@ -969,6 +969,7 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<T::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
self.metrics.engine.forkchoice_updated_messages.increment(1);
Expand Down Expand Up @@ -1018,7 +1019,7 @@ where
// to return an error
ProviderError::HeaderNotFound(state.head_block_hash.into())
})?;
let updated = self.process_payload_attributes(attr, &tip, state);
let updated = self.process_payload_attributes(attr, &tip, state, version);
return Ok(TreeOutcome::new(updated))
}

Expand All @@ -1038,7 +1039,7 @@ where
}

if let Some(attr) = attrs {
let updated = self.process_payload_attributes(attr, &tip, state);
let updated = self.process_payload_attributes(attr, &tip, state, version);
return Ok(TreeOutcome::new(updated))
}

Expand All @@ -1054,7 +1055,8 @@ where
if self.engine_kind.is_opstack() {
if let Some(attr) = attrs {
debug!(target: "engine::tree", head = canonical_header.number, "handling payload attributes for canonical head");
let updated = self.process_payload_attributes(attr, &canonical_header, state);
let updated =
self.process_payload_attributes(attr, &canonical_header, state, version);
return Ok(TreeOutcome::new(updated))
}
}
Expand Down Expand Up @@ -1206,8 +1208,14 @@ where
}
EngineApiRequest::Beacon(request) => {
match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let mut output = self.on_forkchoice_updated(state, payload_attrs);
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
} => {
let mut output =
self.on_forkchoice_updated(state, payload_attrs, version);

if let Ok(res) = &mut output {
// track last received forkchoice state
Expand Down Expand Up @@ -2484,6 +2492,7 @@ where
attrs: T::PayloadAttributes,
head: &Header,
state: ForkchoiceState,
_version: EngineApiMessageVersion,
) -> OnForkChoiceUpdated {
// 7. Client software MUST ensure that payloadAttributes.timestamp is greater than timestamp
// of a block referenced by forkchoiceState.headBlockHash. If this condition isn't held
Expand Down Expand Up @@ -2808,6 +2817,7 @@ mod tests {
state: fcu_state,
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
}
.into(),
))
Expand Down Expand Up @@ -3097,6 +3107,7 @@ mod tests {
},
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
}
.into(),
))
Expand Down
7 changes: 6 additions & 1 deletion crates/engine/util/src/engine_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ impl EngineMessageStore {
fs::create_dir_all(&self.path)?; // ensure that store path had been created
let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => {
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx: _tx,
version: _version,
} => {
let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
fs::write(
self.path.join(filename),
Expand Down
20 changes: 17 additions & 3 deletions crates/engine/util/src/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_rpc_types_engine::{
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use itertools::Either;
use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
use reth_ethereum_forks::EthereumHardforks;
use reth_evm::{
Expand Down Expand Up @@ -211,18 +211,32 @@ where
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
version: EngineApiMessageVersion::default(),
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
(Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
(
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
}),
_,
) => {
// Record last forkchoice state forwarded to the engine.
// We do not care if it's valid since engine should be able to handle
// reorgs that rely on invalid forkchoice state.
*this.last_forkchoice_state = Some(state);
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
})
}
(item, _) => item,
};
Expand Down
14 changes: 12 additions & 2 deletions crates/engine/util/src/skip_fcu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,25 @@ where
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
}) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
continue
}
*this.skipped = 0;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
})
}
next => next,
};
Expand Down
11 changes: 6 additions & 5 deletions crates/payload/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,22 +324,23 @@ where
}

/// The version of Engine API message.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum EngineApiMessageVersion {
/// Version 1
V1,
V1 = 1,
/// Version 2
///
/// Added in the Shanghai hardfork.
V2,
V2 = 2,
/// Version 3
///
/// Added in the Cancun hardfork.
V3,
#[default]
V3 = 3,
/// Version 4
///
/// Added in the Prague hardfork.
V4,
V4 = 4,
}

/// Determines how we should choose the payload to return.
Expand Down
Loading

0 comments on commit 0d07d27

Please sign in to comment.