From bf0a3684eb77512ea6ea3b90a2c624c103bc9a9b Mon Sep 17 00:00:00 2001
From: steviez
Date: Fri, 8 Mar 2024 12:52:35 -0600
Subject: [PATCH] Make ReplayStage create the parallel fork replay threadpool (#137)

ReplayStage owning the pool allows for subsequent work to configure the
size of the pool; configuring the size of the pool inside the
lazy_static would have been a little messy
---
 core/src/replay_stage.rs | 136 +++++++++++++++++++++------------------
 1 file changed, 73 insertions(+), 63 deletions(-)

diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 46014e3f7912de..3683e257ed10a8 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -33,7 +33,6 @@ use {
         window_service::DuplicateSlotReceiver,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
-    lazy_static::lazy_static,
     rayon::{prelude::*, ThreadPool},
     solana_entry::entry::VerifyRecyclers,
     solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
@@ -102,14 +101,6 @@ const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
 const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
 const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;
 
-lazy_static! {
-    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
-        .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
-        .thread_name(|i| format!("solReplay{i:02}"))
-        .build()
-        .unwrap();
-}
-
 #[derive(PartialEq, Eq, Debug)]
 pub enum HeaviestForkFailures {
     LockedOut(u64),
@@ -131,6 +122,11 @@ pub enum HeaviestForkFailures {
     ),
 }
 
+enum ForkReplayMode {
+    Serial,
+    Parallel(ThreadPool),
+}
+
 #[derive(PartialEq, Eq, Debug)]
 enum ConfirmationType {
     SupermajorityVoted,
@@ -656,6 +652,16 @@ impl ReplayStage {
                         r_bank_forks.get_vote_only_mode_signal(),
                     )
                 };
+                let replay_mode = if replay_slots_concurrently {
+                    let pool = rayon::ThreadPoolBuilder::new()
+                        .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
+                        .thread_name(|i| format!("solReplay{i:02}"))
+                        .build()
+                        .expect("new rayon threadpool");
+                    ForkReplayMode::Parallel(pool)
+                } else {
+                    ForkReplayMode::Serial
+                };
 
                 Self::reset_poh_recorder(
                     &my_pubkey,
@@ -717,7 +723,7 @@ impl ReplayStage {
                         block_metadata_notifier.clone(),
                         &mut replay_timing,
                         log_messages_bytes_limit,
-                        replay_slots_concurrently,
+                        &replay_mode,
                         &prioritization_fee_cache,
                         &mut purge_repair_slot_counter,
                     );
@@ -2706,6 +2712,7 @@ impl ReplayStage {
     fn replay_active_banks_concurrently(
         blockstore: &Blockstore,
         bank_forks: &RwLock<BankForks>,
+        thread_pool: &ThreadPool,
         my_pubkey: &Pubkey,
         vote_account: &Pubkey,
         progress: &mut ProgressMap,
@@ -2723,7 +2730,7 @@ impl ReplayStage {
         let longest_replay_time_us = AtomicU64::new(0);
 
         // Allow for concurrent replaying of slots from different forks.
-        let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
+        let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
             active_bank_slots
                 .into_par_iter()
                 .map(|bank_slot| {
@@ -2737,7 +2744,7 @@ impl ReplayStage {
                     trace!(
                         "Replay active bank: slot {}, thread_idx {}",
                         bank_slot,
-                        PAR_THREAD_POOL.current_thread_index().unwrap_or_default()
+                        thread_pool.current_thread_index().unwrap_or_default()
                     );
                     let mut progress_lock = progress.write().unwrap();
                     if progress_lock
@@ -3175,7 +3182,7 @@ impl ReplayStage {
         block_metadata_notifier: Option<BlockMetadataNotifierArc>,
         replay_timing: &mut ReplayLoopTiming,
         log_messages_bytes_limit: Option<usize>,
-        replay_slots_concurrently: bool,
+        replay_mode: &ForkReplayMode,
         prioritization_fee_cache: &PrioritizationFeeCache,
         purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
     ) -> bool /* completed a bank */ {
@@ -3186,11 +3193,17 @@ impl ReplayStage {
             num_active_banks,
             active_bank_slots
         );
-        if num_active_banks > 0 {
-            let replay_result_vec = if num_active_banks > 1 && replay_slots_concurrently {
+        if active_bank_slots.is_empty() {
+            return false;
+        }
+
+        let replay_result_vec = match replay_mode {
+            // Skip the overhead of the threadpool if there is only one bank to play
+            ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
                 Self::replay_active_banks_concurrently(
                     blockstore,
                     bank_forks,
+                    thread_pool,
                     my_pubkey,
                     vote_account,
                     progress,
@@ -3203,55 +3216,52 @@ impl ReplayStage {
                     &active_bank_slots,
                     prioritization_fee_cache,
                 )
-            } else {
-                active_bank_slots
-                    .iter()
-                    .map(|bank_slot| {
-                        Self::replay_active_bank(
-                            blockstore,
-                            bank_forks,
-                            my_pubkey,
-                            vote_account,
-                            progress,
-                            transaction_status_sender,
-                            entry_notification_sender,
-                            verify_recyclers,
-                            replay_vote_sender,
-                            replay_timing,
-                            log_messages_bytes_limit,
-                            *bank_slot,
-                            prioritization_fee_cache,
-                        )
-                    })
-                    .collect()
-            };
+            }
+            ForkReplayMode::Serial | ForkReplayMode::Parallel(_) => active_bank_slots
+                .iter()
+                .map(|bank_slot| {
+                    Self::replay_active_bank(
+                        blockstore,
+                        bank_forks,
+                        my_pubkey,
+                        vote_account,
+                        progress,
+                        transaction_status_sender,
+                        entry_notification_sender,
+                        verify_recyclers,
+                        replay_vote_sender,
+                        replay_timing,
+                        log_messages_bytes_limit,
+                        *bank_slot,
+                        prioritization_fee_cache,
+                    )
+                })
+                .collect(),
+        };
 
-            Self::process_replay_results(
-                blockstore,
-                bank_forks,
-                progress,
-                transaction_status_sender,
-                cache_block_meta_sender,
-                heaviest_subtree_fork_choice,
-                bank_notification_sender,
-                rewards_recorder_sender,
-                rpc_subscriptions,
-                duplicate_slots_tracker,
-                duplicate_confirmed_slots,
-                epoch_slots_frozen_slots,
-                unfrozen_gossip_verified_vote_hashes,
-                latest_validator_votes_for_frozen_banks,
-                cluster_slots_update_sender,
-                cost_update_sender,
-                duplicate_slots_to_repair,
-                ancestor_hashes_replay_update_sender,
-                block_metadata_notifier,
-                &replay_result_vec,
-                purge_repair_slot_counter,
-            )
-        } else {
-            false
-        }
+        Self::process_replay_results(
+            blockstore,
+            bank_forks,
+            progress,
+            transaction_status_sender,
+            cache_block_meta_sender,
+            heaviest_subtree_fork_choice,
+            bank_notification_sender,
+            rewards_recorder_sender,
+            rpc_subscriptions,
+            duplicate_slots_tracker,
+            duplicate_confirmed_slots,
+            epoch_slots_frozen_slots,
+            unfrozen_gossip_verified_vote_hashes,
+            latest_validator_votes_for_frozen_banks,
+            cluster_slots_update_sender,
+            cost_update_sender,
+            duplicate_slots_to_repair,
+            ancestor_hashes_replay_update_sender,
+            block_metadata_notifier,
+            &replay_result_vec,
+            purge_repair_slot_counter,
+        )
     }
 
     #[allow(clippy::too_many_arguments)]
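
Note, not part of the patch: the commit message's point about configurability could later be realized by passing a thread count into the pool construction now that ReplayStage builds the pool itself rather than relying on a lazy_static. A minimal sketch of that follow-up, assuming a hypothetical NonZeroUsize setting plumbed in from validator configuration; `build_replay_pool` and its parameter name are illustrative only and do not exist in this patch:

    use std::num::NonZeroUsize;
    use rayon::ThreadPool;

    // Build the fork-replay pool with a caller-chosen size; the patch above
    // hard-codes MAX_CONCURRENT_FORKS_TO_REPLAY at the equivalent spot.
    fn build_replay_pool(replay_forks_threads: NonZeroUsize) -> ThreadPool {
        rayon::ThreadPoolBuilder::new()
            .num_threads(replay_forks_threads.get())
            .thread_name(|i| format!("solReplay{i:02}"))
            .build()
            .expect("new rayon threadpool")
    }

Since ForkReplayMode::Serial already covers the single-threaded case, a configured value of 1 could also map to Serial and skip pool creation entirely.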