From 264b36649ac6c0c43ee5da7e41daa703267a3564 Mon Sep 17 00:00:00 2001 From: garbageslam Date: Tue, 9 Mar 2021 06:59:43 +0200 Subject: [PATCH] Add slam tool to consensus server repo (#733) * Add slam tool to consensus server repo The slam tool is a load testing tool for the consensus servers. Generally, test code should be versioned with the code under test. This moves the slam tool to the same repo as the consensus servers, and will make it easier to run this test in CD and more easily use it to validate changes to the consensus servers. * fix build, docu, review comments, copyright * slam -> mc-slam and update docu * lockfiles and fix build * fixups to readme --- Cargo.lock | 28 ++ Cargo.toml | 1 + slam/Cargo.toml | 38 +++ slam/README.md | 68 +++++ slam/src/config.rs | 124 +++++++++ slam/src/lib.rs | 5 + slam/src/main.rs | 656 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 920 insertions(+) create mode 100644 slam/Cargo.toml create mode 100644 slam/README.md create mode 100644 slam/src/config.rs create mode 100644 slam/src/lib.rs create mode 100755 slam/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index f9b41ed7fc..dd80ed892c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3342,6 +3342,34 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "mc-slam" +version = "1.0.0" +dependencies = [ + "crossbeam-channel 0.5.0", + "curve25519-dalek", + "grpcio", + "lazy_static", + "mc-account-keys", + "mc-attest-core", + "mc-common", + "mc-connection", + "mc-consensus-enclave-measurement", + "mc-consensus-scp", + "mc-crypto-keys", + "mc-fog-report-validation", + "mc-ledger-db", + "mc-ledger-sync", + "mc-mobilecoind", + "mc-transaction-core", + "mc-transaction-std", + "mc-util-keyfile", + "mc-util-uri", + "rand 0.7.3", + "structopt", + "tempdir", +] + [[package]] name = "mc-test-vectors-account-keys" version = "1.0.1-pre1" diff --git a/Cargo.toml b/Cargo.toml index 9a5c368a1a..3b38b80f4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ members = [ "sgx/report-cache/untrusted", "sgx/slog-edl", "sgx/urts-sys", + "slam", "test-vectors/account-keys", "test-vectors/b58-encodings", "transaction/core", diff --git a/slam/Cargo.toml b/slam/Cargo.toml new file mode 100644 index 0000000000..9b2e78218f --- /dev/null +++ b/slam/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "mc-slam" +version = "1.0.0" +authors = ["MobileCoin"] +edition = "2018" + +[dependencies] +mc-account-keys = { path = "../account-keys" } +mc-attest-core = { path = "../attest/core" } +mc-common = { path = "../common", features = ["log"] } +mc-connection = { path = "../connection" } +mc-consensus-enclave-measurement = { path = "../consensus/enclave/measurement" } +mc-consensus-scp = { path = "../consensus/scp" } +mc-crypto-keys = { path = "../crypto/keys" } +mc-fog-report-validation = { path = "../fog/report/validation" } +mc-ledger-db = { path = "../ledger/db" } +mc-ledger-sync = { path = "../ledger/sync" } +mc-mobilecoind = { path = "../mobilecoind" } +mc-transaction-core = { path = "../transaction/core" } +mc-transaction-std = { path = "../transaction/std" } +mc-util-keyfile = { path = "../util/keyfile" } +mc-util-uri = { path = "../util/uri" } + +crossbeam-channel = "0.5" +grpcio = "0.6.0" +lazy_static = "1.4" +rand = "0.7" +structopt = "0.3" +tempdir = "0.3" + +[target.'cfg(any(target_feature = "avx2", target_feature = "avx"))'.dependencies] +curve25519-dalek = { version = "3.0", default-features = false, features = ["simd_backend", "nightly"] } + +[target.'cfg(not(any(target_feature = "avx2", target_feature = "avx")))'.dependencies] +curve25519-dalek = { version = "3.0", default-features = false, features = ["nightly", "u64_backend"] } + +[dev-dependencies] +mc-common = { path = "../common", features = ["loggers"] } diff --git a/slam/README.md b/slam/README.md new file mode 100644 index 0000000000..513ded95c6 --- /dev/null +++ b/slam/README.md @@ -0,0 +1,68 @@ +# Slam Load Testing + +Slam is a load testing tool that rapidly submits transactions to a consensus network. + +Slam testing a consensus network usually involves: + +1. Obtaining a copy of the initial ledger content and corresponding private keys. +1. Running Slam. +1. Monitoring the network's performance. + +# Obtaining an initial ledger and keys + +In order to create transactions, Slam requires a copy of the ledger and the set of private keys that own the contents of the ledger. The ledger and keys must **exactly** match those used by the consensus network; if not, Slam's transactions will likely be rejected with `InvalidTxOutMembershipProof` or `LedgerDbError`. + +## Generating a local ledger and keys +If you know how the consensus network's ledger was initialized, you can initialize the same ledger locally with: + +``` + mobilecoin $ mkdir -p target/sample_data + mobilecoin $ cd sample_data + mobilecoin/target/sample_data $ cargo run -p mc-util-keyfile --bin sample-keys --release -- --num 1000 + + mobilecoin/target/sample_data $ cargo run -p mc-util-generate-sample-ledger --bin generate-sample-ledger --release -- --num 100 +``` + + +## Using a deployed ledger +Alternatively, Slam can use the ledger from a deployed network instead of a locally-generated one: + +``` + docker pull mobilecoin/node_hw:master-latest + docker run -it --detach=true --entrypoint="/bin/bash" --name=extract_ledger mobilecoin/node_hw:master-latest + docker cp extract_ledger:/var/lib/mobilecoin/ledger/ /tmp/ledger +``` + +# Running Slam + +To Run Slam against a deployed network (e.g. "other"), set one of the following environment variables. If you get them wrong, you'll probably see "Attestation failure" messages. + +``` +# aws s3 cp s3://enclave-distribution.other.mobilecoin.com/consensus/consensus-enclave.css ./s +export CONSENSUS_ENCLAVE_CSS=/home/you/consensus-enclave.css + +# Local development +export CONSENSUS_ENCLAVE_PRIVKEY=/home/you/Enclave_private.pem +``` + +Then, run slam in `release` mode: + +``` + cargo run -p mc-slam --release -- --sample-data-dir target/sample_data/ \ + --peer mc://node1.demo.mobilecoin.com \ + --peer mc://node2.demo.mobilecoin.com \ + --peer mc://node3.demo.mobilecoin.com \ + --peer mc://node4.demo.mobilecoin.com \ + --peer mc://node5.demo.mobilecoin.com \ + --add-tx-delay-ms 500 \ + --tombstone-block 100 \ + --with-ledger-sync \ + --tx-source-url https://s3-us-west-1.amazonaws.com/mobilecoin.chain/node1.demo.mobilecoin.com/ +``` + +## Running Slam with a local consensus network + +If you are running a consensus network locally, you will replace the peer URIs above with either: + +* `insecure-mc://localhost:3223` if running outside Docker, with the port matching the local ports corresponding to the consensus nodes. +* `insecure-mc://:3223` if running inside Docker, making sure that the ports are published on the docker container diff --git a/slam/src/config.rs b/slam/src/config.rs new file mode 100644 index 0000000000..cf7e0f7b57 --- /dev/null +++ b/slam/src/config.rs @@ -0,0 +1,124 @@ +// Copyright (c) 2018-2021 The MobileCoin Foundation + +//! Configuration parameters for the slam script + +use grpcio::EnvBuilder; +use mc_attest_core::{MrSignerVerifier, Verifier, DEBUG_ENCLAVE}; +use mc_common::logger::{o, Logger}; +use mc_connection::{ + HardcodedCredentialsProvider, Result as ConnectionResult, SyncConnection, ThickClient, +}; +use mc_mobilecoind::config::PeersConfig; +use mc_util_uri::ConnectionUri; +use std::{fs, path::PathBuf, str::FromStr, sync::Arc}; +use structopt::StructOpt; + +#[derive(Clone, Debug, StructOpt)] +#[structopt(name = "slam", about = "Generate valid txs.")] +pub struct SlamConfig { + /// Path to sample data for keys/ and ledger/ + #[structopt(long, parse(from_os_str))] + pub sample_data_dir: PathBuf, + + /// Number of transactions to send per account + #[structopt(long, default_value = "-1")] + pub num_tx_to_send: isize, + + /// Number of inputs in the ring + #[structopt(long, default_value = "11")] + pub ring_size: usize, + + /// Block after which to tombstone + #[structopt(long, default_value = "50")] + pub tombstone_block: u64, + + #[structopt(long, default_value = "1")] + pub num_inputs: usize, + + /// Ask consensus for the current block to set tombstone appropriately + #[structopt(long)] + pub query_consensus_for_cur_block: bool, + + /// Offset into transactions to start + #[structopt(long, default_value = "0")] + pub start_offset: usize, + + /// Num transactions per account - must set this if using start_offset + #[structopt(long, default_value = "0")] + pub num_transactions_per_account: usize, + + /// Offset into accounts + #[structopt(long, default_value = "0")] + pub account_offset: usize, + + /// Number of threads with which to submit transactions (threadpool uses min with cpu) + #[structopt(long, default_value = "32")] + pub max_threads: usize, + + /// Delay (in milliseconds) before each add_transaction call + #[structopt(long, default_value = "0")] + pub add_tx_delay_ms: u64, + + /// Enable ledger sync, which allows slam to run indefinitely (until it runs out of money). + #[structopt(long)] + pub with_ledger_sync: bool, + + /// URLs to use for transaction data. + /// + /// For example: https://s3-us-west-1.amazonaws.com/mobilecoin.chain/node1.master.mobilecoin.com/ + #[structopt(long = "tx-source-url")] + pub tx_source_urls: Vec, + + #[structopt(flatten)] + pub peers_config: PeersConfig, +} + +impl SlamConfig { + pub fn get_connections( + &self, + logger: &Logger, + ) -> ConnectionResult>>> { + let mut mr_signer_verifier = + MrSignerVerifier::from(mc_consensus_enclave_measurement::sigstruct()); + mr_signer_verifier.allow_hardening_advisory("INTEL-SA-00334"); + + let mut verifier = Verifier::default(); + verifier.mr_signer(mr_signer_verifier).debug(DEBUG_ENCLAVE); + + self.peers_config + .peers + .clone() + .unwrap() + .iter() + .map(|uri| { + // We create a new environment for each peer to maintain current behavior + let env = Arc::new( + EnvBuilder::new() + .name_prefix(format!("slam-{}", uri.addr())) + .build(), + ); + let logger = logger.new(o!("mc.cxn" => uri.addr())); + ThickClient::new( + uri.clone(), + verifier.clone(), + env, + HardcodedCredentialsProvider::from(uri), + logger.clone(), + ) + .map(|inner| SyncConnection::new(inner, logger)) + }) + .collect() + } +} + +#[derive(Clone, Debug)] +pub struct FileData(pub Vec); +impl FromStr for FileData { + type Err = String; + + fn from_str(s: &str) -> Result { + Ok(Self(fs::read(s).map_err(|e| { + format!("Failed reading \"{}\": {:?}", s, e) + })?)) + } +} diff --git a/slam/src/lib.rs b/slam/src/lib.rs new file mode 100644 index 0000000000..f3904f9744 --- /dev/null +++ b/slam/src/lib.rs @@ -0,0 +1,5 @@ +// Copyright (c) 2018-2021 The MobileCoin Foundation + +pub mod config; + +pub use crate::config::SlamConfig; diff --git a/slam/src/main.rs b/slam/src/main.rs new file mode 100755 index 0000000000..03e0b0d4a4 --- /dev/null +++ b/slam/src/main.rs @@ -0,0 +1,656 @@ +// Copyright (c) 2018-2021 The MobileCoin Foundation + +use core::{cell::RefCell, convert::TryFrom}; +use lazy_static::lazy_static; +use mc_account_keys::{AccountKey, PublicAddress}; +use mc_attest_core::{MrSignerVerifier, Verifier, DEBUG_ENCLAVE}; +use mc_common::{ + logger::{create_app_logger, log, o, Logger}, + HashMap, HashSet, ResponderId, +}; +use mc_connection::{ + HardcodedCredentialsProvider, RetryError, RetryableUserTxConnection, SyncConnection, + ThickClient, +}; +use mc_consensus_scp::QuorumSet; +use mc_crypto_keys::{CompressedRistrettoPublic, RistrettoPublic}; +use mc_fog_report_validation::FogResolver; +use mc_ledger_db::{Ledger, LedgerDB}; +use mc_ledger_sync::{LedgerSyncServiceThread, PollingNetworkState, ReqwestTransactionsFetcher}; +use mc_slam::SlamConfig; +use mc_transaction_core::{ + constants::MINIMUM_FEE, + get_tx_out_shared_secret, + onetime_keys::{recover_onetime_private_key, view_key_matches_output}, + ring_signature::KeyImage, + tx::{Tx, TxOut, TxOutMembershipProof}, +}; +use mc_transaction_std::{InputCredentials, TransactionBuilder}; +use mc_util_uri::ConnectionUri; +use rand::{seq::SliceRandom, thread_rng, Rng}; +use std::{ + iter::empty, + path::Path, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, RwLock, + }, + thread, + time::Duration, +}; +use structopt::StructOpt; +use tempdir::TempDir; + +thread_local! { + pub static CONNS: RefCell>>>> = RefCell::new(None); +} +fn set_conns(config: &SlamConfig, logger: &Logger) { + let conns = config.get_connections(logger).unwrap(); + CONNS.with(|c| *c.borrow_mut() = Some(conns)); +} +fn get_conns( + config: &SlamConfig, + logger: &Logger, +) -> Vec>> { + let conns = CONNS.with(|c| c.borrow().clone()); + match conns { + Some(c) => c, + None => { + set_conns(config, logger); + CONNS.with(|c| c.borrow().clone()).unwrap() + } + } +} + +lazy_static! { + pub static ref BLOCK_HEIGHT: AtomicU64 = AtomicU64::default(); + + // A map of tx pub keys to account index. This is used in conjunction with ledger syncing to + // identify which new txs belong to which accounts without having to do any slow crypto. + pub static ref TX_PUB_KEY_TO_ACCOUNT_KEY: Mutex> = Mutex::new(HashMap::default()); + +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SpendableTxOut { + pub tx_out: TxOut, + pub amount: u64, + from_account_key: AccountKey, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SubmitTxMessage { + pub utxos: Vec, + //pub account_index: usize, + //pub from_account: AccountKey, + pub to_address: PublicAddress, + //pub ring_size: usize, +} + +fn main() { + mc_common::setup_panic_handler(); + let (logger, _global_logger_guard) = create_app_logger(o!()); + + let config = SlamConfig::from_args(); + + // Read account root_entropies from disk + let accounts: Vec = mc_util_keyfile::keygen::read_default_root_entropies( + config.sample_data_dir.join(Path::new("keys")), + ) + .expect("Could not read default root entropies from keys") + .iter() + .map(|x| { + let mut root_id = x.clone(); + root_id.fog_report_url = Default::default(); + AccountKey::from(&root_id) + }) + .collect(); + + // Open the ledger_db to process the bootstrapped ledger + log::info!(logger, "Loading ledger"); + + let ledger_dir = TempDir::new("slam_ledger").unwrap(); + std::fs::copy( + config.sample_data_dir.join("ledger").join("data.mdb"), + ledger_dir.path().join("data.mdb"), + ) + .expect("failed copying ledger"); + + let ledger_db = + LedgerDB::open(ledger_dir.path().to_path_buf()).expect("Could not open ledger_db"); + + BLOCK_HEIGHT.store(ledger_db.num_blocks().unwrap(), Ordering::SeqCst); + + // The number of blocks we've processed so far. + let mut block_count = 0; + + // Load the bootstrapped transactions. + log::info!(logger, "Processing transactions"); + let mut num_transactions_per_account = config.num_transactions_per_account; + + let (spendable_txouts_sender, spendable_txouts_receiver) = + crossbeam_channel::unbounded::(); + + while let Ok(block_contents) = ledger_db.get_block_contents(block_count) { + let transactions = block_contents.outputs; + // Only get num_transactions per account for the first block, then assume + // future blocks that were bootstrapped are similar + if num_transactions_per_account == 0 { + num_transactions_per_account = + get_num_transactions_per_account(&accounts[0], &transactions, &logger); + } + log::info!( + logger, + "Loaded {:?} transactions from block {:?}", + transactions.len(), + block_count + ); + + // NOTE: This will start at the same offset per block - we may want just the first offset + let mut account_index = config.account_offset; + let mut account = &accounts[account_index]; + let mut num_per_account_processed = 0; + for (index, tx_out) in transactions.iter().enumerate().skip(config.start_offset) { + // Makes strong assumption about bootstrapped ledger layout + if num_per_account_processed >= num_transactions_per_account { + log::trace!( + logger, + "Moving on to next account {:?} at tx index {:?}", + account_index + 1, + index + ); + account_index += 1; + if account_index >= accounts.len() { + log::info!(logger, "Finished processing accounts. If no transactions sent, you may need to re-bootstrap."); + break; + } + account = &accounts[account_index]; + num_per_account_processed = 0; + } + + let public_key = RistrettoPublic::try_from(&tx_out.public_key).unwrap(); + let shared_secret = get_tx_out_shared_secret(account.view_private_key(), &public_key); + let (input_amount, _blinding_factor) = tx_out + .amount + .get_value(&shared_secret) + .expect("Malformed amount"); + + log::trace!( + logger, + "(account = {:?}) and (tx_index {:?}) = {}", + account_index, + index, + input_amount, + ); + + // Push to queue + spendable_txouts_sender + .send(SpendableTxOut { + tx_out: tx_out.clone(), + amount: input_amount, + from_account_key: account.clone(), + }) + .expect("failed sending to spendable_txouts_sender"); + num_per_account_processed += 1; + } + block_count += 1; + } + + // Spawn worker threads + for i in 0..config.max_threads { + let spendable_txouts_receiver2 = spendable_txouts_receiver.clone(); + let config2 = config.clone(); + let ledger_db2 = ledger_db.clone(); + let mut accounts2 = accounts.clone(); + let logger2 = logger.new(o!("num" => i)); + + accounts2.shuffle(&mut thread_rng()); + + thread::Builder::new() + .name(format!("worker{}", i)) + .spawn(move || { + worker_thread_entry( + spendable_txouts_receiver2, + config2, + ledger_db2, + accounts2, + logger2, + ) + }) + .expect("failed starting thread"); + } + + if config.with_ledger_sync { + if config.tx_source_urls.is_empty() { + panic!("--with-ledger-sync requires at least one --tx-source-url"); + } + + // Set up ledger syncing + log::info!(logger, "Starting ledger syncing..."); + let peer_manager = { + let mut mr_signer_verifier = + MrSignerVerifier::from(mc_consensus_enclave_measurement::sigstruct()); + mr_signer_verifier.allow_hardening_advisory("INTEL-SA-00334"); + + let mut verifier = Verifier::default(); + verifier.mr_signer(mr_signer_verifier).debug(DEBUG_ENCLAVE); + + config.peers_config.create_peer_manager(verifier, &logger) + }; + + let node_ids = config + .peers_config + .peers + .clone() + .unwrap() + .iter() + .map(|p| { + p.responder_id().unwrap_or_else(|_| { + panic!("Could not get responder_id from uri {}", p.to_string()) + }) + }) + .collect::>(); + let quorum_set = QuorumSet::new_with_node_ids(node_ids.len() as u32, node_ids); + + let network_state = Arc::new(RwLock::new(PollingNetworkState::new( + quorum_set, + peer_manager.clone(), + logger.clone(), + ))); + + let transactions_fetcher = + ReqwestTransactionsFetcher::new(config.tx_source_urls, logger.clone()) + .expect("Failed creating ReqwestTransactionsFetcher"); + + let mut next_block_idx = ledger_db.num_blocks().unwrap(); + + let _ledger_sync_service_thread = LedgerSyncServiceThread::new( + ledger_db.clone(), + peer_manager, + network_state, + transactions_fetcher, + Duration::from_secs(1), + logger.clone(), + ); + + loop { + let block_contents = match ledger_db.get_block_contents(next_block_idx) { + Ok(contents) => contents, + Err(_) => { + log::info!(logger, "Waiting on block #{}...", next_block_idx); + thread::sleep(Duration::from_secs(1)); + continue; + } + }; + + log::debug!(logger, "Synced block #{}", next_block_idx); + next_block_idx += 1; + + for tx_out in block_contents.outputs { + if let Some(account) = TX_PUB_KEY_TO_ACCOUNT_KEY + .lock() + .unwrap() + .remove(&tx_out.public_key) + { + log::info!( + logger, + "Got account {} for {}", + account.default_subaddress(), + tx_out.public_key, + ); + + let public_key = RistrettoPublic::try_from(&tx_out.public_key).unwrap(); + let shared_secret = + get_tx_out_shared_secret(account.view_private_key(), &public_key); + let (input_amount, _blinding_factor) = tx_out + .amount + .get_value(&shared_secret) + .expect("Malformed amount"); + log::trace!( + logger, + "amount of {} is {}", + tx_out.public_key, + input_amount + ); + + // Push to queue + spendable_txouts_sender + .send(SpendableTxOut { + tx_out: tx_out.clone(), + amount: input_amount, + from_account_key: account.clone(), + }) + .expect("failed sending to spendable_txouts_sender"); + } else { + log::warn!(logger, "Got unknown tx pub key {}", tx_out.public_key); + } + } + } + } else { + log::info!(logger, "Main thread entering infinite loop"); + loop { + thread::sleep(Duration::from_secs(100)); + } + } +} + +fn worker_thread_entry( + spendable_txouts_receiver: crossbeam_channel::Receiver, + config: SlamConfig, + ledger_db: LedgerDB, + accounts: Vec, + logger: Logger, +) { + log::info!(logger, "Worker started."); + let mut txs_created: usize = 0; + + let mut conns = get_conns(&config, &logger); + conns.shuffle(&mut thread_rng()); + + loop { + let mut pending_spendable_txouts = Vec::::new(); + while pending_spendable_txouts.len() < config.num_inputs { + log::trace!( + logger, + "Waiting for {} more inputs", + config.num_inputs - pending_spendable_txouts.len() + ); + pending_spendable_txouts.push( + spendable_txouts_receiver + .recv() + .expect("failed getting txout"), + ); + } + + // Select a random account to send to + let to_account = &accounts[txs_created % accounts.len()]; + + // Got our inputs, construct transaction. + let tx = build_tx( + &pending_spendable_txouts, + to_account, + &config, + &ledger_db, + &logger, + ); + + txs_created += 1; + + // Submit tx + if submit_tx(txs_created, &conns, &tx, &config, &logger) { + let mut map = TX_PUB_KEY_TO_ACCOUNT_KEY.lock().unwrap(); + map.insert(tx.prefix.outputs[0].public_key, to_account.clone()); + } + } +} + +fn submit_tx( + counter: usize, + conns: &[SyncConnection>], + tx: &Tx, + config: &SlamConfig, + logger: &Logger, +) -> bool { + let max_retries = 10; + let retry_sleep_duration = Duration::from_millis(300); + + for i in 0..max_retries { + // Submit to a node in round robin fashion, starting with a random node + let node_index = (i + counter) % conns.len(); + let conn = &conns[node_index]; + log::debug!( + logger, + "Submitting transaction {} to node {} (attempt {} / {})", + counter, + conn, + i, + max_retries + ); + thread::sleep(Duration::from_millis(config.add_tx_delay_ms)); + match conn.propose_tx(&tx, empty()) { + Ok(block_height) => { + log::debug!( + logger, + "Successfully submitted {:?}, at block height {:?} (attempt {} / {})", + counter, + block_height, + i, + max_retries + ); + + BLOCK_HEIGHT.fetch_max(block_height, Ordering::SeqCst); + return true; + } + Err(RetryError::Operation { error, .. }) => { + log::warn!( + logger, + "Failed to submit transaction {:?} to node {} (attempt {} / {}): {}", + counter, + conn, + i, + max_retries, + error + ); + thread::sleep(retry_sleep_duration); + } + Err(RetryError::Internal(s)) => { + log::warn!( + logger, + "Internal retry error while submitting transaction {:?} to node {} (attempt {} / {}): {}", + counter, + conn, + i, + max_retries, + s + ); + return false; + } + } + } + log::error!( + logger, + "Failed to submit tx {:?} and max retries exceeded: {:?}", + counter, + max_retries + ); + false +} + +fn build_tx( + spendable_txouts: &[SpendableTxOut], + to_account: &AccountKey, + config: &SlamConfig, + ledger_db: &LedgerDB, + logger: &Logger, +) -> Tx { + let utxos_with_proofs = get_membership_proofs(ledger_db, spendable_txouts); + let rings = get_rings(ledger_db, config.ring_size, utxos_with_proofs.len()); + + let mut rng = rand::thread_rng(); + + // Sanity + assert_eq!(utxos_with_proofs.len(), rings.len()); + + // Create tx_builder. No fog reports. + let mut tx_builder = TransactionBuilder::new(FogResolver::default()); + + tx_builder.set_fee(MINIMUM_FEE); + + // Unzip each vec of tuples into a tuple of vecs. + let mut rings_and_proofs: Vec<(Vec, Vec)> = rings + .into_iter() + .map(|tuples| tuples.into_iter().unzip()) + .collect(); + + // Add inputs to the tx. + for (utxo, proof) in utxos_with_proofs.clone() { + let (mut ring, mut membership_proofs) = rings_and_proofs.pop().unwrap(); + assert_eq!( + ring.len(), + membership_proofs.len(), + "Each ring element must have a corresponding membership proof." + ); + + // Add the input to the ring. + let position_opt = ring.iter().position(|tx_out| *tx_out == utxo.tx_out); + let real_key_index = match position_opt { + Some(position) => { + // The input is already present in the ring. + // This could happen if ring elements are sampled randomly from the ledger. + position + } + None => { + // The input is not already in the ring. + if ring.is_empty() { + // Append the input and its proof of membership. + ring.push(utxo.tx_out.clone()); + membership_proofs.push(proof.clone()); + } else { + // Replace the first element of the ring. + ring[0] = utxo.tx_out.clone(); + membership_proofs[0] = proof.clone(); + } + // The real input is always the first element. This is safe because TransactionBuilder + // sorts each ring. + 0 + } + }; + + assert_eq!( + ring.len(), + membership_proofs.len(), + "Each ring element must have a corresponding membership proof." + ); + + let public_key = RistrettoPublic::try_from(&utxo.tx_out.public_key).unwrap(); + let onetime_private_key = recover_onetime_private_key( + &public_key, + utxo.from_account_key.view_private_key(), + &utxo.from_account_key.default_subaddress_spend_private(), + ); + + let key_image = KeyImage::from(&onetime_private_key); + log::trace!( + logger, + "Adding input: ring {:?}, utxo index {:?}, key image {:?}, pubkey {:?}", + ring, + real_key_index, + key_image, + public_key + ); + + tx_builder.add_input( + InputCredentials::new( + ring, + membership_proofs, + real_key_index, + onetime_private_key, + *utxo.from_account_key.view_private_key(), + ) + .expect("add_input failed"), + ); + } + + // Add ouputs + for (i, (utxo, _proof)) in utxos_with_proofs.iter().enumerate() { + let mut amount = utxo.amount; + // Use the first input to pay for the fee. + if i == 0 { + amount -= MINIMUM_FEE; + } + + tx_builder + .add_output(amount, &to_account.default_subaddress(), &mut rng) + .expect("failed to add output"); + } + + // Set tombstone block. + let tombstone_block = BLOCK_HEIGHT.load(Ordering::SeqCst) + config.tombstone_block; + tx_builder.set_tombstone_block(tombstone_block); + + // Build and return tx. + tx_builder.build(&mut rng).expect("failed building tx") +} + +fn get_membership_proofs( + ledger_db: &LedgerDB, + utxos: &[SpendableTxOut], +) -> Vec<(SpendableTxOut, TxOutMembershipProof)> { + let indexes: Vec = utxos + .iter() + .map(|utxo| { + ledger_db + .get_tx_out_index_by_hash(&utxo.tx_out.hash()) + .unwrap() + }) + .collect(); + let proofs = ledger_db.get_tx_out_proof_of_memberships(&indexes).unwrap(); + + utxos.iter().cloned().zip(proofs.into_iter()).collect() +} + +fn get_rings( + ledger_db: &LedgerDB, + ring_size: usize, + num_rings: usize, +) -> Vec> { + let num_requested = ring_size * num_rings; + let num_txos = ledger_db.num_txos().unwrap(); + + // Randomly sample `num_requested` TxOuts, without replacement and convert into a Vec + let mut rng = rand::thread_rng(); + let mut sampled_indices: HashSet = HashSet::default(); + while sampled_indices.len() < num_requested { + let index = rng.gen_range(0, num_txos); + sampled_indices.insert(index); + } + let sampled_indices_vec: Vec = sampled_indices.into_iter().collect(); + + // Get proofs for all of those indexes. + let proofs = ledger_db + .get_tx_out_proof_of_memberships(&sampled_indices_vec) + .unwrap(); + + // Create an iterator that returns (index, proof) elements. + let mut indexes_and_proofs_iterator = sampled_indices_vec.into_iter().zip(proofs.into_iter()); + + // Convert that into a Vec> + let mut rings_with_proofs = Vec::new(); + + for _ in 0..num_rings { + let mut ring = Vec::new(); + for _ in 0..ring_size { + let (index, proof) = indexes_and_proofs_iterator.next().unwrap(); + let tx_out = ledger_db.get_tx_out_by_index(index).unwrap(); + + ring.push((tx_out, proof)); + } + rings_with_proofs.push(ring); + } + + rings_with_proofs +} + +fn get_num_transactions_per_account( + account: &AccountKey, + transactions: &[TxOut], + logger: &Logger, +) -> usize { + for (i, tx_out) in transactions.iter().enumerate() { + let target_key = RistrettoPublic::try_from(&tx_out.target_key).unwrap(); + let public_key = RistrettoPublic::try_from(&tx_out.public_key).unwrap(); + + // Make sure the viewkey matches for this output that we are about to send + // Assume accounts are numbered in order that they were processed by bootstrap + if !view_key_matches_output(&account.view_key(), &target_key, &public_key) { + log::trace!( + logger, + "Transaction {:?} does not belong to account. Total txs per account = {:?}", + i, + i, + ); + return i; + } + } + 0 +}