From b1190c4b788932762ebe4c7b19c3b04b4e944e31 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 1 Dec 2023 08:48:22 +0100 Subject: [PATCH] Add RPC bench threads to accounts-cluster-bench --- accounts-cluster-bench/src/main.rs | 374 +++++++++++++++++++++++++++-- 1 file changed, 358 insertions(+), 16 deletions(-) diff --git a/accounts-cluster-bench/src/main.rs b/accounts-cluster-bench/src/main.rs index 9e592131a16905..13251b4a45a9e9 100644 --- a/accounts-cluster-bench/src/main.rs +++ b/accounts-cluster-bench/src/main.rs @@ -1,6 +1,6 @@ #![allow(clippy::arithmetic_side_effects)] use { - clap::{crate_description, crate_name, value_t, values_t_or_exit, App, Arg}, + clap::{crate_description, crate_name, value_t, values_t, values_t_or_exit, App, Arg}, log::*, rand::{thread_rng, Rng}, rayon::prelude::*, @@ -9,8 +9,9 @@ use { hidden_unless_forced, input_parsers::pubkey_of, input_validators::is_url_or_moniker, }, solana_cli_config::{ConfigInput, CONFIG_FILE}, - solana_client::transaction_executor::TransactionExecutor, + solana_client::{rpc_request::TokenAccountsFilter, transaction_executor::TransactionExecutor}, solana_gossip::gossip_service::discover, + solana_measure::measure::Measure, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ commitment_config::CommitmentConfig, @@ -26,11 +27,12 @@ use { std::{ cmp::min, process::exit, + str::FromStr, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, - thread::sleep, + thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }, }; @@ -179,8 +181,8 @@ fn make_create_message( fn make_close_message( keypair: &Keypair, base_keypair: &Keypair, - max_created: Arc, - max_closed: Arc, + max_created: &AtomicU64, + max_closed: &AtomicU64, num_instructions: usize, balance: u64, spl_token: bool, @@ -227,6 +229,276 @@ fn make_close_message( Message::new(&instructions, Some(&keypair.pubkey())) } +#[derive(Clone, Copy, Debug)] +pub enum RpcBench { + Version, + Slot, + MultipleAccounts, + ProgramAccounts, + TokenAccountsByOwner, +} + +#[derive(Debug)] +pub enum RpcParseError { + InvalidOption, +} + +impl FromStr for RpcBench { + type Err = RpcParseError; + + fn from_str(s: &str) -> Result { + match s { + "slot" => Ok(RpcBench::Slot), + "multiple-accounts" => Ok(RpcBench::MultipleAccounts), + "token-accounts-by-owner" => Ok(RpcBench::TokenAccountsByOwner), + "version" => Ok(RpcBench::Version), + _ => Err(RpcParseError::InvalidOption), + } + } +} + +fn process_get_multiple_accounts( + max_closed: &AtomicU64, + max_created: &AtomicU64, + stats: &mut RpcBenchStats, + last_error: &mut Instant, + base_keypair_pubkey: &Pubkey, + program_id: &Pubkey, + client: &RpcClient, +) { + let start = max_closed.load(Ordering::Relaxed); + let end = max_created.load(Ordering::Relaxed); + let mut chunk_start = start; + let chunk_size = 10; + while chunk_start < end { + let chunk_end = std::cmp::min(chunk_start + chunk_size, end); + + let addresses: Vec<_> = (chunk_start..chunk_end) + .map(|seed| { + Pubkey::create_with_seed(base_keypair_pubkey, &seed.to_string(), program_id) + .unwrap() + }) + .collect(); + chunk_start = chunk_end; + let mut rpc_time = Measure::start("rpc-get-multiple-accounts"); + match client.get_multiple_accounts(&addresses) { + Ok(accounts) => { + rpc_time.stop(); + for account in accounts.into_iter().flatten() { + if thread_rng().gen_ratio(1, 10_000) { + info!( + "account: lamports {:?} size: {} owner: {:?}", + account.lamports, + account.data.len(), + account.owner + ); + } + } + stats.total_success_time_us += rpc_time.as_us(); + stats.success += 1; + } + Err(e) => { + rpc_time.stop(); + stats.total_errors_time_us += rpc_time.as_us(); + stats.errors += 1; + if last_error.elapsed().as_secs() > 2 { + info!("error: {:?}", e); + *last_error = Instant::now(); + } + debug!("error: {:?}", e); + } + } + } +} + +#[derive(Default)] +struct RpcBenchStats { + errors: u64, + success: u64, + total_errors_time_us: u64, + total_success_time_us: u64, +} + +fn run_rpc_bench_loop( + rpc_bench: RpcBench, + thread: usize, + client: &RpcClient, + base_keypair_pubkey: &Pubkey, + exit: &AtomicBool, + program_id: &Pubkey, + max_closed: &AtomicU64, + max_created: &AtomicU64, + mint: &Option, +) { + let mut stats = RpcBenchStats::default(); + let mut iters = 0; + let mut last_error = Instant::now(); + let mut last_print = Instant::now(); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + match rpc_bench { + RpcBench::Slot => { + let mut rpc_time = Measure::start("rpc-get-slot"); + match client.get_slot() { + Ok(_slot) => { + rpc_time.stop(); + stats.success += 1; + stats.total_success_time_us += rpc_time.as_us(); + } + Err(e) => { + rpc_time.stop(); + stats.total_errors_time_us += rpc_time.as_us(); + stats.errors += 1; + if last_error.elapsed().as_secs() > 2 { + info!("get_slot error: {:?}", e); + last_error = Instant::now(); + } + } + } + } + RpcBench::MultipleAccounts => { + process_get_multiple_accounts( + max_closed, + max_created, + &mut stats, + &mut last_error, + base_keypair_pubkey, + program_id, + client, + ); + } + RpcBench::ProgramAccounts => { + let mut rpc_time = Measure::start("rpc-get-program-accounts"); + match client.get_program_accounts(program_id) { + Ok(accounts) => { + rpc_time.stop(); + stats.success += 1; + stats.total_success_time_us += rpc_time.as_us(); + if thread_rng().gen_ratio(1, 100) { + info!("accounts: {} first: {:?}", accounts.len(), accounts.first()); + } + } + Err(e) => { + rpc_time.stop(); + stats.errors += 1; + stats.total_errors_time_us += rpc_time.as_us(); + if last_error.elapsed().as_secs() > 2 { + info!("get-program-accounts error: {:?}", e); + last_error = Instant::now(); + } + } + } + } + RpcBench::TokenAccountsByOwner => { + let mut rpc_time = Measure::start("rpc-get-token-accounts-by-owner"); + let filter = TokenAccountsFilter::Mint(*mint.as_ref().unwrap()); + match client.get_token_accounts_by_owner(program_id, filter) { + Ok(_accounts) => { + rpc_time.stop(); + stats.success += 1; + stats.total_success_time_us += rpc_time.as_us(); + } + Err(e) => { + rpc_time.stop(); + stats.errors += 1; + stats.total_errors_time_us += rpc_time.as_us(); + if last_error.elapsed().as_secs() > 2 { + info!("get-token-accounts error: {:?}", e); + last_error = Instant::now(); + } + } + } + } + RpcBench::Version => { + let mut rpc_time = Measure::start("rpc-get-version"); + match client.get_version() { + Ok(_r) => { + rpc_time.stop(); + stats.success += 1; + stats.total_success_time_us += rpc_time.as_us(); + } + Err(_e) => { + rpc_time.stop(); + stats.errors += 1; + stats.total_errors_time_us += rpc_time.as_us(); + } + } + } + } + + if last_print.elapsed().as_secs() > 3 { + info!( + "t({}) rpc({:?}) iters: {} success: {} errors: {}", + thread, rpc_bench, iters, stats.success, stats.errors + ); + if stats.success > 0 { + info!( + " t({}) rpc({:?} average success_time: {} us", + thread, + rpc_bench, + stats.total_success_time_us / stats.success + ); + } + if stats.errors > 0 { + info!( + " rpc average average errors time: {} us", + stats.total_errors_time_us / stats.errors + ); + } + last_print = Instant::now(); + stats = RpcBenchStats::default(); + } + + iters += 1; + } +} + +fn make_rpc_bench_threads( + rpc_benches: Vec, + mint: &Option, + exit: &Arc, + client: &Arc, + seed_tracker: &SeedTracker, + base_keypair_pubkey: Pubkey, + num_rpc_bench_threads: usize, +) -> Vec> { + let program_id = if mint.is_some() { + inline_spl_token::id() + } else { + system_program::id() + }; + rpc_benches + .into_iter() + .flat_map(|rpc_bench| { + (0..num_rpc_bench_threads).map(move |thread| { + let client = client.clone(); + let exit = exit.clone(); + let max_closed = seed_tracker.max_closed.clone(); + let max_created = seed_tracker.max_created.clone(); + let mint = *mint; + Builder::new() + .name(format!("rpc-bench-{}", thread)) + .spawn(move || { + run_rpc_bench_loop( + rpc_bench, + thread, + &client, + &base_keypair_pubkey, + &exit, + &program_id, + &max_closed, + &max_created, + &mint, + ) + }) + .unwrap() + }) + }) + .collect() +} + #[allow(clippy::too_many_arguments)] fn run_accounts_bench( client: Arc, @@ -239,6 +511,8 @@ fn run_accounts_bench( num_instructions: usize, mint: Option, reclaim_accounts: bool, + rpc_benches: Option>, + num_rpc_bench_threads: usize, ) { assert!(num_instructions > 0); info!("Targeting {}", client.url()); @@ -291,6 +565,22 @@ fn run_accounts_bench( None, ); + let exit = Arc::new(AtomicBool::new(false)); + let base_keypair_pubkey = base_keypair.pubkey(); + let rpc_bench_threads: Vec<_> = if let Some(rpc_benches) = rpc_benches { + make_rpc_bench_threads( + rpc_benches, + &mint, + &exit, + &client, + &seed_tracker, + base_keypair_pubkey, + num_rpc_bench_threads, + ) + } else { + Vec::new() + }; + loop { if latest_blockhash.elapsed().as_millis() > 10_000 { blockhash = poll_get_latest_blockhash(&client).expect("blockhash"); @@ -369,8 +659,8 @@ fn run_accounts_bench( let message = make_close_message( payer_keypairs[0], &base_keypair, - seed_tracker.max_created.clone(), - seed_tracker.max_closed.clone(), + &seed_tracker.max_created, + &seed_tracker.max_closed, 1, min_balance, mint.is_some(), @@ -440,8 +730,8 @@ fn run_accounts_bench( let message = make_close_message( keypair, &base_keypair, - seed_tracker.max_created.clone(), - seed_tracker.max_closed.clone(), + &seed_tracker.max_created, + &seed_tracker.max_closed, num_instructions, min_balance, mint.is_some(), @@ -483,6 +773,11 @@ fn run_accounts_bench( } executor.close(); } + + exit.store(true, Ordering::Relaxed); + for t in rpc_bench_threads { + t.join().unwrap(); + } } fn main() { @@ -605,6 +900,19 @@ fn main() { .takes_value(false) .help("Reclaim accounts after session ends; incompatible with --iterations 0"), ) + .arg( + Arg::with_name("num_rpc_bench_threads") + .long("num-rpc-bench-threads") + .takes_value(true) + .help("Spawn this many RPC benching threads for each type passed by --rpc-bench"), + ) + .arg( + Arg::with_name("rpc_bench") + .long("rpc-bench") + .takes_value(true) + .multiple(true) + .help("Spawn a thread which calls a specific RPC method in a loop to benchmark it"), + ) .get_matches(); let skip_gossip = !matches.is_present("check_gossip"); @@ -619,6 +927,19 @@ fn main() { eprintln!("bad num_instructions: {num_instructions}"); exit(1); } + let rpc_benches = values_t!(matches, "rpc_bench", String) + .map(|benches| { + benches + .into_iter() + .map(|bench| RpcBench::from_str(&bench).unwrap()) + .collect() + }) + .ok(); + let num_rpc_bench_threads = if rpc_benches.is_none() { + 0 + } else { + value_t!(matches, "num_rpc_bench_threads", usize).unwrap_or(1) + }; let mint = pubkey_of(&matches, "mint"); @@ -696,6 +1017,8 @@ fn main() { num_instructions, mint, matches.is_present("reclaim_accounts"), + rpc_benches, + num_rpc_bench_threads, ); } @@ -703,7 +1026,10 @@ fn main() { pub mod test { use { super::*, - solana_accounts_db::inline_spl_token, + solana_accounts_db::{ + accounts_index::{AccountIndex, AccountSecondaryIndexes}, + inline_spl_token, + }, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_local_cluster::{ @@ -719,11 +1045,19 @@ pub mod test { }, }; + fn add_secondary_indexes(indexes: &mut AccountSecondaryIndexes) { + indexes.indexes.insert(AccountIndex::SplTokenOwner); + indexes.indexes.insert(AccountIndex::SplTokenMint); + indexes.indexes.insert(AccountIndex::ProgramId); + } + #[test] fn test_accounts_cluster_bench() { solana_logger::setup(); - let validator_config = ValidatorConfig::default_for_test(); + let mut validator_config = ValidatorConfig::default_for_test(); let num_nodes = 1; + add_secondary_indexes(&mut validator_config.account_indexes); + add_secondary_indexes(&mut validator_config.rpc_config.account_indexes); let mut config = ClusterConfig { cluster_lamports: 10_000_000, poh_config: PohConfig::new_sleep(Duration::from_millis(50)), @@ -745,8 +1079,11 @@ pub mod test { rpc_addr, CommitmentConfig::confirmed(), )); + let mint = None; + let reclaim_accounts = false; + let pre_txs = client.get_transaction_count().unwrap(); run_accounts_bench( - client, + client.clone(), &[&cluster.funding_keypair], iterations, maybe_space, @@ -754,11 +1091,14 @@ pub mod test { close_nth_batch, maybe_lamports, num_instructions, - None, - false, + mint, + reclaim_accounts, + Some(vec![RpcBench::ProgramAccounts]), + 1, ); + let post_txs = client.get_transaction_count().unwrap(); start.stop(); - info!("{}", start); + info!("{} pre {} post {}", start, pre_txs, post_txs); } #[test] @@ -852,6 +1192,8 @@ pub mod test { num_instructions, Some(spl_mint_keypair.pubkey()), true, + None, + 0, ); start.stop(); info!("{}", start);