diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index c31ce72445fb..18da1bd75684 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -80,6 +80,13 @@ pub struct Transactions( pub Vec, ); +impl Transactions { + /// Returns `true` if the list of transactions contains any blob transactions. + pub fn has_eip4844(&self) -> bool { + self.0.iter().any(|tx| tx.is_eip4844()) + } +} + impl From> for Transactions { fn from(txs: Vec) -> Self { Transactions(txs) diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 56d43dbb5e28..e03d466676a1 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -19,8 +19,8 @@ use reth_interfaces::{ use reth_metrics::common::mpsc::UnboundedMeteredReceiver; use reth_network_api::{Peers, ReputationChangeKind}; use reth_primitives::{ - FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, TxType, - H256, + FromRecoveredPooledTransaction, IntoRecoveredTransaction, PeerId, PooledTransactionsElement, + TransactionSigned, TxHash, TxType, H256, }; use reth_rlp::Encodable; use reth_transaction_pool::{ @@ -164,7 +164,6 @@ impl TransactionsManager { impl TransactionsManager where Pool: TransactionPool + 'static, - ::Transaction: IntoRecoveredTransaction, { /// Returns a new handle that can send commands to this type. pub fn handle(&self) -> TransactionsHandle { @@ -375,7 +374,22 @@ where fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { - self.import_transactions(peer_id, msg.0, TransactionSource::Broadcast); + // ensure we didn't receive any blob transactions as these are disallowed to be + // broadcasted in full + + let has_blob_txs = msg.has_eip4844(); + + let non_blob_txs = msg + .0 + .into_iter() + .map(PooledTransactionsElement::try_from_broadcast) + .filter_map(Result::ok); + + self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast); + + if has_blob_txs { + self.report_peer(peer_id, ReputationChangeKind::BadTransactions); + } } NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => { self.on_new_pooled_transaction_hashes(peer_id, msg) @@ -448,7 +462,7 @@ where fn import_transactions( &mut self, peer_id: PeerId, - transactions: Vec, + transactions: impl IntoIterator, source: TransactionSource, ) { // If the node is pipeline syncing, ignore transactions @@ -463,7 +477,7 @@ where if let Some(peer) = self.peers.get_mut(&peer_id) { for tx in transactions { // recover transaction - let tx = if let Some(tx) = tx.into_ecrecovered() { + let tx = if let Ok(tx) = tx.try_into_ecrecovered() { tx } else { has_bad_transactions = true; @@ -474,18 +488,18 @@ where // If we received the transactions as the response to our GetPooledTransactions // requests (based on received `NewPooledTransactionHashes`) then we already // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] - if source.is_broadcast() && !peer.transactions.insert(tx.hash()) { + if source.is_broadcast() && !peer.transactions.insert(*tx.hash()) { num_already_seen += 1; } - match self.transactions_by_peers.entry(tx.hash()) { + match self.transactions_by_peers.entry(*tx.hash()) { Entry::Occupied(mut entry) => { // transaction was already inserted entry.get_mut().push(peer_id); } Entry::Vacant(entry) => { // this is a new transaction that should be imported into the pool - let pool_transaction = ::from_recovered_transaction(tx); + let pool_transaction = ::from_recovered_transaction(tx); let pool = self.pool.clone(); @@ -583,11 +597,7 @@ where { match result { Ok(Ok(txs)) => { - // convert all transactions to the inner transaction type, ignoring any - // sidecars - // TODO: remove this! this will be different when we introduce the blobpool - let transactions = txs.0.into_iter().map(|tx| tx.into_transaction()).collect(); - this.import_transactions(peer_id, transactions, TransactionSource::Response) + this.import_transactions(peer_id, txs.0, TransactionSource::Response) } Ok(Err(req_err)) => { this.on_request_error(peer_id, req_err); @@ -825,6 +835,8 @@ enum TransactionsCommand { #[allow(missing_docs)] pub enum NetworkTransactionEvent { /// Received list of transactions from the given peer. + /// + /// This represents transactions that were broadcasted to use from the peer. IncomingTransactions { peer_id: PeerId, msg: Transactions }, /// Received list of transactions hashes to the given peer. IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, diff --git a/crates/primitives/src/transaction/pooled.rs b/crates/primitives/src/transaction/pooled.rs index a293b0b67aef..e34d05725338 100644 --- a/crates/primitives/src/transaction/pooled.rs +++ b/crates/primitives/src/transaction/pooled.rs @@ -46,6 +46,17 @@ pub enum PooledTransactionsElement { } impl PooledTransactionsElement { + /// Tries to convert a [TransactionSigned] into a [PooledTransactionsElement]. + /// + /// [BlobTransaction] are disallowed from being propagated, hence this returns an error if the + /// `tx` is [Transaction::Eip4844] + pub fn try_from_broadcast(tx: TransactionSigned) -> Result { + if tx.is_eip4844() { + return Err(tx) + } + Ok(tx.into()) + } + /// Heavy operation that return signature hash over rlp encoded transaction. /// It is only for signature signing or signer recovery. pub fn signature_hash(&self) -> H256 { @@ -57,6 +68,16 @@ impl PooledTransactionsElement { } } + /// Reference to transaction hash. Used to identify transaction. + pub fn hash(&self) -> &TxHash { + match self { + PooledTransactionsElement::Legacy { hash, .. } => hash, + PooledTransactionsElement::Eip2930 { hash, .. } => hash, + PooledTransactionsElement::Eip1559 { hash, .. } => hash, + PooledTransactionsElement::BlobTransaction(tx) => &tx.hash, + } + } + /// Returns the signature of the transaction. pub fn signature(&self) -> &Signature { match self {