From 3783deab9e68df676824ef41d2f6bc37b13be92f Mon Sep 17 00:00:00 2001 From: 0xA001113 <0xeuler@proton.me> Date: Tue, 22 Oct 2024 20:48:51 +0200 Subject: [PATCH] Muhash parallel reduce -- optimize U3072 mul when LHS = one --- consensus/Cargo.toml | 2 +- consensus/benches/check_scripts.rs | 4 +- consensus/benches/hash_benchmarks.rs | 15 ----- consensus/benches/parallel_muhash.rs | 66 +++++++++++++++++++ consensus/core/src/muhash.rs | 14 ++++ consensus/src/consensus/mod.rs | 10 +-- .../virtual_processor/utxo_validation.rs | 3 +- crypto/muhash/src/u3072.rs | 10 +++ utils/src/iter.rs | 6 ++ 9 files changed, 102 insertions(+), 28 deletions(-) delete mode 100644 consensus/benches/hash_benchmarks.rs create mode 100644 consensus/benches/parallel_muhash.rs diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index de5d206..b540479 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -57,7 +57,7 @@ spectre-txscript-errors.workspace = true spectre-addresses.workspace = true [[bench]] -name = "hash_benchmarks" +name = "parallel_muhash" harness = false [[bench]] diff --git a/consensus/benches/check_scripts.rs b/consensus/benches/check_scripts.rs index 856dc16..b7151b7 100644 --- a/consensus/benches/check_scripts.rs +++ b/consensus/benches/check_scripts.rs @@ -9,9 +9,9 @@ use spectre_consensus_core::subnets::SubnetworkId; use spectre_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry}; use spectre_txscript::caches::Cache; use spectre_txscript::pay_to_address_script; +use spectre_utils::iter::parallelism_in_power_steps; use rand::{thread_rng, Rng}; use secp256k1::Keypair; -use std::thread::available_parallelism; // You may need to add more detailed mocks depending on your actual code. fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec) { @@ -98,7 +98,7 @@ fn benchmark_check_scripts(c: &mut Criterion) { }); // Iterate powers of two up to available parallelism - for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) { + for i in parallelism_in_power_steps() { if inputs_count >= i { group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| { let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone()); diff --git a/consensus/benches/hash_benchmarks.rs b/consensus/benches/hash_benchmarks.rs deleted file mode 100644 index 2f34750..0000000 --- a/consensus/benches/hash_benchmarks.rs +++ /dev/null @@ -1,15 +0,0 @@ -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use std::str::FromStr; - -use spectre_hashes::Hash; - -/// Placeholder for actual benchmarks -pub fn hash_benchmark(c: &mut Criterion) { - c.bench_function("Hash::from_str", |b| { - let hash_str = "8e40af02265360d59f4ecf9ae9ebf8f00a3118408f5a9cdcbcc9c0f93642f3af"; - b.iter(|| Hash::from_str(black_box(hash_str))) - }); -} - -criterion_group!(benches, hash_benchmark); -criterion_main!(benches); diff --git a/consensus/benches/parallel_muhash.rs b/consensus/benches/parallel_muhash.rs new file mode 100644 index 0000000..ba77809 --- /dev/null +++ b/consensus/benches/parallel_muhash.rs @@ -0,0 +1,66 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use itertools::Itertools; +use spectre_consensus_core::{ + muhash::MuHashExtensions, + subnets::SUBNETWORK_ID_NATIVE, + tx::{ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, UtxoEntry}, +}; +use spectre_hashes::TransactionID; +use spectre_muhash::MuHash; +use spectre_utils::iter::parallelism_in_power_steps; +use rayon::prelude::*; + +fn generate_transaction(ins: usize, outs: usize, randomness: u64) -> SignableTransaction { + let mut tx = Transaction::new(0, vec![], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]); + let mut entries = vec![]; + for i in 0..ins { + let mut hasher = TransactionID::new(); + hasher.write(i.to_le_bytes()); + hasher.write(randomness.to_le_bytes()); + let input = TransactionInput::new(TransactionOutpoint::new(hasher.finalize(), 0), vec![10; 66], 0, 1); + let entry = UtxoEntry::new(22222222, ScriptPublicKey::from_vec(0, vec![99; 34]), 23456, false); + tx.inputs.push(input); + entries.push(entry); + } + for _ in 0..outs { + let output = TransactionOutput::new(23456, ScriptPublicKey::from_vec(0, vec![101; 34])); + tx.outputs.push(output); + } + tx.finalize(); + SignableTransaction::with_entries(tx, entries) +} + +pub fn parallel_muhash_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("muhash txs"); + let txs = (0..256).map(|i| generate_transaction(2, 2, i)).collect_vec(); + group.bench_function("seq", |b| { + b.iter(|| { + let mut mh = MuHash::new(); + for tx in txs.iter() { + mh.add_transaction(&tx.as_verifiable(), 222); + } + black_box(mh) + }) + }); + + for threads in parallelism_in_power_steps() { + group.bench_function(format!("par {threads}"), |b| { + let pool = rayon::ThreadPoolBuilder::new().num_threads(threads).build().unwrap(); + b.iter(|| { + pool.install(|| { + let mh = + txs.par_iter().map(|tx| MuHash::from_transaction(&tx.as_verifiable(), 222)).reduce(MuHash::new, |mut a, b| { + a.combine(&b); + a + }); + black_box(mh) + }) + }) + }); + } + + group.finish(); +} + +criterion_group!(benches, parallel_muhash_benchmark); +criterion_main!(benches); diff --git a/consensus/core/src/muhash.rs b/consensus/core/src/muhash.rs index 3bccba6..c30d6a3 100644 --- a/consensus/core/src/muhash.rs +++ b/consensus/core/src/muhash.rs @@ -8,6 +8,8 @@ use spectre_muhash::MuHash; pub trait MuHashExtensions { fn add_transaction(&mut self, tx: &impl VerifiableTransaction, block_daa_score: u64); fn add_utxo(&mut self, outpoint: &TransactionOutpoint, entry: &UtxoEntry); + fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self; + fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self; } impl MuHashExtensions for MuHash { @@ -30,6 +32,18 @@ impl MuHashExtensions for MuHash { write_utxo(&mut writer, entry, outpoint); writer.finalize(); } + + fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self { + let mut mh = Self::new(); + mh.add_transaction(tx, block_daa_score); + mh + } + + fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self { + let mut mh = Self::new(); + mh.add_utxo(outpoint, entry); + mh + } } fn write_utxo(writer: &mut impl HasherBase, entry: &UtxoEntry, outpoint: &TransactionOutpoint) { diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index c0ed1c3..00b24a9 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -775,14 +775,8 @@ impl ConsensusApi for Consensus { pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap(); // Parallelize processing - let inner_multiset = utxoset_chunk - .par_iter() - .map(|(outpoint, entry)| { - let mut inner_multiset = MuHash::new(); - inner_multiset.add_utxo(outpoint, entry); - inner_multiset - }) - .reduce(MuHash::new, |mut a, b| { + let inner_multiset = + utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| { a.combine(&b); a }); diff --git a/consensus/src/pipeline/virtual_processor/utxo_validation.rs b/consensus/src/pipeline/virtual_processor/utxo_validation.rs index e880b70..2e898f3 100644 --- a/consensus/src/pipeline/virtual_processor/utxo_validation.rs +++ b/consensus/src/pipeline/virtual_processor/utxo_validation.rs @@ -248,8 +248,7 @@ impl VirtualStateProcessor { .enumerate() .skip(1) // Skip the coinbase tx. .filter_map(|(i, tx)| self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok().map(|vtx| { - let mut mh = MuHash::new(); - mh.add_transaction(&vtx, pov_daa_score); + let mh = MuHash::from_transaction(&vtx, pov_daa_score); (smallvec![(vtx, i as u32)], mh) } )) diff --git a/crypto/muhash/src/u3072.rs b/crypto/muhash/src/u3072.rs index 85a0d5c..652655c 100644 --- a/crypto/muhash/src/u3072.rs +++ b/crypto/muhash/src/u3072.rs @@ -88,6 +88,16 @@ impl U3072 { } fn mul(&mut self, other: &U3072) { + /* + Optimization: short-circuit when LHS is one + - This case is especially frequent during parallel reduce operation where the identity (one) is used for each sub-computation (at the LHS) + - If self ≠ one, the comparison should exit early, otherwise if they are equal -- we gain much more than we lose + - Benchmarks show that general performance remains the same while parallel reduction gains ~35% + */ + if *self == Self::one() { + *self = *other; + return; + } let (mut carry_low, mut carry_high, mut carry_highest) = (0, 0, 0); let mut tmp = Self::one(); diff --git a/utils/src/iter.rs b/utils/src/iter.rs index 58a61d7..3c4c98c 100644 --- a/utils/src/iter.rs +++ b/utils/src/iter.rs @@ -48,3 +48,9 @@ where self.inner.clone().fmt(f) } } + +/// Returns an iterator over powers of two up to (the rounded up) available parallelism: `2, 4, 8, ..., 2^(available_parallelism.log2().ceil())`, +/// i.e., for `std::thread::available_parallelism = 15` the function will return `2, 4, 8, 16` +pub fn parallelism_in_power_steps() -> impl Iterator { + (1..=(std::thread::available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2usize.pow(x)) +}