Skip to content

Commit

Permalink
Move block_cost_limit tracking to BankingStage in preparation for SIM…
Browse files Browse the repository at this point in the history
…D-0207 (Backport #753) (#786)
  • Loading branch information
buffalu authored Feb 11, 2025
1 parent 88e87ff commit c36a223
Show file tree
Hide file tree
Showing 18 changed files with 246 additions and 426 deletions.
1 change: 1 addition & 0 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ fn main() {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// This is so that the signal_receiver does not go out of scope after the closure.
Expand Down
2 changes: 2 additions & 0 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
&|_| 0,
);
});

Expand Down Expand Up @@ -322,6 +323,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let chunk_len = verified.len() / CHUNKS;
Expand Down
1 change: 1 addition & 0 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
&bank,
transaction_iter.next().unwrap(),
0,
&|_| 0,
);
assert!(summary
.execute_and_commit_transactions_output
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ impl BankingSimulator {
false,
collections::HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let (&_slot, &raw_base_event_time) = freeze_time_by_slot
Expand Down
26 changes: 24 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use {
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
Expand Down Expand Up @@ -365,6 +365,8 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
// callback function for compute space reservation for BundleStage
block_cost_limit_block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
Self::new_num_threads(
block_production_method,
Expand All @@ -383,6 +385,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_block_cost_limit_reservation_cb,
)
}

Expand All @@ -404,6 +407,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
match block_production_method {
BlockProductionMethod::ThreadLocalMultiIterator => {
Expand All @@ -422,6 +426,7 @@ impl BankingStage {
prioritization_fee_cache,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_reservation_cb,
)
}
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
Expand All @@ -440,6 +445,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_reservation_cb,
),
}
}
Expand All @@ -460,6 +466,7 @@ impl BankingStage {
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -528,6 +535,7 @@ impl BankingStage {
unprocessed_transaction_storage,
blacklisted_accounts.clone(),
bundle_account_locker.clone(),
block_cost_limit_reservation_cb.clone(),
)
})
.collect();
Expand All @@ -551,6 +559,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -599,6 +608,7 @@ impl BankingStage {
),
blacklisted_accounts.clone(),
bundle_account_locker.clone(),
block_cost_limit_reservation_cb.clone(),
));
}

Expand Down Expand Up @@ -628,11 +638,12 @@ impl BankingStage {
);

worker_metrics.push(consume_worker.metrics_handle());
let cb = block_cost_limit_reservation_cb.clone();
bank_thread_hdls.push(
Builder::new()
.name(format!("solCoWorker{id:02}"))
.spawn(move || {
let _ = consume_worker.run();
let _ = consume_worker.run(cb);
})
.unwrap(),
)
Expand Down Expand Up @@ -687,6 +698,7 @@ impl BankingStage {
unprocessed_transaction_storage: UnprocessedTransactionStorage,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let consumer = Consumer::new(
Expand All @@ -708,6 +720,7 @@ impl BankingStage {
&consumer,
id,
unprocessed_transaction_storage,
block_cost_limit_reservation_cb,
)
})
.unwrap()
Expand All @@ -722,6 +735,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand All @@ -747,6 +761,7 @@ impl BankingStage {
unprocessed_transaction_storage,
banking_stage_stats,
slot_metrics_tracker,
block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_consume_buffered_packets_us(consume_buffered_packets_us);
Expand Down Expand Up @@ -787,6 +802,7 @@ impl BankingStage {
consumer: &Consumer,
id: u32,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64,
) {
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);
Expand All @@ -806,6 +822,7 @@ impl BankingStage {
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
&block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
Expand Down Expand Up @@ -939,6 +956,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
drop(non_vote_sender);
drop(tpu_vote_sender);
Expand Down Expand Up @@ -997,6 +1015,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
trace!("sending bank");
drop(non_vote_sender);
Expand Down Expand Up @@ -1084,6 +1103,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1261,6 +1281,7 @@ mod tests {
&Arc::new(PrioritizationFeeCache::new(0u64)),
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// wait for banking_stage to eat the packets
Expand Down Expand Up @@ -1464,6 +1485,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
Expand Down
30 changes: 20 additions & 10 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,18 @@ impl ConsumeWorker {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError> {
pub fn run(self, reservation_cb: impl Fn(&Bank) -> u64) -> Result<(), ConsumeWorkerError> {
loop {
let work = self.consume_receiver.recv()?;
self.consume_loop(work)?;
self.consume_loop(work, &reservation_cb)?;
}
}

fn consume_loop(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn consume_loop(
&self,
work: ConsumeWork,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError> {
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
let Some(mut bank) = maybe_consume_bank else {
self.metrics
Expand Down Expand Up @@ -99,18 +103,24 @@ impl ConsumeWorker {
return self.retry_drain(work);
}
}
self.consume(&bank, work)?;
self.consume(&bank, work, reservation_cb)?;
}

Ok(())
}

/// Consume a single batch.
fn consume(&self, bank: &Arc<Bank>, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn consume(
&self,
bank: &Arc<Bank>,
work: ConsumeWork,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
&work.max_ages,
reservation_cb,
);

self.metrics.update_for_consume(&output);
Expand Down Expand Up @@ -847,7 +857,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));

let pubkey1 = Pubkey::new_unique();

Expand Down Expand Up @@ -892,7 +902,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -941,7 +951,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -993,7 +1003,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -1068,7 +1078,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down
Loading

0 comments on commit c36a223

Please sign in to comment.