Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Muhash parallel reduce -- optimize U3072 mul when LHS = one #581

Merged
merged 5 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ kaspa-txscript-errors.workspace = true
kaspa-addresses.workspace = true

[[bench]]
name = "hash_benchmarks"
name = "parallel_muhash"
harness = false

[[bench]]
Expand Down
4 changes: 2 additions & 2 deletions consensus/benches/check_scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use kaspa_consensus_core::subnets::SubnetworkId;
use kaspa_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry};
use kaspa_txscript::caches::Cache;
use kaspa_txscript::pay_to_address_script;
use kaspa_utils::iter::parallelism_in_power_steps;
use rand::{thread_rng, Rng};
use secp256k1::Keypair;
use std::thread::available_parallelism;

// You may need to add more detailed mocks depending on your actual code.
fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec<UtxoEntry>) {
Expand Down Expand Up @@ -98,7 +98,7 @@ fn benchmark_check_scripts(c: &mut Criterion) {
});

// Iterate powers of two up to available parallelism
for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) {
for i in parallelism_in_power_steps() {
if inputs_count >= i {
group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| {
let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone());
Expand Down
15 changes: 0 additions & 15 deletions consensus/benches/hash_benchmarks.rs

This file was deleted.

66 changes: 66 additions & 0 deletions consensus/benches/parallel_muhash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use kaspa_consensus_core::{
muhash::MuHashExtensions,
subnets::SUBNETWORK_ID_NATIVE,
tx::{ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, UtxoEntry},
};
use kaspa_hashes::TransactionID;
use kaspa_muhash::MuHash;
use kaspa_utils::iter::parallelism_in_power_steps;
use rayon::prelude::*;

fn generate_transaction(ins: usize, outs: usize, randomness: u64) -> SignableTransaction {
let mut tx = Transaction::new(0, vec![], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]);
let mut entries = vec![];
for i in 0..ins {
let mut hasher = TransactionID::new();
hasher.write(i.to_le_bytes());
hasher.write(randomness.to_le_bytes());
let input = TransactionInput::new(TransactionOutpoint::new(hasher.finalize(), 0), vec![10; 66], 0, 1);
let entry = UtxoEntry::new(22222222, ScriptPublicKey::from_vec(0, vec![99; 34]), 23456, false);
tx.inputs.push(input);
entries.push(entry);
}
for _ in 0..outs {
let output = TransactionOutput::new(23456, ScriptPublicKey::from_vec(0, vec![101; 34]));
tx.outputs.push(output);
}
tx.finalize();
SignableTransaction::with_entries(tx, entries)
}

pub fn parallel_muhash_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("muhash txs");
let txs = (0..256).map(|i| generate_transaction(2, 2, i)).collect_vec();
group.bench_function("seq", |b| {
b.iter(|| {
let mut mh = MuHash::new();
for tx in txs.iter() {
mh.add_transaction(&tx.as_verifiable(), 222);
}
black_box(mh)
})
});

for threads in parallelism_in_power_steps() {
group.bench_function(format!("par {threads}"), |b| {
let pool = rayon::ThreadPoolBuilder::new().num_threads(threads).build().unwrap();
b.iter(|| {
pool.install(|| {
let mh =
txs.par_iter().map(|tx| MuHash::from_transaction(&tx.as_verifiable(), 222)).reduce(MuHash::new, |mut a, b| {
a.combine(&b);
a
});
black_box(mh)
})
})
});
}

group.finish();
}

criterion_group!(benches, parallel_muhash_benchmark);
criterion_main!(benches);
14 changes: 14 additions & 0 deletions consensus/core/src/muhash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use kaspa_muhash::MuHash;
pub trait MuHashExtensions {
fn add_transaction(&mut self, tx: &impl VerifiableTransaction, block_daa_score: u64);
fn add_utxo(&mut self, outpoint: &TransactionOutpoint, entry: &UtxoEntry);
fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self;
fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self;
}

impl MuHashExtensions for MuHash {
Expand All @@ -30,6 +32,18 @@ impl MuHashExtensions for MuHash {
write_utxo(&mut writer, entry, outpoint);
writer.finalize();
}

fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self {
let mut mh = Self::new();
mh.add_transaction(tx, block_daa_score);
mh
}

fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self {
let mut mh = Self::new();
mh.add_utxo(outpoint, entry);
mh
}
}

fn write_utxo(writer: &mut impl HasherBase, entry: &UtxoEntry, outpoint: &TransactionOutpoint) {
Expand Down
10 changes: 2 additions & 8 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,14 +774,8 @@ impl ConsensusApi for Consensus {
pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap();

// Parallelize processing
let inner_multiset = utxoset_chunk
.par_iter()
.map(|(outpoint, entry)| {
let mut inner_multiset = MuHash::new();
inner_multiset.add_utxo(outpoint, entry);
inner_multiset
})
.reduce(MuHash::new, |mut a, b| {
let inner_multiset =
utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| {
a.combine(&b);
a
});
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/pipeline/virtual_processor/utxo_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ impl VirtualStateProcessor {
.enumerate()
.skip(1) // Skip the coinbase tx.
.filter_map(|(i, tx)| self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok().map(|vtx| {
let mut mh = MuHash::new();
mh.add_transaction(&vtx, pov_daa_score);
let mh = MuHash::from_transaction(&vtx, pov_daa_score);
(smallvec![(vtx, i as u32)], mh)
}
))
Expand Down
10 changes: 10 additions & 0 deletions crypto/muhash/src/u3072.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ impl U3072 {
}

fn mul(&mut self, other: &U3072) {
/*
Optimization: short-circuit when LHS is one
- This case is especially frequent during parallel reduce operation where the identity (one) is used for each sub-computation (at the LHS)
- If self ≠ one, the comparison should exit early, otherwise if they are equal -- we gain much more than we lose
- Benchmarks show that general performance remains the same while parallel reduction gains ~35%
*/
if *self == Self::one() {
*self = *other;
return;
}
let (mut carry_low, mut carry_high, mut carry_highest) = (0, 0, 0);
let mut tmp = Self::one();

Expand Down
6 changes: 6 additions & 0 deletions utils/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,9 @@ where
self.inner.clone().fmt(f)
}
}

/// Returns an iterator over powers of two up to (the rounded up) available parallelism: `2, 4, 8, ..., 2^(available_parallelism.log2().ceil())`,
/// i.e., for `std::thread::available_parallelism = 15` the function will return `2, 4, 8, 16`
pub fn parallelism_in_power_steps() -> impl Iterator<Item = usize> {
(1..=(std::thread::available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2usize.pow(x))
}