Skip to content

Commit

Permalink
Add limit to looping in banking-stage
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored and willhickey committed Feb 13, 2024
1 parent 0895cb8 commit da2078d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 15 deletions.
3 changes: 2 additions & 1 deletion core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ impl Consumer {
transaction_status_sender_enabled,
&mut execute_and_commit_timings.execute_timings,
None, // account_overrides
self.log_messages_bytes_limit
self.log_messages_bytes_limit,
true,
));
execute_and_commit_timings.load_execute_us = load_execute_us;

Expand Down
25 changes: 15 additions & 10 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl Stats {
("reloads", reloads, i64),
("insertions", insertions, i64),
("lost_insertions", lost_insertions, i64),
("replacements", replacements, i64),
("replace_entry", replacements, i64),
("one_hit_wonders", one_hit_wonders, i64),
("prunes_orphan", prunes_orphan, i64),
("prunes_environment", prunes_environment, i64),
Expand Down Expand Up @@ -553,6 +553,7 @@ pub struct LoadedProgramsForTxBatch {
entries: HashMap<Pubkey, Arc<LoadedProgram>>,
slot: Slot,
pub environments: ProgramRuntimeEnvironments,
pub hit_max_limit: bool,
}

impl LoadedProgramsForTxBatch {
Expand All @@ -561,6 +562,7 @@ impl LoadedProgramsForTxBatch {
entries: HashMap::new(),
slot,
environments,
hit_max_limit: false,
}
}

Expand Down Expand Up @@ -736,10 +738,10 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
key: Pubkey,
entry: Arc<LoadedProgram>,
current_slot: Slot,
) -> Arc<LoadedProgram> {
) -> (bool, Arc<LoadedProgram>) {
let (was_occupied, entry) = self.replenish(key, entry, current_slot);
debug_assert!(!was_occupied);
entry
(was_occupied, entry)
}

pub fn prune_by_deployment_slot(&mut self, slot: Slot) {
Expand Down Expand Up @@ -985,7 +987,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
slot: Slot,
key: Pubkey,
loaded_program: Arc<LoadedProgram>,
) {
) -> bool {
let second_level = self.entries.entry(key).or_default();
debug_assert_eq!(
second_level.cooperative_loading_lock,
Expand All @@ -1008,8 +1010,9 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
{
self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed);
}
self.assign_program(key, loaded_program, slot);
let (was_replaced, _) = self.assign_program(key, loaded_program, slot);
self.loading_task_waiter.notify();
was_replaced
}

pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
Expand Down Expand Up @@ -1232,11 +1235,13 @@ mod tests {
slot: Slot,
reason: LoadedProgramType,
) -> Arc<LoadedProgram> {
cache.assign_program(
key,
Arc::new(LoadedProgram::new_tombstone(slot, reason)),
u64::MAX,
)
cache
.assign_program(
key,
Arc::new(LoadedProgram::new_tombstone(slot, reason)),
u64::MAX,
)
.1
}

fn insert_unloaded_program<FG: ForkGraph>(
Expand Down
30 changes: 26 additions & 4 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4378,6 +4378,7 @@ impl Bank {
&mut timings,
Some(&account_overrides),
None,
true,
);

let post_simulation_accounts = loaded_transactions
Expand Down Expand Up @@ -5047,6 +5048,7 @@ impl Bank {
fn replenish_program_cache(
&self,
program_accounts_map: &HashMap<Pubkey, (&Pubkey, u64)>,
limit_to_load_programs: bool,
) -> LoadedProgramsForTxBatch {
let mut missing_programs: Vec<(Pubkey, (LoadedProgramMatchCriteria, u64))> =
if self.check_program_modification_slot {
Expand Down Expand Up @@ -5092,11 +5094,16 @@ impl Bank {
}
// Submit our last completed loading task.
if let Some((key, program)) = program_to_store.take() {
loaded_programs_cache.finish_cooperative_loading_task(
if loaded_programs_cache.finish_cooperative_loading_task(
self.slot(),
key,
program,
);
) && limit_to_load_programs
{
let mut ret = LoadedProgramsForTxBatch::default();
ret.hit_max_limit = true;
return ret;
}
}
// Figure out which program needs to be loaded next.
let program_to_load = loaded_programs_cache.extract(
Expand Down Expand Up @@ -5128,7 +5135,7 @@ impl Bank {
loaded_programs_for_txs.unwrap()
}

#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
batch: &TransactionBatch,
Expand All @@ -5139,6 +5146,7 @@ impl Bank {
timings: &mut ExecuteTimings,
account_overrides: Option<&AccountOverrides>,
log_messages_bytes_limit: Option<usize>,
limit_to_load_programs: bool,
) -> LoadAndExecuteTransactionsOutput {
let sanitized_txs = batch.sanitized_transactions();
debug!("processing transactions: {}", sanitized_txs.len());
Expand Down Expand Up @@ -5208,9 +5216,22 @@ impl Bank {
}

let programs_loaded_for_tx_batch = Rc::new(RefCell::new(
self.replenish_program_cache(&program_accounts_map),
self.replenish_program_cache(&program_accounts_map, limit_to_load_programs),
));

if programs_loaded_for_tx_batch.borrow().hit_max_limit {
return LoadAndExecuteTransactionsOutput {
loaded_transactions: vec![],
execution_results: vec![],
retryable_transaction_indexes: vec![],
executed_transactions_count: 0,
executed_non_vote_transactions_count: 0,
executed_with_successful_result_count: 0,
signature_count: 0,
error_counters,
};
}

let mut load_time = Measure::start("accounts_load");
let mut loaded_transactions = self.rc.accounts.load_accounts(
&self.ancestors,
Expand Down Expand Up @@ -6309,6 +6330,7 @@ impl Bank {
timings,
None,
log_messages_bytes_limit,
false,
);

let (last_blockhash, lamports_per_signature) =
Expand Down

0 comments on commit da2078d

Please sign in to comment.