
Commit

add fanout to the tpu-client-next
KirillLykov committed Nov 5, 2024
1 parent 1fc6055 commit f62dee2
Showing 2 changed files with 65 additions and 32 deletions.
91 changes: 61 additions & 30 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -39,6 +39,15 @@ pub enum ConnectionWorkersSchedulerError {
LeaderReceiverDropped,
}

/// This enum defines how many of the discovered leaders we will send transactions to.
pub enum LeadersFanout {
/// Send transactions to all the leaders discovered by the `next_leaders`
/// call.
All,
/// Send transactions to the specified number of leaders from the front of the list
/// discovered by the `next_leaders` call.
Selected(usize),
}

/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
@@ -70,6 +79,9 @@ pub struct ConnectionWorkersSchedulerConfig {
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,

/// The number of leaders to send transactions to.
pub leaders_fanout: LeadersFanout,
}

impl ConnectionWorkersScheduler {
@@ -91,6 +103,7 @@ impl ConnectionWorkersScheduler {
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
@@ -115,39 +128,41 @@ impl ConnectionWorkersScheduler {
}
};
let updated_leaders = leader_updater.next_leaders(lookahead_slots);
let new_leader = &updated_leaders[0];
let future_leaders = &updated_leaders[1..];
if !workers.contains(new_leader) {
debug!("No existing workers for {new_leader:?}, starting a new one.");
let worker = Self::spawn_worker(
&endpoint,
new_leader,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
);
workers.push(*new_leader, worker).await;
}

tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we have failed to send the batch, it will be dropped.
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
let (new_leaders, future_leaders) = split_leaders(&updated_leaders, &leaders_fanout);
for new_leader in new_leaders {
if !workers.contains(new_leader) {
debug!("No existing workers for {new_leader:?}, starting a new one.");
let worker = Self::spawn_worker(
&endpoint,
new_leader,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
);
workers.push(*new_leader, worker).await;
}
};

// Regardless of who is leader, add future leaders to the cache to
// hide the latency of opening the connection.
tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch.clone()) => match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we have failed to send the batch, it will be dropped.
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
}
};
}

// add future leaders to the cache to hide the latency of opening the
// connection.
for peer in future_leaders {
if !workers.contains(peer) {
let worker = Self::spawn_worker(
@@ -211,3 +226,19 @@ impl ConnectionWorkersScheduler {
WorkerInfo::new(txs_sender, handle, cancel)
}
}

/// Splits the input slice of leaders into two parts based on the `fanout` configuration:
/// * the first vector contains the leaders to which transactions will be sent.
/// * the second vector contains the remaining leaders, used to warm up connections.
fn split_leaders<'a>(
leaders: &'a [SocketAddr],
fanout: &'a LeadersFanout,
) -> (Vec<&'a SocketAddr>, Vec<&'a SocketAddr>) {
match fanout {
LeadersFanout::All => (leaders.iter().collect(), Vec::new()), // All elements go to the first vector
LeadersFanout::Selected(count) => {
let (selected, remaining) = leaders.split_at((*count).min(leaders.len())); // Split at the specified count or max length
(selected.iter().collect(), remaining.iter().collect())
}
}
}
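Not part of the commit: a minimal unit-test sketch of the split_leaders semantics shown above, written as if it lived inside connection_workers_scheduler.rs, where the private helper and LeadersFanout are both in scope. The module and test names are hypothetical.

#[cfg(test)]
mod split_leaders_tests {
    use {super::*, std::net::SocketAddr};

    #[test]
    fn splits_according_to_fanout() {
        let leaders: Vec<SocketAddr> = vec![
            "127.0.0.1:8000".parse().unwrap(),
            "127.0.0.1:8001".parse().unwrap(),
            "127.0.0.1:8002".parse().unwrap(),
        ];

        // `All`: every discovered leader receives transactions; nothing is
        // left over for warm-up only.
        let (send_to, warm_up) = split_leaders(&leaders, &LeadersFanout::All);
        assert_eq!(send_to.len(), 3);
        assert!(warm_up.is_empty());

        // `Selected(2)`: the first two leaders receive transactions, the
        // remaining one only gets a connection opened ahead of time.
        let (send_to, warm_up) = split_leaders(&leaders, &LeadersFanout::Selected(2));
        assert_eq!(send_to.len(), 2);
        assert_eq!(warm_up.len(), 1);

        // A count larger than the leader list is clamped to its length.
        let (send_to, warm_up) = split_leaders(&leaders, &LeadersFanout::Selected(10));
        assert_eq!(send_to.len(), 3);
        assert!(warm_up.is_empty());
    }
}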
6 changes: 4 additions & 2 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -16,8 +16,9 @@ use {
streamer::StakedNodes,
},
solana_tpu_client_next::{
connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
leader_updater::create_leader_updater, transaction_batch::TransactionBatch,
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, LeadersFanout::Selected},
leader_updater::create_leader_updater,
transaction_batch::TransactionBatch,
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats,
SendTransactionStatsPerAddr,
},
@@ -49,6 +50,7 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
worker_channel_size: 2,
max_reconnect_attempts: 4,
lookahead_slots: 1,
leaders_fanout: Selected(1),
}
}

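For context, a hedged sketch (not in the commit) of how a caller might choose the new leaders_fanout value when filling in ConnectionWorkersSchedulerConfig; the helper name is hypothetical and the remaining config fields would be set as in test_config above.

use solana_tpu_client_next::connection_workers_scheduler::LeadersFanout;

// Hypothetical helper: decide how widely to broadcast each transaction batch.
fn choose_fanout(broadcast_to_all_leaders: bool) -> LeadersFanout {
    if broadcast_to_all_leaders {
        // Every leader returned by `next_leaders` for the configured
        // `lookahead_slots` receives the batch.
        LeadersFanout::All
    } else {
        // Only the first two upcoming leaders receive the batch; the
        // scheduler still opens connections to the rest in advance.
        LeadersFanout::Selected(2)
    }
}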
