Skip to content

Commit

Permalink
Banking Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Mar 29, 2023
1 parent b5e1b52 commit f324eef
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 17 deletions.
4 changes: 2 additions & 2 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ProcessTransactionBatchOutput {
cost_model_throttled_transactions_count: usize,
// Amount of time spent running the cost model
cost_model_us: u64,
execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput,
pub(crate) execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput,
}

pub struct ExecuteAndCommitTransactionsOutput {
Expand All @@ -57,7 +57,7 @@ pub struct ExecuteAndCommitTransactionsOutput {
executed_with_successful_result_count: usize,
// Transactions that either were not executed, or were executed and failed to be committed due
// to the block ending.
retryable_transaction_indexes: Vec<usize>,
pub(crate) retryable_transaction_indexes: Vec<usize>,
// A result that indicates whether transactions were successfully
// committed into the Poh stream.
commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
Expand Down
16 changes: 1 addition & 15 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Forwarder {
/// Forwards all valid, unprocessed packets in the iterator, up to a rate limit.
/// Returns whether forwarding succeeded, the number of attempted forwarded packets
/// if any, the time spent forwarding, and the leader pubkey if any.
fn forward_packets<'a>(
pub(crate) fn forward_packets<'a>(
&self,
forward_option: &ForwardOption,
forwardable_packets: impl Iterator<Item = &'a Packet>,
Expand Down Expand Up @@ -239,20 +239,6 @@ impl Forwarder {
});
}

/// Get the pubkey and socket address for the leader to forward to
fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> {
match forward_option {
ForwardOption::NotForward => None,
ForwardOption::ForwardTransaction => {
next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder)
}

ForwardOption::ForwardTpuVote => {
next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder)
}
}
}

fn forward(
&self,
forward_option: &ForwardOption,
Expand Down
179 changes: 179 additions & 0 deletions core/src/banking_stage/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use {
super::{
consumer::Consumer,
forwarder::Forwarder,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, FinishedForwardWork, ForwardWork},
ForwardOption,
},
crate::immutable_deserialized_packet::ImmutableDeserializedPacket,
crossbeam_channel::{select, Receiver, RecvError, SendError, Sender},
solana_poh::leader_bank_notifier::LeaderBankNotifier,
solana_runtime::bank::Bank,
solana_sdk::transaction::SanitizedTransaction,
std::{sync::Arc, time::Duration},
thiserror::Error,
};

#[derive(Debug, Error)]
pub enum WorkerError {
#[error("Failed to receive work from scheduler: {0}")]
Recv(RecvError),
#[error("Failed to send finalized consume work to scheduler: {0}")]
ConsumedSend(SendError<FinishedConsumeWork>),
#[error("Failed to send finalized forward work to scheduler: {0}")]
ForwardedSend(SendError<FinishedForwardWork>),
}

impl From<RecvError> for WorkerError {
fn from(err: RecvError) -> Self {
Self::Recv(err)
}
}

impl From<SendError<FinishedConsumeWork>> for WorkerError {
fn from(err: SendError<FinishedConsumeWork>) -> Self {
Self::ConsumedSend(err)
}
}

impl From<SendError<FinishedForwardWork>> for WorkerError {
fn from(err: SendError<FinishedForwardWork>) -> Self {
Self::ForwardedSend(err)
}
}

pub(crate) struct Worker {
consume_receiver: Receiver<ConsumeWork>,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork>,

forward_receiver: Receiver<ForwardWork>,
forward_option: ForwardOption,
forwarder: Forwarder,
forwarded_sender: Sender<FinishedForwardWork>,

leader_bank_notifier: Arc<LeaderBankNotifier>,
}

impl Worker {
pub fn new(
consume_receiver: Receiver<ConsumeWork>,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork>,
forward_receiver: Receiver<ForwardWork>,
forward_option: ForwardOption,
forwarder: Forwarder,
forwarded_sender: Sender<FinishedForwardWork>,
leader_bank_notifier: Arc<LeaderBankNotifier>,
) -> Self {
Self {
consume_receiver,
consumer,
consumed_sender,
forward_receiver,
forward_option,
forwarder,
forwarded_sender,
leader_bank_notifier,
}
}

pub fn run(self) -> Result<(), WorkerError> {
loop {
select! {
recv(self.consume_receiver) -> consume_work => {
self.consume_loop(consume_work?)?;
},
recv(self.forward_receiver) -> forward_work => {
self.forward_loop(forward_work?)?;
},
}
}
}

fn consume_loop(&self, consume_work: ConsumeWork) -> Result<(), WorkerError> {
let Some(mut bank) = self.get_consume_bank() else {
return self.retry_drain(consume_work);
};

for work in std::iter::once(consume_work).chain(self.consume_receiver.try_iter()) {
if bank.is_complete() {
if let Some(new_bank) = self.get_consume_bank() {
bank = new_bank;
} else {
return self.retry_drain(work);
}
}
self.consume(&bank, work)?;
}

Ok(())
}

/// Consume a single batch.
fn consume(&self, bank: &Arc<Bank>, consume_work: ConsumeWork) -> Result<(), WorkerError> {
let summary =
self.consumer
.process_and_record_transactions(bank, &consume_work.transactions, 0);
self.consumed_sender.send(FinishedConsumeWork {
work: consume_work,
retryable_indexes: summary
.execute_and_commit_transactions_output
.retryable_transaction_indexes,
})?;
Ok(())
}

/// Try to get a bank for consuming.
fn get_consume_bank(&self) -> Option<Arc<Bank>> {
self.leader_bank_notifier
.get_or_wait_for_in_progress(Duration::from_millis(50))
.upgrade()
}

/// Retry current batche and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork) -> Result<(), WorkerError> {
for work in std::iter::once(work).chain(self.consume_receiver.try_iter()) {
self.retry(work)?;
}
Ok(())
}

/// Send transactions back to scheduler as retryable.
fn retry(&self, work: ConsumeWork) -> Result<(), WorkerError> {
let retryable_indexes = (0..work.transactions.len()).collect();
self.consumed_sender.send(FinishedConsumeWork {
work,
retryable_indexes,
})?;
Ok(())
}

fn forward_loop(&self, forward_work: ForwardWork) -> Result<(), WorkerError> {
for work in std::iter::once(forward_work).chain(self.forward_receiver.try_iter()) {
let (res, _num_packets, _forward_us, _leader_pubkey) = self.forwarder.forward_packets(
&self.forward_option,
work.packets.iter().map(|p| p.original_packet()),
);
if res.is_ok() {
self.forwarded_sender.send(FinishedForwardWork {
work,
successful: true,
})?;
} else {
return self.failed_forward_drain(work);
}
}
Ok(())
}

fn failed_forward_drain(&self, work: ForwardWork) -> Result<(), WorkerError> {
for work in std::iter::once(work).chain(self.forward_receiver.try_iter()) {
self.forwarded_sender.send(FinishedForwardWork {
work,
successful: false,
})?;
}
Ok(())
}
}

0 comments on commit f324eef

Please sign in to comment.