From 4f8540fb94f018e9c07ea2e4b519f98ad8235c1a Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Fri, 5 Aug 2022 14:46:51 -0400 Subject: [PATCH 1/5] Move Replay Stage accounts data size checks --- core/src/replay_stage.rs | 537 +++++++++++++++++++++++++++++++++++++- ledger/src/block_error.rs | 6 + 2 files changed, 537 insertions(+), 6 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 85fe23137cfb0e..5bd90957d24e37 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -56,6 +56,7 @@ use { accounts_background_service::AbsRequestSender, bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, + block_cost_limits::MAX_ACCOUNT_DATA_BLOCK_LEN, commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender, }, @@ -2412,7 +2413,7 @@ impl ReplayStage { duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, block_metadata_notifier: Option, - replay_result_vec: &[ReplaySlotFromBlockstore], + replay_result_vec: Vec, ) -> bool { // TODO: See if processing of blockstore replay results and bank completion can be made thread safe. let mut did_complete_bank = false; @@ -2425,7 +2426,15 @@ impl ReplayStage { let bank_slot = replay_result.bank_slot; let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); - if let Some(replay_result) = &replay_result.replay_result { + if let Some(mut replay_result) = replay_result.replay_result { + // check accounts data size and error if the limits were exceeded + // - only check when the bank is complate (for correctness) + // - only check when the replay result is OK (for optimization) + if bank.is_complete() && replay_result.is_ok() { + if let Err(err) = check_accounts_data_size(bank) { + replay_result = Err(err.into()) + } + } match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, Err(err) => { @@ -2434,7 +2443,7 @@ impl ReplayStage { blockstore, bank, bank_forks.read().unwrap().root(), - err, + &err, rpc_subscriptions, duplicate_slots_tracker, gossip_duplicate_confirmed_slots, @@ -2683,7 +2692,7 @@ impl ReplayStage { duplicate_slots_to_repair, ancestor_hashes_replay_update_sender, block_metadata_notifier, - &replay_result_vec, + replay_result_vec, ) } else { false @@ -3485,6 +3494,45 @@ impl ReplayStage { } } +/// Check to see if the block exceeded the accounts data size limits +fn check_accounts_data_size(bank: &Bank) -> Result<(), BlockError> { + check_accounts_data_size_block(bank)?; + check_accounts_data_size_total(bank) +} + +/// Check to see if the block exceeded the accounts data size block limit +fn check_accounts_data_size_block(bank: &Bank) -> Result<(), BlockError> { + if !bank + .feature_set + .is_active(&feature_set::cap_accounts_data_size_per_block::id()) + { + return Ok(()); + } + + debug_assert!(MAX_ACCOUNT_DATA_BLOCK_LEN <= i64::MAX as u64); + if bank.load_accounts_data_size_delta_on_chain() > MAX_ACCOUNT_DATA_BLOCK_LEN as i64 { + Err(BlockError::ExceededAccountsDataSizeBlockLimit) + } else { + Ok(()) + } +} + +/// Check to see if the block exceeded the accounts data size total limit +fn check_accounts_data_size_total(bank: &Bank) -> Result<(), BlockError> { + if !bank + .feature_set + .is_active(&feature_set::cap_accounts_data_len::id()) + { + return Ok(()); + } + + if bank.load_accounts_data_size() > bank.accounts_data_size_limit() { + Err(BlockError::ExceededAccountsDataSizeTotalLimit) + } else { + Ok(()) + } +} + #[cfg(test)] pub(crate) mod tests { use { @@ -3498,7 +3546,8 @@ pub(crate) mod tests { vote_simulator::{self, VoteSimulator}, }, crossbeam_channel::unbounded, - solana_entry::entry::{self, Entry}, + solana_client::rpc_response::SlotUpdate, + solana_entry::entry::{self, Entry, EntrySlice as _}, solana_gossip::{cluster_info::Node, crds::Cursor}, solana_ledger::{ blockstore::{entries_to_test_shreds, make_slot_entries, BlockstoreError}, @@ -3510,6 +3559,8 @@ pub(crate) mod tests { solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{create_test_transaction_entries, populate_blockstore_for_tests}, + rpc_subscription_tracker::SubscriptionParams, + rpc_subscriptions::RpcNotification, }, solana_runtime::{ accounts_background_service::AbsRequestSender, @@ -3518,11 +3569,13 @@ pub(crate) mod tests { }, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, - genesis_config, + genesis_config::{self, GenesisConfig}, hash::{hash, Hash}, instruction::InstructionError, + native_token::LAMPORTS_PER_SOL, poh_config::PohConfig, signature::{Keypair, KeypairInsecureClone, Signer}, + system_instruction::MAX_PERMITTED_DATA_LENGTH, system_transaction, transaction::TransactionError, }, @@ -3537,6 +3590,7 @@ pub(crate) mod tests { iter, sync::{atomic::AtomicU64, Arc, RwLock}, }, + tokio::sync::broadcast::error::TryRecvError, trees::{tr, Tree}, }; @@ -6831,4 +6885,475 @@ pub(crate) mod tests { ReplayStage::check_for_vote_only_mode(10, 0, &in_vote_only_mode, &bank_forks); assert!(!in_vote_only_mode.load(Ordering::Relaxed)); } + + /// allocate accounts data *above* the *block* limit. this should kill the bank at the slot + #[test] + fn test_accounts_data_size_block_limit_exceeded() { + const ACCOUNT_SIZE: u64 = MAX_PERMITTED_DATA_LENGTH; + const NUM_ACCOUNTS: u64 = MAX_ACCOUNT_DATA_BLOCK_LEN / ACCOUNT_SIZE; + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config((1_000_000 + NUM_ACCOUNTS + 1) * LAMPORTS_PER_SOL); + + let (ledger_path, _) = create_new_tmp_ledger!(&genesis_config); + { + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let (bank, bank_forks, mock_realloc_program_id) = setup_bank_forks(&genesis_config); + + fill_blockstore( + &blockstore, + &bank, + &mint_keypair, + NUM_ACCOUNTS + 1, + ACCOUNT_SIZE, + None, + &mock_realloc_program_id, + ); + + let (did_complete_bank, _rpc_subscriptions, mut rpc_receiver) = + do_replay(Arc::clone(&blockstore), Arc::clone(&bank_forks)); + assert!(!did_complete_bank); + assert!( + bank.load_accounts_data_size_delta_on_chain() > MAX_ACCOUNT_DATA_BLOCK_LEN as i64 + ); + + // Ensure the block is dead + let result = blockstore.get_rooted_block(bank.slot(), false); + assert!(matches!(result, Err(BlockstoreError::DeadSlot))); + + // Ensure the block is dead due to exceeding the accounts data size block limit + let slot_update = receive_slot_update_notification(&mut rpc_receiver, bank.slot()); + let matches = match slot_update { + SlotUpdate::Dead { + slot, + timestamp: _, + err, + } => { + slot == bank.slot() + && err == "error: InvalidBlock(ExceededAccountsDataSizeBlockLimit)" + } + _ => false, + }; + assert!(matches); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + /// allocate accounts data *above* the *block* limit, then realloc back down to/under the limit. this + /// should be OK + #[test] + fn test_accounts_data_size_block_limit_ok() { + const ACCOUNT_SIZE: u64 = MAX_PERMITTED_DATA_LENGTH; + const NUM_ACCOUNTS: u64 = MAX_ACCOUNT_DATA_BLOCK_LEN / ACCOUNT_SIZE; + const SPILL: u64 = (ACCOUNT_SIZE * (NUM_ACCOUNTS + 1)) - MAX_ACCOUNT_DATA_BLOCK_LEN; + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config((1_000_000 + NUM_ACCOUNTS + 1) * LAMPORTS_PER_SOL); + + let (ledger_path, _) = create_new_tmp_ledger!(&genesis_config); + { + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let (bank, bank_forks, mock_realloc_program_id) = setup_bank_forks(&genesis_config); + + fill_blockstore( + &blockstore, + &bank, + &mint_keypair, + NUM_ACCOUNTS + 1, + ACCOUNT_SIZE, + Some(ACCOUNT_SIZE - SPILL), + &mock_realloc_program_id, + ); + + let (did_complete_bank, _rpc_subscriptions, _rpc_receiver) = + do_replay(Arc::clone(&blockstore), Arc::clone(&bank_forks)); + assert!(did_complete_bank); + assert!(bank.is_complete()); + assert!(bank.is_frozen()); + assert_eq!( + bank.load_accounts_data_size_delta_on_chain(), + MAX_ACCOUNT_DATA_BLOCK_LEN as i64 + ); + + // Ensure the block is not dead + let result = blockstore.get_rooted_block(bank.slot(), false); + assert!(!matches!(result, Err(BlockstoreError::DeadSlot))); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + /// allocate accounts data *above* the *total* limit. this should kill the bank at the slot + #[test] + fn test_accounts_data_size_total_limit_exceeded() { + const ACCOUNT_SIZE: u64 = MAX_PERMITTED_DATA_LENGTH; + const NUM_ACCOUNTS: u64 = 2; + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config((1_000_000 + NUM_ACCOUNTS + 1) * LAMPORTS_PER_SOL); + + let (ledger_path, _) = create_new_tmp_ledger!(&genesis_config); + { + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let (bank, bank_forks, mock_realloc_program_id) = setup_bank_forks(&genesis_config); + let initial_accounts_data_size = + bank.accounts_data_size_limit() - (ACCOUNT_SIZE * NUM_ACCOUNTS); + let delta_from_initial = initial_accounts_data_size - bank.load_accounts_data_size(); + bank.update_accounts_data_size_delta_off_chain_for_tests(delta_from_initial as i64); + assert_eq!(bank.load_accounts_data_size(), initial_accounts_data_size); + + fill_blockstore( + &blockstore, + &bank, + &mint_keypair, + NUM_ACCOUNTS + 1, + ACCOUNT_SIZE, + None, + &mock_realloc_program_id, + ); + + let (did_complete_bank, _rpc_subscriptions, mut rpc_receiver) = + do_replay(Arc::clone(&blockstore), Arc::clone(&bank_forks)); + assert!(!did_complete_bank); + assert!(bank.load_accounts_data_size() > bank.accounts_data_size_limit()); + + // Ensure the block is dead + let result = blockstore.get_rooted_block(bank.slot(), false); + assert!(matches!(result, Err(BlockstoreError::DeadSlot))); + + // Ensure the block is dead due to exceeding the accounts data size total limit + let slot_update = receive_slot_update_notification(&mut rpc_receiver, bank.slot()); + let matches = match slot_update { + SlotUpdate::Dead { + slot, + timestamp: _, + err, + } => { + slot == bank.slot() + && err == "error: InvalidBlock(ExceededAccountsDataSizeTotalLimit)" + } + _ => false, + }; + assert!(matches); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + /// allocate accounts data *above* the *total* limit, then realloc back down to/under the limit. this + /// should be OK + #[test] + fn test_accounts_data_size_total_limit_ok() { + const ACCOUNT_SIZE: u64 = MAX_PERMITTED_DATA_LENGTH; + const NUM_ACCOUNTS: u64 = 2; + const SPILL: u64 = MAX_PERMITTED_DATA_LENGTH / 8; + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config((1_000_000 + NUM_ACCOUNTS + 1) * LAMPORTS_PER_SOL); + + let (ledger_path, _) = create_new_tmp_ledger!(&genesis_config); + { + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let (bank, bank_forks, mock_realloc_program_id) = setup_bank_forks(&genesis_config); + let initial_accounts_data_size = + bank.accounts_data_size_limit() - (ACCOUNT_SIZE * NUM_ACCOUNTS) - SPILL; + let delta_from_initial = initial_accounts_data_size - bank.load_accounts_data_size(); + bank.update_accounts_data_size_delta_off_chain_for_tests(delta_from_initial as i64); + assert_eq!(bank.load_accounts_data_size(), initial_accounts_data_size); + + fill_blockstore( + &blockstore, + &bank, + &mint_keypair, + NUM_ACCOUNTS + 1, + ACCOUNT_SIZE, + Some(SPILL), + &mock_realloc_program_id, + ); + + let (did_complete_bank, _rpc_subscriptions, _rpc_receiver) = + do_replay(Arc::clone(&blockstore), Arc::clone(&bank_forks)); + assert!(did_complete_bank); + assert!(bank.is_complete()); + assert!(bank.is_frozen()); + assert_eq!( + bank.load_accounts_data_size(), + bank.accounts_data_size_limit() + ); + + // Ensure the block is not dead + let result = blockstore.get_rooted_block(bank.slot(), false); + assert!(!matches!(result, Err(BlockstoreError::DeadSlot))); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + fn setup_bank_forks( + genesis_config: &GenesisConfig, + ) -> (Arc, Arc>, Pubkey) { + let mut bank = Bank::new_for_tests(genesis_config); + assert!(bank + .feature_set + .is_active(&feature_set::cap_accounts_data_size_per_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::cap_accounts_data_len::id())); + let mock_realloc_program_id = Pubkey::new_unique(); + bank.add_builtin( + "mock_realloc_program", + &mock_realloc_program_id, + mock_realloc::process_instruction, + ); + bank.fill_bank_with_ticks_for_tests(); + let mut bank_forks = BankForks::new(bank); + let bank = bank_forks.working_bank(); + let bank = Bank::new_from_parent(&bank, &Pubkey::default(), bank.slot() + 1); + let bank = bank_forks.insert(bank); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + + (bank, bank_forks, mock_realloc_program_id) + } + + fn fill_blockstore( + blockstore: &Blockstore, + bank: &Bank, + mint_keypair: &Keypair, + num_accounts: u64, + account_size: u64, + realloc_new_size: Option, + realloc_program_id: &Pubkey, + ) { + let mut new_account = Keypair::new(); + let mut hash_for_entry = bank.last_blockhash(); + + // create enough new accounts to exceed the accounts data size block-or-total limit + let mut entries = (0..num_accounts) + .map(|_| { + new_account = Keypair::new(); + let _ = new_account.pubkey(); + let transaction = system_transaction::create_account( + mint_keypair, + &new_account, + bank.last_blockhash(), + LAMPORTS_PER_SOL, + account_size, + realloc_program_id, + ); + let entry = solana_entry::entry::next_entry(&hash_for_entry, 1, vec![transaction]); + hash_for_entry = entry.hash; + entry + }) + .collect::>(); + + // then optionally realloc down below limit + if let Some(new_size) = realloc_new_size { + let transaction = mock_realloc::create_transaction( + mint_keypair, + &new_account.pubkey(), + new_size as usize, + *realloc_program_id, + bank.last_blockhash(), + ); + let entry = solana_entry::entry::next_entry(&hash_for_entry, 1, vec![transaction]); + hash_for_entry = entry.hash; + entries.push(entry); + } + + // fill the rest of the slot with ticks so it will be complete + let num_remaining_ticks = bank.ticks_per_slot() - entries.tick_count(); + let tick_entries = + solana_entry::entry::create_ticks(num_remaining_ticks, 1, hash_for_entry); + let entries = [entries, tick_entries].concat(); + + let shreds = solana_ledger::blockstore::entries_to_test_shreds( + &entries, + bank.slot(), + bank.parent_slot(), + true, + 0, + ); + blockstore.insert_shreds(shreds, None, false).unwrap(); + blockstore.set_roots(std::iter::once(&bank.slot())).unwrap(); + /* + * blockstore + * .put_meta_bytes( + * bank.parent_slot(), + * &bincode::serialize(&SlotMeta::default()).unwrap(), + * ) + * .unwrap(); + */ + /* + * blockstore + * .put_meta_bytes( + * bank.slot(), + * &bincode::serialize(&SlotMeta::default()).unwrap(), + * ) + * .unwrap(); + */ + } + + fn do_replay( + blockstore: Arc, + bank_forks: Arc>, + ) -> ( + bool, + Arc, + tokio::sync::broadcast::Receiver, + ) { + let (mut progress_map, mut heaviest_subtree_fork_choice) = + ReplayStage::initialize_progress_and_fork_choice_with_locked_bank_forks( + &bank_forks, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ); + let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore( + &Arc::default(), + Arc::default(), + Arc::clone(&blockstore), + Arc::clone(&bank_forks), + Arc::default(), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )); + + let rpc_receiver = rpc_subscriptions.control().broadcast_receiver(); + let _subscription_token = rpc_subscriptions + .control() + .subscribe(SubscriptionParams::SlotsUpdates); + + let (replay_vote_sender, _replay_vote_receiver) = crossbeam_channel::unbounded(); + let (cluster_slots_update_sender, _cluster_slots_update_receiver) = + crossbeam_channel::unbounded(); + let (cost_update_sender, _cost_update_receiver) = crossbeam_channel::unbounded(); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + crossbeam_channel::unbounded(); + + let did_complete_bank = ReplayStage::replay_active_banks( + &blockstore, + &bank_forks, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + &mut progress_map, + None, + None, + &VerifyRecyclers::default(), + &mut heaviest_subtree_fork_choice, + &replay_vote_sender, + &None, + &None, + &rpc_subscriptions, + &mut DuplicateSlotsTracker::default(), + &GossipDuplicateConfirmedSlots::default(), + &mut EpochSlotsFrozenSlots::default(), + &mut UnfrozenGossipVerifiedVoteHashes::default(), + &mut LatestValidatorVotesForFrozenBanks::default(), + &cluster_slots_update_sender, + &cost_update_sender, + &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, + None, + &mut ReplayTiming::default(), + None, + ); + + (did_complete_bank, rpc_subscriptions, rpc_receiver) + } + + fn receive_slot_update_notification( + rpc_receiver: &mut tokio::sync::broadcast::Receiver, + slot: Slot, + ) -> SlotUpdate { + let timer = Instant::now(); + loop { + match rpc_receiver.try_recv() { + Ok(notification) => { + if let Some(json) = notification.json.upgrade() { + if let Ok(json) = + serde_json::from_str::(&json.as_ref().clone()) + { + if let Ok(slot_update) = serde_json::from_value::( + json["params"]["result"].clone(), + ) { + if slot_update.slot() == slot { + return slot_update; + } + } + } + } + } + Err(TryRecvError::Closed) => { + panic!("Channel closed while waiting for notification!") + } + _ => { + if timer.elapsed() > Duration::from_secs(1) { + panic!("Timed out while waiting for notification!"); + } + } + }; + std::thread::yield_now(); + } + } + + mod mock_realloc { + use { + super::*, + serde::{Deserialize, Serialize}, + solana_program_runtime::invoke_context::InvokeContext, + }; + + #[derive(Debug, Serialize, Deserialize)] + enum Instruction { + Realloc { new_size: usize }, + } + + pub fn process_instruction( + _first_instruction_account: usize, + invoke_context: &mut InvokeContext, + ) -> result::Result<(), InstructionError> { + let transaction_context = &invoke_context.transaction_context; + let instruction_context = transaction_context.get_current_instruction_context()?; + let instruction_data = instruction_context.get_instruction_data(); + if let Ok(instruction) = bincode::deserialize(instruction_data) { + match instruction { + Instruction::Realloc { new_size } => { + let mut account = instruction_context + .try_borrow_instruction_account(transaction_context, 0)?; + account.set_data_length(new_size) + } + } + } else { + Err(InstructionError::InvalidInstructionData) + } + } + + pub fn create_transaction( + payer: &Keypair, + reallocd: &Pubkey, + new_size: usize, + mock_realloc_program_id: Pubkey, + recent_blockhash: Hash, + ) -> Transaction { + let account_metas = vec![solana_sdk::instruction::AccountMeta::new(*reallocd, false)]; + let instruction = solana_sdk::instruction::Instruction::new_with_bincode( + mock_realloc_program_id, + &Instruction::Realloc { new_size }, + account_metas, + ); + Transaction::new_signed_with_payer( + &[instruction], + Some(&payer.pubkey()), + &[payer], + recent_blockhash, + ) + } + } } diff --git a/ledger/src/block_error.rs b/ledger/src/block_error.rs index d79b3e0aa42b08..3591b2e2d1bc16 100644 --- a/ledger/src/block_error.rs +++ b/ledger/src/block_error.rs @@ -37,4 +37,10 @@ pub enum BlockError { #[error("duplicate block")] DuplicateBlock, + + #[error("exceeded accounts data size total limit")] + ExceededAccountsDataSizeTotalLimit, + + #[error("exceeded accounts data size block limit")] + ExceededAccountsDataSizeBlockLimit, } From bece58c61b394f419fccb9412843c10fa7d37250 Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Mon, 8 Aug 2022 15:20:30 -0400 Subject: [PATCH 2/5] pr: remove dead code --- core/src/replay_stage.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5bd90957d24e37..99aabbac961503 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -7184,22 +7184,6 @@ pub(crate) mod tests { ); blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.set_roots(std::iter::once(&bank.slot())).unwrap(); - /* - * blockstore - * .put_meta_bytes( - * bank.parent_slot(), - * &bincode::serialize(&SlotMeta::default()).unwrap(), - * ) - * .unwrap(); - */ - /* - * blockstore - * .put_meta_bytes( - * bank.slot(), - * &bincode::serialize(&SlotMeta::default()).unwrap(), - * ) - * .unwrap(); - */ } fn do_replay( From c0a437eed1a1dcbb77c3fc6068d04d3c5e497e8e Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Mon, 8 Aug 2022 15:20:47 -0400 Subject: [PATCH 3/5] pr: move checks --- core/src/replay_stage.rs | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 99aabbac961503..aeb39edf2cf6fd 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2413,7 +2413,7 @@ impl ReplayStage { duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, block_metadata_notifier: Option, - replay_result_vec: Vec, + replay_result_vec: &[ReplaySlotFromBlockstore], ) -> bool { // TODO: See if processing of blockstore replay results and bank completion can be made thread safe. let mut did_complete_bank = false; @@ -2426,15 +2426,7 @@ impl ReplayStage { let bank_slot = replay_result.bank_slot; let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap(); - if let Some(mut replay_result) = replay_result.replay_result { - // check accounts data size and error if the limits were exceeded - // - only check when the bank is complate (for correctness) - // - only check when the replay result is OK (for optimization) - if bank.is_complete() && replay_result.is_ok() { - if let Err(err) = check_accounts_data_size(bank) { - replay_result = Err(err.into()) - } - } + if let Some(replay_result) = &replay_result.replay_result { match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, Err(err) => { @@ -2462,6 +2454,30 @@ impl ReplayStage { assert_eq!(bank_slot, bank.slot()); if bank.is_complete() { + // Once the bank is complete, ensure it hasn't exceeded accounts data size limits. + // This must be deterministic across the whole cluster, so cannot be within + // parallel transaction processing. + match check_accounts_data_size(bank) { + Err(err) => { + Self::mark_dead_slot( + blockstore, + bank, + bank_forks.read().unwrap().root(), + &err.into(), + rpc_subscriptions, + duplicate_slots_tracker, + gossip_duplicate_confirmed_slots, + epoch_slots_frozen_slots, + progress, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + ); + continue; + } + _ => {} + } + let mut bank_complete_time = Measure::start("bank_complete_time"); let bank_progress = progress .get_mut(&bank.slot()) @@ -2692,7 +2708,7 @@ impl ReplayStage { duplicate_slots_to_repair, ancestor_hashes_replay_update_sender, block_metadata_notifier, - replay_result_vec, + &replay_result_vec, ) } else { false From 6c7fb5be342f8d8f136fa5fb66838afbaf16db39 Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Mon, 8 Aug 2022 15:28:54 -0400 Subject: [PATCH 4/5] clippy --- core/src/replay_stage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index aeb39edf2cf6fd..55478c4a7d2984 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2435,7 +2435,7 @@ impl ReplayStage { blockstore, bank, bank_forks.read().unwrap().root(), - &err, + err, rpc_subscriptions, duplicate_slots_tracker, gossip_duplicate_confirmed_slots, From 98b1ef748eb0e81ed11c3b7c3d5f1d8c6797bcbb Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Mon, 8 Aug 2022 15:32:11 -0400 Subject: [PATCH 5/5] clippy --- core/src/replay_stage.rs | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 55478c4a7d2984..6214450e5b7eb9 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2457,25 +2457,22 @@ impl ReplayStage { // Once the bank is complete, ensure it hasn't exceeded accounts data size limits. // This must be deterministic across the whole cluster, so cannot be within // parallel transaction processing. - match check_accounts_data_size(bank) { - Err(err) => { - Self::mark_dead_slot( - blockstore, - bank, - bank_forks.read().unwrap().root(), - &err.into(), - rpc_subscriptions, - duplicate_slots_tracker, - gossip_duplicate_confirmed_slots, - epoch_slots_frozen_slots, - progress, - heaviest_subtree_fork_choice, - duplicate_slots_to_repair, - ancestor_hashes_replay_update_sender, - ); - continue; - } - _ => {} + if let Err(err) = check_accounts_data_size(bank) { + Self::mark_dead_slot( + blockstore, + bank, + bank_forks.read().unwrap().root(), + &err.into(), + rpc_subscriptions, + duplicate_slots_tracker, + gossip_duplicate_confirmed_slots, + epoch_slots_frozen_slots, + progress, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + ); + continue; } let mut bank_complete_time = Measure::start("bank_complete_time");