Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blob transactions in manager #4294

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ pub struct Transactions(
pub Vec<TransactionSigned>,
);

impl Transactions {
/// Returns `true` if the list of transactions contains any blob transactions.
pub fn has_eip4844(&self) -> bool {
self.0.iter().any(|tx| tx.is_eip4844())
}
}

impl From<Vec<TransactionSigned>> for Transactions {
fn from(txs: Vec<TransactionSigned>) -> Self {
Transactions(txs)
Expand Down
40 changes: 26 additions & 14 deletions crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use reth_interfaces::{
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::{Peers, ReputationChangeKind};
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, TxType,
H256,
FromRecoveredPooledTransaction, IntoRecoveredTransaction, PeerId, PooledTransactionsElement,
TransactionSigned, TxHash, TxType, H256,
};
use reth_rlp::Encodable;
use reth_transaction_pool::{
Expand Down Expand Up @@ -164,7 +164,6 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
{
/// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle {
Expand Down Expand Up @@ -375,7 +374,22 @@ where
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
self.import_transactions(peer_id, msg.0, TransactionSource::Broadcast);
// ensure we didn't receive any blob transactions as these are disallowed to be
// broadcasted in full

let has_blob_txs = msg.has_eip4844();

let non_blob_txs = msg
.0
.into_iter()
.map(PooledTransactionsElement::try_from_broadcast)
.filter_map(Result::ok);

self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);

if has_blob_txs {
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
}
}
NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
self.on_new_pooled_transaction_hashes(peer_id, msg)
Expand Down Expand Up @@ -448,7 +462,7 @@ where
fn import_transactions(
&mut self,
peer_id: PeerId,
transactions: Vec<TransactionSigned>,
transactions: impl IntoIterator<Item = PooledTransactionsElement>,
source: TransactionSource,
) {
// If the node is pipeline syncing, ignore transactions
Expand All @@ -463,7 +477,7 @@ where
if let Some(peer) = self.peers.get_mut(&peer_id) {
for tx in transactions {
// recover transaction
let tx = if let Some(tx) = tx.into_ecrecovered() {
let tx = if let Ok(tx) = tx.try_into_ecrecovered() {
tx
} else {
has_bad_transactions = true;
Expand All @@ -474,18 +488,18 @@ where
// If we received the transactions as the response to our GetPooledTransactions
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes in [`Self::on_new_pooled_transaction_hashes`]
if source.is_broadcast() && !peer.transactions.insert(tx.hash()) {
if source.is_broadcast() && !peer.transactions.insert(*tx.hash()) {
num_already_seen += 1;
}

match self.transactions_by_peers.entry(tx.hash()) {
match self.transactions_by_peers.entry(*tx.hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().push(peer_id);
}
Entry::Vacant(entry) => {
// this is a new transaction that should be imported into the pool
let pool_transaction = <Pool::Transaction as FromRecoveredTransaction>::from_recovered_transaction(tx);
let pool_transaction = <Pool::Transaction as FromRecoveredPooledTransaction>::from_recovered_transaction(tx);

let pool = self.pool.clone();

Expand Down Expand Up @@ -583,11 +597,7 @@ where
{
match result {
Ok(Ok(txs)) => {
// convert all transactions to the inner transaction type, ignoring any
// sidecars
// TODO: remove this! this will be different when we introduce the blobpool
let transactions = txs.0.into_iter().map(|tx| tx.into_transaction()).collect();
this.import_transactions(peer_id, transactions, TransactionSource::Response)
this.import_transactions(peer_id, txs.0, TransactionSource::Response)
}
Ok(Err(req_err)) => {
this.on_request_error(peer_id, req_err);
Expand Down Expand Up @@ -825,6 +835,8 @@ enum TransactionsCommand {
#[allow(missing_docs)]
pub enum NetworkTransactionEvent {
/// Received list of transactions from the given peer.
///
/// This represents transactions that were broadcasted to use from the peer.
IncomingTransactions { peer_id: PeerId, msg: Transactions },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
Expand Down
21 changes: 21 additions & 0 deletions crates/primitives/src/transaction/pooled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ pub enum PooledTransactionsElement {
}

impl PooledTransactionsElement {
/// Tries to convert a [TransactionSigned] into a [PooledTransactionsElement].
///
/// [BlobTransaction] are disallowed from being propagated, hence this returns an error if the
/// `tx` is [Transaction::Eip4844]
pub fn try_from_broadcast(tx: TransactionSigned) -> Result<Self, TransactionSigned> {
if tx.is_eip4844() {
return Err(tx)
}
Ok(tx.into())
}

/// Heavy operation that return signature hash over rlp encoded transaction.
/// It is only for signature signing or signer recovery.
pub fn signature_hash(&self) -> H256 {
Expand All @@ -57,6 +68,16 @@ impl PooledTransactionsElement {
}
}

/// Reference to transaction hash. Used to identify transaction.
pub fn hash(&self) -> &TxHash {
match self {
PooledTransactionsElement::Legacy { hash, .. } => hash,
PooledTransactionsElement::Eip2930 { hash, .. } => hash,
PooledTransactionsElement::Eip1559 { hash, .. } => hash,
PooledTransactionsElement::BlobTransaction(tx) => &tx.hash,
}
}

/// Returns the signature of the transaction.
pub fn signature(&self) -> &Signature {
match self {
Expand Down
Loading