diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index ee5317088178ab..cb543e6044993a 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -43,7 +43,7 @@ pub struct ProcessTransactionBatchOutput { cost_model_throttled_transactions_count: usize, // Amount of time spent running the cost model cost_model_us: u64, - execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput, + pub(crate) execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput, } pub struct ExecuteAndCommitTransactionsOutput { @@ -57,7 +57,7 @@ pub struct ExecuteAndCommitTransactionsOutput { executed_with_successful_result_count: usize, // Transactions that either were not executed, or were executed and failed to be committed due // to the block ending. - retryable_transaction_indexes: Vec, + pub(crate) retryable_transaction_indexes: Vec, // A result that indicates whether transactions were successfully // committed into the Poh stream. commit_transactions_result: Result, PohRecorderError>, diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 3866ae145cf128..308445e22b9530 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -138,7 +138,7 @@ impl Forwarder { /// Forwards all valid, unprocessed packets in the iterator, up to a rate limit. /// Returns whether forwarding succeeded, the number of attempted forwarded packets /// if any, the time spent forwarding, and the leader pubkey if any. 
- fn forward_packets<'a>( + pub(crate) fn forward_packets<'a>( &self, forward_option: &ForwardOption, forwardable_packets: impl Iterator, @@ -239,20 +239,6 @@ impl Forwarder { }); } - /// Get the pubkey and socket address for the leader to forward to - fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> { - match forward_option { - ForwardOption::NotForward => None, - ForwardOption::ForwardTransaction => { - next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder) - } - - ForwardOption::ForwardTpuVote => { - next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder) - } - } - } - fn forward( &self, forward_option: &ForwardOption, diff --git a/core/src/banking_stage/worker.rs b/core/src/banking_stage/worker.rs new file mode 100644 index 00000000000000..756541e5cdc90d --- /dev/null +++ b/core/src/banking_stage/worker.rs @@ -0,0 +1,179 @@ +use { + super::{ + consumer::Consumer, + forwarder::Forwarder, + scheduler_messages::{ConsumeWork, FinishedConsumeWork, FinishedForwardWork, ForwardWork}, + ForwardOption, + }, + crate::immutable_deserialized_packet::ImmutableDeserializedPacket, + crossbeam_channel::{select, Receiver, RecvError, SendError, Sender}, + solana_poh::leader_bank_notifier::LeaderBankNotifier, + solana_runtime::bank::Bank, + solana_sdk::transaction::SanitizedTransaction, + std::{sync::Arc, time::Duration}, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum WorkerError { + #[error("Failed to receive work from scheduler: {0}")] + Recv(RecvError), + #[error("Failed to send finalized consume work to scheduler: {0}")] + ConsumedSend(SendError), + #[error("Failed to send finalized forward work to scheduler: {0}")] + ForwardedSend(SendError), +} + +impl From for WorkerError { + fn from(err: RecvError) -> Self { + Self::Recv(err) + } +} + +impl From> for WorkerError { + fn from(err: SendError) -> Self { + Self::ConsumedSend(err) + } +} + +impl From> for WorkerError { + fn from(err: SendError) -> Self { 
+ Self::ForwardedSend(err) + } +} + +pub(crate) struct Worker { + consume_receiver: Receiver, + consumer: Consumer, + consumed_sender: Sender, + + forward_receiver: Receiver, + forward_option: ForwardOption, + forwarder: Forwarder, + forwarded_sender: Sender, + + leader_bank_notifier: Arc, +} + +impl Worker { + pub fn new( + consume_receiver: Receiver, + consumer: Consumer, + consumed_sender: Sender, + forward_receiver: Receiver, + forward_option: ForwardOption, + forwarder: Forwarder, + forwarded_sender: Sender, + leader_bank_notifier: Arc, + ) -> Self { + Self { + consume_receiver, + consumer, + consumed_sender, + forward_receiver, + forward_option, + forwarder, + forwarded_sender, + leader_bank_notifier, + } + } + + pub fn run(self) -> Result<(), WorkerError> { + loop { + select! { + recv(self.consume_receiver) -> consume_work => { + self.consume_loop(consume_work?)?; + }, + recv(self.forward_receiver) -> forward_work => { + self.forward_loop(forward_work?)?; + }, + } + } + } + + fn consume_loop(&self, consume_work: ConsumeWork) -> Result<(), WorkerError> { + let Some(mut bank) = self.get_consume_bank() else { + return self.retry_drain(consume_work); + }; + + for work in std::iter::once(consume_work).chain(self.consume_receiver.try_iter()) { + if bank.is_complete() { + if let Some(new_bank) = self.get_consume_bank() { + bank = new_bank; + } else { + return self.retry_drain(work); + } + } + self.consume(&bank, work)?; + } + + Ok(()) + } + + /// Consume a single batch. + fn consume(&self, bank: &Arc, consume_work: ConsumeWork) -> Result<(), WorkerError> { + let summary = + self.consumer + .process_and_record_transactions(bank, &consume_work.transactions, 0); + self.consumed_sender.send(FinishedConsumeWork { + work: consume_work, + retryable_indexes: summary + .execute_and_commit_transactions_output + .retryable_transaction_indexes, + })?; + Ok(()) + } + + /// Try to get a bank for consuming. 
+ fn get_consume_bank(&self) -> Option> { + self.leader_bank_notifier + .get_or_wait_for_in_progress(Duration::from_millis(50)) + .upgrade() + } + + /// Retry current batch and all outstanding batches. + fn retry_drain(&self, work: ConsumeWork) -> Result<(), WorkerError> { + for work in std::iter::once(work).chain(self.consume_receiver.try_iter()) { + self.retry(work)?; + } + Ok(()) + } + + /// Send transactions back to scheduler as retryable. + fn retry(&self, work: ConsumeWork) -> Result<(), WorkerError> { + let retryable_indexes = (0..work.transactions.len()).collect(); + self.consumed_sender.send(FinishedConsumeWork { + work, + retryable_indexes, + })?; + Ok(()) + } + + fn forward_loop(&self, forward_work: ForwardWork) -> Result<(), WorkerError> { + for work in std::iter::once(forward_work).chain(self.forward_receiver.try_iter()) { + let (res, _num_packets, _forward_us, _leader_pubkey) = self.forwarder.forward_packets( + &self.forward_option, + work.packets.iter().map(|p| p.original_packet()), + ); + if res.is_ok() { + self.forwarded_sender.send(FinishedForwardWork { + work, + successful: true, + })?; + } else { + return self.failed_forward_drain(work); + } + } + Ok(()) + } + + fn failed_forward_drain(&self, work: ForwardWork) -> Result<(), WorkerError> { + for work in std::iter::once(work).chain(self.forward_receiver.try_iter()) { + self.forwarded_sender.send(FinishedForwardWork { + work, + successful: false, + })?; + } + Ok(()) + } +}