From 618222621a34e7da7debeb748f4432becc25fdb6 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 28 Feb 2024 17:36:45 -0800 Subject: [PATCH] Add limit to looping in banking-stage (#35342) --- core/src/banking_stage/consumer.rs | 3 ++- program-runtime/src/loaded_programs.rs | 9 ++++++--- runtime/src/bank.rs | 6 +++++- svm/src/transaction_processor.rs | 26 ++++++++++++++++++++++---- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 81de74022432d9..f4ac6c6040eda8 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -598,7 +598,8 @@ impl Consumer { transaction_status_sender_enabled, &mut execute_and_commit_timings.execute_timings, None, // account_overrides - self.log_messages_bytes_limit + self.log_messages_bytes_limit, + true, )); execute_and_commit_timings.load_execute_us = load_execute_us; diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 1c29adc8c6c246..8e3e670469c45c 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -195,7 +195,7 @@ impl Stats { ("reloads", reloads, i64), ("insertions", insertions, i64), ("lost_insertions", lost_insertions, i64), - ("replacements", replacements, i64), + ("replace_entry", replacements, i64), ("one_hit_wonders", one_hit_wonders, i64), ("prunes_orphan", prunes_orphan, i64), ("prunes_environment", prunes_environment, i64), @@ -618,6 +618,7 @@ pub struct LoadedProgramsForTxBatch { entries: HashMap>, slot: Slot, pub environments: ProgramRuntimeEnvironments, + pub hit_max_limit: bool, } impl LoadedProgramsForTxBatch { @@ -626,6 +627,7 @@ impl LoadedProgramsForTxBatch { entries: HashMap::new(), slot, environments, + hit_max_limit: false, } } @@ -964,7 +966,7 @@ impl LoadedPrograms { slot: Slot, key: Pubkey, loaded_program: Arc, - ) { + ) -> bool { let second_level = self.entries.entry(key).or_default(); debug_assert_eq!( second_level.cooperative_loading_lock, @@ -985,8 +987,9 @@ impl LoadedPrograms { { self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed); } - self.assign_program(key, loaded_program); + let was_occupied = self.assign_program(key, loaded_program); self.loading_task_waiter.notify(); + was_occupied } pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index ccd3f7c522737f..d72e3771cb4408 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4299,6 +4299,7 @@ impl Bank { &mut timings, Some(&account_overrides), None, + true, ); let post_simulation_accounts = loaded_transactions @@ -4537,7 +4538,7 @@ impl Bank { balances } - #[allow(clippy::type_complexity)] + #[allow(clippy::too_many_arguments, clippy::type_complexity)] pub fn load_and_execute_transactions( &self, batch: &TransactionBatch, @@ -4548,6 +4549,7 @@ impl Bank { timings: &mut ExecuteTimings, account_overrides: Option<&AccountOverrides>, log_messages_bytes_limit: Option, + limit_to_load_programs: bool, ) -> LoadAndExecuteTransactionsOutput { let sanitized_txs = batch.sanitized_transactions(); debug!("processing transactions: {}", sanitized_txs.len()); @@ -4614,6 +4616,7 @@ impl Bank { account_overrides, self.builtin_programs.iter(), log_messages_bytes_limit, + limit_to_load_programs, ); let mut signature_count = 0; @@ -5663,6 +5666,7 @@ impl Bank { timings, None, log_messages_bytes_limit, + false, ); let (last_blockhash, lamports_per_signature) = diff --git a/svm/src/transaction_processor.rs b/svm/src/transaction_processor.rs index 0c456c918d68ff..62f06585fff4ac 100644 --- a/svm/src/transaction_processor.rs +++ b/svm/src/transaction_processor.rs @@ -190,6 +190,7 @@ impl TransactionBatchProcessor { account_overrides: Option<&AccountOverrides>, builtin_programs: impl Iterator, log_messages_bytes_limit: Option, + limit_to_load_programs: bool, ) -> LoadAndExecuteSanitizedTransactionsOutput { let mut program_accounts_map = Self::filter_executable_program_accounts( callbacks, @@ -202,9 +203,18 @@ impl TransactionBatchProcessor { program_accounts_map.insert(*builtin_program, (&native_loader, 0)); } - let programs_loaded_for_tx_batch = Rc::new(RefCell::new( - self.replenish_program_cache(callbacks, &program_accounts_map), - )); + let programs_loaded_for_tx_batch = Rc::new(RefCell::new(self.replenish_program_cache( + callbacks, + &program_accounts_map, + limit_to_load_programs, + ))); + + if programs_loaded_for_tx_batch.borrow().hit_max_limit { + return LoadAndExecuteSanitizedTransactionsOutput { + loaded_transactions: vec![], + execution_results: vec![], + }; + } let mut load_time = Measure::start("accounts_load"); let mut loaded_transactions = load_accounts( @@ -356,6 +366,7 @@ impl TransactionBatchProcessor { &self, callback: &CB, program_accounts_map: &HashMap, + limit_to_load_programs: bool, ) -> LoadedProgramsForTxBatch { let mut missing_programs: Vec<(Pubkey, (LoadedProgramMatchCriteria, u64))> = if self.check_program_modification_slot { @@ -401,7 +412,14 @@ impl TransactionBatchProcessor { } // Submit our last completed loading task. if let Some((key, program)) = program_to_store.take() { - loaded_programs_cache.finish_cooperative_loading_task(self.slot, key, program); + if loaded_programs_cache + .finish_cooperative_loading_task(self.slot, key, program) + && limit_to_load_programs + { + let mut ret = LoadedProgramsForTxBatch::default(); + ret.hit_max_limit = true; + return ret; + } } // Figure out which program needs to be loaded next. let program_to_load = loaded_programs_cache.extract(