diff --git a/crates/primitives/src/transaction/mod.rs b/crates/primitives/src/transaction/mod.rs index 9702ae19eb4d..427134fb6923 100644 --- a/crates/primitives/src/transaction/mod.rs +++ b/crates/primitives/src/transaction/mod.rs @@ -8,10 +8,10 @@ use alloy_rlp::{ use bytes::{Buf, BytesMut}; use derive_more::{AsRef, Deref}; use once_cell::sync::Lazy; -use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use rayon::prelude::*; use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact}; use serde::{Deserialize, Serialize}; -use std::mem; +use std::{mem, sync::mpsc, thread}; pub use access_list::{AccessList, AccessListItem}; pub use eip1559::TxEip1559; @@ -849,7 +849,53 @@ impl TransactionSignedNoHash { if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD { txes.into_iter().map(|tx| tx.recover_signer()).collect() } else { - txes.into_par_iter().map(|tx| tx.recover_signer()).collect() + let mut recovered_signers: Vec
= Vec::new(); + let mut channels = Vec::new(); + rayon::scope(|s| { + let (chunk_size, chunks) = if num_txes < 16 { + (num_txes, 2) + } else { + let chunk_size = num_txes / (num_txes / 16); + let chunks = num_txes / chunk_size + 1; + (chunk_size, chunks) + }; + let mut iter = txes.into_iter(); + (0..chunks).for_each(|i| { + let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 { + iter.by_ref().take(num_txes % chunk_size).collect() + } else { + iter.by_ref().take(chunk_size).collect() + }; + let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); + channels.push(recovered_senders_rx); + // Spawn the sender recovery task onto the global rayon pool + // This task will send the results through the channel after it recovered + // the senders. + s.spawn(move |_| { + for tx in chunk { + let recovery_result = tx.recover_signer(); + let _ = recovered_senders_tx.send(recovery_result); + } + }); + }) + }); + thread::spawn(move || { + for channel in channels { + while let Ok(recovered) = channel.recv() { + match recovered { + Some(signer) => { + recovered_signers.push(signer); + } + None => { + return None; + } + } + } + } + Some(recovered_signers) + }) + .join() + .unwrap() } } } @@ -1003,7 +1049,53 @@ impl TransactionSigned { if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD { txes.into_iter().map(|tx| tx.recover_signer()).collect() } else { - txes.into_par_iter().map(|tx| tx.recover_signer()).collect() + let mut recovered_signers: Vec = Vec::new(); + let mut channels = Vec::new(); + rayon::scope(|s| { + let (chunk_size, chunks) = if num_txes < 16 { + (num_txes, 2) + } else { + let chunk_size = num_txes / (num_txes / 16); + let chunks = num_txes / chunk_size + 1; + (chunk_size, chunks) + }; + let mut iter = txes.into_iter(); + (0..chunks).for_each(|i| { + let chunk: Vec<&TransactionSigned> = if i == chunks - 1 { + iter.by_ref().take(num_txes % chunk_size).collect() + } else { + iter.by_ref().take(chunk_size).collect() + }; + let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); + channels.push(recovered_senders_rx); + // Spawn the sender recovery task onto the global rayon pool + // This task will send the results through the channel after it recovered + // the senders. + s.spawn(move |_| { + for tx in chunk { + let recovery_result = tx.recover_signer(); + let _ = recovered_senders_tx.send(recovery_result); + } + }); + }) + }); + thread::spawn(move || { + for channel in channels { + while let Ok(recovered) = channel.recv() { + match recovered { + Some(signer) => { + recovered_signers.push(signer); + } + None => { + return None; + } + } + } + } + Some(recovered_signers) + }) + .join() + .unwrap() } }