Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added chunk impl for parallel sender recovery #5622

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 96 additions & 4 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use alloy_rlp::{
use bytes::{Buf, BytesMut};
use derive_more::{AsRef, Deref};
use once_cell::sync::Lazy;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use reth_codecs::{add_arbitrary_tests, derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
use std::mem;
use std::{mem, sync::mpsc, thread};

pub use access_list::{AccessList, AccessListItem};
pub use eip1559::TxEip1559;
Expand Down Expand Up @@ -849,7 +849,53 @@ impl TransactionSignedNoHash {
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSignedNoHash> = if i == chunks - 1 {
iter.by_ref().take(num_txes % chunk_size).collect()
} else {
iter.by_ref().take(chunk_size).collect()
};
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
for tx in chunk {
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
})
});
thread::spawn(move || {
for channel in channels {
while let Ok(recovered) = channel.recv() {
match recovered {
Some(signer) => {
recovered_signers.push(signer);
}
None => {
return None;
}
}
}
}
Some(recovered_signers)
})
.join()
.unwrap()
}
}
}
Expand Down Expand Up @@ -1003,7 +1049,53 @@ impl TransactionSigned {
if num_txes < *PARALLEL_SENDER_RECOVERY_THRESHOLD {
txes.into_iter().map(|tx| tx.recover_signer()).collect()
} else {
txes.into_par_iter().map(|tx| tx.recover_signer()).collect()
let mut recovered_signers: Vec<Address> = Vec::new();
let mut channels = Vec::new();
rayon::scope(|s| {
let (chunk_size, chunks) = if num_txes < 16 {
(num_txes, 2)
} else {
let chunk_size = num_txes / (num_txes / 16);
let chunks = num_txes / chunk_size + 1;
(chunk_size, chunks)
};
let mut iter = txes.into_iter();
(0..chunks).for_each(|i| {
let chunk: Vec<&TransactionSigned> = if i == chunks - 1 {
iter.by_ref().take(num_txes % chunk_size).collect()
} else {
iter.by_ref().take(chunk_size).collect()
};
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered
// the senders.
s.spawn(move |_| {
for tx in chunk {
let recovery_result = tx.recover_signer();
let _ = recovered_senders_tx.send(recovery_result);
}
});
})
});
thread::spawn(move || {
for channel in channels {
while let Ok(recovered) = channel.recv() {
match recovered {
Some(signer) => {
recovered_signers.push(signer);
}
None => {
return None;
}
}
}
}
Some(recovered_signers)
})
.join()
.unwrap()
}
}

Expand Down
Loading