Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move block_cost_limit tracking to BankingStage in preparation for SIMD-0207 #753

Merged
merged 12 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ fn main() {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// This is so that the signal_receiver does not go out of scope after the closure.
Expand Down
2 changes: 2 additions & 0 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
&|_| 0,
);
});

Expand Down Expand Up @@ -327,6 +328,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let chunk_len = verified.len() / CHUNKS;
Expand Down
1 change: 1 addition & 0 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
&bank,
transaction_iter.next().unwrap(),
0,
&|_| 0,
);
assert!(summary
.execute_and_commit_transactions_output
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ impl BankingSimulator {
false,
collections::HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let (&_slot, &raw_base_event_time) = freeze_time_by_slot
Expand Down
23 changes: 21 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use {
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
Expand Down Expand Up @@ -366,6 +366,8 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
// callback function for compute space reservation for BundleStage
block_cost_limit_block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
Self::new_num_threads(
block_production_method,
Expand All @@ -384,6 +386,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_block_cost_limit_reservation_cb,
)
}

Expand All @@ -405,6 +408,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
match block_production_method {
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
Expand All @@ -423,6 +427,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_reservation_cb,
),
}
}
Expand All @@ -444,6 +449,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -492,6 +498,7 @@ impl BankingStage {
),
blacklisted_accounts.clone(),
bundle_account_locker.clone(),
block_cost_limit_reservation_cb.clone(),
));
}

Expand Down Expand Up @@ -521,11 +528,12 @@ impl BankingStage {
);

worker_metrics.push(consume_worker.metrics_handle());
let cb = block_cost_limit_reservation_cb.clone();
bank_thread_hdls.push(
Builder::new()
.name(format!("solCoWorker{id:02}"))
.spawn(move || {
let _ = consume_worker.run();
let _ = consume_worker.run(cb);
})
.unwrap(),
)
Expand Down Expand Up @@ -589,6 +597,7 @@ impl BankingStage {
unprocessed_transaction_storage: UnprocessedTransactionStorage,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let consumer = Consumer::new(
Expand All @@ -610,6 +619,7 @@ impl BankingStage {
&consumer,
id,
unprocessed_transaction_storage,
block_cost_limit_reservation_cb,
)
})
.unwrap()
Expand All @@ -623,6 +633,7 @@ impl BankingStage {
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand All @@ -648,6 +659,7 @@ impl BankingStage {
unprocessed_transaction_storage,
banking_stage_stats,
slot_metrics_tracker,
block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_consume_buffered_packets_us(consume_buffered_packets_us);
Expand Down Expand Up @@ -686,6 +698,7 @@ impl BankingStage {
consumer: &Consumer,
id: u32,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64,
) {
let mut banking_stage_stats = BankingStageStats::new(id);

Expand All @@ -703,6 +716,7 @@ impl BankingStage {
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
Expand Down Expand Up @@ -840,6 +854,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
drop(non_vote_sender);
drop(tpu_vote_sender);
Expand Down Expand Up @@ -902,6 +917,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
trace!("sending bank");
drop(non_vote_sender);
Expand Down Expand Up @@ -993,6 +1009,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1170,6 +1187,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// wait for banking_stage to eat the packets
Expand Down Expand Up @@ -1377,6 +1395,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
Expand Down
24 changes: 15 additions & 9 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
pub fn run(self, reservation_cb: impl Fn(&Bank) -> u64) -> Result<(), ConsumeWorkerError<Tx>> {
loop {
let work = self.consume_receiver.recv()?;
self.consume_loop(work)?;
self.consume_loop(work, &reservation_cb)?;
}
}

fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
fn consume_loop(
&self,
work: ConsumeWork<Tx>,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError<Tx>> {
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
let Some(mut bank) = maybe_consume_bank else {
self.metrics
Expand Down Expand Up @@ -97,7 +101,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
return self.retry_drain(work);
}
}
self.consume(&bank, work)?;
self.consume(&bank, work, reservation_cb)?;
}

Ok(())
Expand All @@ -108,11 +112,13 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
&self,
bank: &Arc<Bank>,
work: ConsumeWork<Tx>,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError<Tx>> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
&work.max_ages,
reservation_cb,
);

self.metrics.update_for_consume(&output);
Expand Down Expand Up @@ -904,7 +910,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));

let pubkey1 = Pubkey::new_unique();

Expand Down Expand Up @@ -949,7 +955,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -998,7 +1004,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -1050,7 +1056,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -1125,7 +1131,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down
Loading
Loading