
add fanout to tpu-client-next #3478

Merged
10 commits merged on Nov 7, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -539,6 +539,7 @@ solana-test-validator = { path = "test-validator", version = "=2.2.0" }
solana-thin-client = { path = "thin-client", version = "=2.2.0" }
solana-transaction-error = { path = "sdk/transaction-error", version = "=2.2.0" }
solana-tpu-client = { path = "tpu-client", version = "=2.2.0", default-features = false }
solana-tpu-client-next = { path = "tpu-client-next", version = "=2.2.0" }
solana-transaction-status = { path = "transaction-status", version = "=2.2.0" }
solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.2.0" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.2.0" }
26 changes: 13 additions & 13 deletions tpu-client-next/src/connection_worker.rs
@@ -14,7 +14,10 @@ use {
clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS},
timing::timestamp,
},
std::net::SocketAddr,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
},
tokio::{
sync::mpsc,
time::{sleep, Duration},
@@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker {
connection: ConnectionState,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: SendTransactionStats,
send_txs_stats: Arc<SendTransactionStats>,
cancel: CancellationToken,
}

@@ -93,6 +96,7 @@ impl ConnectionWorker {
transactions_receiver: mpsc::Receiver<TransactionBatch>,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
send_txs_stats: Arc<SendTransactionStats>,
) -> (Self, CancellationToken) {
let cancel = CancellationToken::new();

@@ -103,7 +107,7 @@ impl ConnectionWorker {
connection: ConnectionState::NotSetup,
skip_check_transaction_age,
max_reconnect_attempts,
send_txs_stats: SendTransactionStats::default(),
send_txs_stats,
cancel: cancel.clone(),
};

@@ -155,11 +159,6 @@ impl ConnectionWorker {
}
}

/// Retrieves the statistics for transactions sent by this worker.
pub fn transaction_stats(&self) -> &SendTransactionStats {
&self.send_txs_stats
}

/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
@@ -183,11 +182,12 @@

if let Err(error) = result {
trace!("Failed to send transaction over stream with error: {error}.");
record_error(error, &mut self.send_txs_stats);
record_error(error, &self.send_txs_stats);
self.connection = ConnectionState::Retry(0);
} else {
self.send_txs_stats.successfully_sent =
self.send_txs_stats.successfully_sent.saturating_add(1);
self.send_txs_stats
.successfully_sent
.fetch_add(1, Ordering::Relaxed);
}
}
measure_send.stop();
@@ -221,14 +221,14 @@
}
Err(err) => {
warn!("Connection error {}: {}", self.peer, err);
record_error(err.into(), &mut self.send_txs_stats);
record_error(err.into(), &self.send_txs_stats);
self.connection =
ConnectionState::Retry(max_retries_attempt.saturating_add(1));
}
}
}
Err(connecting_error) => {
record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
record_error(connecting_error.clone().into(), &self.send_txs_stats);
match connecting_error {
ConnectError::EndpointStopping => {
debug!("Endpoint stopping, exit connection worker.");
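The change above replaces each worker's owned SendTransactionStats with an Arc-shared instance updated through atomic counters, which is why record_error now takes a shared reference and successfully_sent is bumped with fetch_add. Below is a minimal, self-contained sketch of that pattern; the SharedStats type is a stand-in written for illustration, not the crate's actual struct beyond the successfully_sent counter visible in the diff.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

// Stand-in for SendTransactionStats: counters become atomics so several
// workers can update one shared instance without &mut access.
#[derive(Default)]
struct SharedStats {
    successfully_sent: AtomicU64,
}

fn main() {
    let stats = Arc::new(SharedStats::default());

    // Each worker holds its own Arc clone; updates are lock-free.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let stats = Arc::clone(&stats);
            std::thread::spawn(move || {
                stats.successfully_sent.fetch_add(1, Ordering::Relaxed);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(stats.successfully_sent.load(Ordering::Relaxed), 4);
}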
122 changes: 80 additions & 42 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -9,7 +9,8 @@ use {
create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
},
transaction_batch::TransactionBatch,
workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError},
workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
SendTransactionStats,
},
log::*,
quinn::Endpoint,
@@ -39,6 +40,25 @@ pub enum ConnectionWorkersSchedulerError {
LeaderReceiverDropped,
}

/// [`Fanout`] is a configuration struct that specifies how many leaders should
/// be targeted when sending transactions and connecting.
///
/// Note that the unit is the number of leaders per
/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. This means that if the leader schedule is
/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
///
/// The idea of having a separate `connect` parameter is to create a set of
/// nodes to connect to in advance in order to hide the latency of opening a
/// new connection. Hence, `connect` must be greater than or equal to `send`.
pub struct Fanout {
/// The number of leaders to target for sending transactions.
pub send: usize,

/// The number of leaders to target for establishing connections.
pub connect: usize,
}
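To make the doc comment concrete, the snippet below collapses a raw slot-leader schedule into one leader per consecutive-leader-slot group and then builds a Fanout that would send to the first of them while warming up connections to the first two. leaders_per_group is a hypothetical helper written only for this illustration (it is not crate code), and NUM_CONSECUTIVE_LEADER_SLOTS is assumed to be 4 here.

/// Illustrative helper: one leader per group of consecutive leader slots.
fn leaders_per_group(schedule: &[&str], consecutive_slots: usize) -> Vec<String> {
    schedule
        .chunks(consecutive_slots)
        .map(|group| group[0].to_string())
        .collect()
}

fn main() {
    // 12 slots: two groups led by L1 followed by one group led by L2.
    let schedule = [
        "L1", "L1", "L1", "L1", "L1", "L1", "L1", "L1", "L2", "L2", "L2", "L2",
    ];
    // Per-group leaders are [L1, L1, L2], i.e. 3 of them, as in the doc comment.
    assert_eq!(leaders_per_group(&schedule, 4), ["L1", "L1", "L2"]);

    // Send to the first leader in that list, pre-connect to the first two.
    let _fanout = Fanout { send: 1, connect: 2 };
}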

/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
@@ -66,10 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
/// connection failure.
pub max_reconnect_attempts: usize,

/// The number of slots to look ahead during the leader estimation
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,
/// Configures the number of leaders to connect to and send transactions to.
pub leaders_fanout: Fanout,
}

impl ConnectionWorkersScheduler {
@@ -90,7 +108,7 @@
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
leaders_fanout,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
@@ -99,6 +117,7 @@
let endpoint = Self::setup_endpoint(bind, validator_identity)?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

loop {
let transaction_batch = tokio::select! {
@@ -114,50 +133,49 @@
break;
}
};
let updated_leaders = leader_updater.next_leaders(lookahead_slots);
let new_leader = &updated_leaders[0];
let future_leaders = &updated_leaders[1..];
if !workers.contains(new_leader) {
debug!("No existing workers for {new_leader:?}, starting a new one.");
let worker = Self::spawn_worker(
&endpoint,
new_leader,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
);
workers.push(*new_leader, worker).await;
}

tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we has failed to send batch, it will be dropped.
}
},
() = cancel.cancelled() => {
debug!("Cancelled: Shutting down");
break;
}
};
let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);

// Regardless of who is leader, add future leaders to the cache to
// hide the latency of opening the connection.
for peer in future_leaders {
let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
Review comment on lines +139 to +140:

It is a bit confusing that we call these the send and connect portions elsewhere, but here they are called the fanout and connection portions.

Maybe it would be more consistent to call them send_leaders and connect_leaders here as well?

Suggested change (before / after):
let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);
let (send_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);

// add future leaders to the cache to hide the latency of opening
// the connection.
for peer in connect_leaders {
if !workers.contains(peer) {
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
let worker = Self::spawn_worker(
&endpoint,
peer,
worker_channel_size,
skip_check_transaction_age,
max_reconnect_attempts,
stats.clone(),

Review comment (minor): This clone() should be unnecessary - I do not see stats used in this block anymore.

Suggested change (before / after):
stats.clone(),
stats,

Author reply: weird that it passed clippy.

);
workers.push(*peer, worker).await;
maybe_shutdown_worker(workers.push(*peer, worker));
}
}

for new_leader in fanout_leaders {
if !workers.contains(new_leader) {

Review comment: nit: I don't think that this can ever happen? I'd remove the code.

Reply: This can happen if a connection to the leader is dropped and the worker is stopped; see this arm in the send_res match below:

                    Err(WorkersCacheError::ReceiverDropped) => {
                        // Remove the worker from the cache, if the peer has disconnected.
                        maybe_shutdown_worker(workers.pop(*new_leader));
                    }

It is also possible for fanout_leaders to contain duplicates, and a duplicate would not be able to get a matching worker.

Review comment on lines +158 to +159:

Similar to the fanout-to-send rename above, the new_leader name is a bit confusing to me here.
I would expect the new_leader name to imply that we are going to open a connection to this leader or start a worker for it, but we actually start workers in the block above.

Maybe call it send_to instead? Or some other name that indicates that this is only a destination for the next transaction batch. It could just as well be the same leader as in the previous slot group [1].

Suggested change (before / after):
for new_leader in fanout_leaders {
if !workers.contains(new_leader) {
for send_to in send_leaders {
if !workers.contains(send_to) {

Footnotes
1. Is there a name for a sequence of NUM_CONSECUTIVE_LEADER_SLOTS slots?

warn!("No existing worker for {new_leader:?}, skip sending to this leader.");
continue;
}

let send_res =
workers.try_send_transactions_to_address(new_leader, transaction_batch.clone());
match send_res {
Ok(()) => (),
Err(WorkersCacheError::ShutdownError) => {
debug!("Connection to {new_leader} was closed, worker cache shutdown");
}
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache, if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
Err(err) => {
warn!("Connection to {new_leader} was closed, worker error: {err}");
// If we has failed to send batch, it will be dropped.
}
}
}
}
@@ -166,7 +184,7 @@

endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
Ok(workers.transaction_stats().clone())
Ok(send_stats_per_addr)
}

/// Sets up the QUIC endpoint for the scheduler to handle connections.
@@ -191,6 +209,7 @@
worker_channel_size: usize,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
stats: Arc<SendTransactionStats>,
) -> WorkerInfo {
let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
let endpoint = endpoint.clone();
@@ -202,12 +221,31 @@
txs_receiver,
skip_check_transaction_age,
max_reconnect_attempts,
stats,
);
let handle = tokio::spawn(async move {
worker.run().await;
worker.transaction_stats().clone()
});

WorkerInfo::new(txs_sender, handle, cancel)
}
}

/// Splits `leaders` into two slices based on the `fanout` configuration:
/// * the first slice contains the leaders to which transactions will be sent,
/// * the second vector contains the leaders, used to warm up connections. This
/// slice includes the the first set.
Review comment on lines +236 to +237 (minor):

Suggested change (before / after):
/// * the second vector contains the leaders, used to warm up connections. This
///   slice includes the the first set.
/// * the second slice contains the leaders, used to warm up connections. This
///   slice includes the first set.

fn split_leaders<'leaders>(
leaders: &'leaders [SocketAddr],
fanout: &Fanout,
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
let Fanout { send, connect } = fanout;
assert!(send <= connect);
let send_count = (*send).min(leaders.len());
let connect_count = (*connect).min(leaders.len());

let send_slice = &leaders[..send_count];
let connect_slice = &leaders[..connect_count];

(send_slice, connect_slice)
}
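A brief usage sketch of split_leaders with made-up addresses follows. Since both split_leaders and Fanout are defined above in this file, the example assumes they are in scope; split_leaders is private to the module, so this is conceptual rather than a call against a public API.

use std::net::SocketAddr;

fn main() {
    let leaders: Vec<SocketAddr> = vec![
        "127.0.0.1:8001".parse().unwrap(),
        "127.0.0.1:8002".parse().unwrap(),
        "127.0.0.1:8003".parse().unwrap(),
    ];
    let fanout = Fanout { send: 1, connect: 2 };

    let (send_leaders, connect_leaders) = split_leaders(&leaders, &fanout);

    // Transactions go only to the first leader, while connections are warmed
    // up to the first two: the send slice is always a prefix of the connect slice.
    assert_eq!(send_leaders, &leaders[..1]);
    assert_eq!(connect_leaders, &leaders[..2]);
}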
15 changes: 11 additions & 4 deletions tpu-client-next/src/leader_updater.rs
@@ -13,6 +13,7 @@ use {
log::*,
solana_connection_cache::connection_cache::Protocol,
solana_rpc_client::nonblocking::rpc_client::RpcClient,
solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS,
solana_tpu_client::nonblocking::tpu_client::LeaderTpuService,
std::{
fmt,
@@ -22,26 +23,30 @@
Arc,
},
},
thiserror::Error,
};

/// [`LeaderUpdater`] trait abstracts out functionality required for the
/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to
/// identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
/// Returns next unique leaders for the next `lookahead_slots` starting from
/// Returns next leaders for the next `lookahead_leaders` starting from
/// current estimated slot.
///
/// Leaders are returned per [`NUM_CONSECUTIVE_LEADER_SLOTS`] to avoid unnecessary repetition.
///
/// If the current leader estimation is incorrect and transactions are sent to
/// only one estimated leader, there is a risk of losing all the transactions,
/// depending on the forwarding policy.
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr>;

/// Stop [`LeaderUpdater`] and releases all associated resources.
async fn stop(&mut self);
}

/// Error type for [`LeaderUpdater`].
#[derive(Error, PartialEq)]
pub struct LeaderUpdaterError;

impl fmt::Display for LeaderUpdaterError {
@@ -98,7 +103,9 @@ struct LeaderUpdaterService {

#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
let lookahead_slots =
(lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS);
self.leader_tpu_service.leader_tpu_sockets(lookahead_slots)
}

@@ -116,7 +123,7 @@ struct PinnedLeaderUpdater {

#[async_trait]
impl LeaderUpdater for PinnedLeaderUpdater {
fn next_leaders(&self, _lookahead_slots: u64) -> Vec<SocketAddr> {
fn next_leaders(&mut self, _lookahead_leaders: usize) -> Vec<SocketAddr> {
self.address.clone()
}

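For orientation, here is a minimal sketch of implementing the updated LeaderUpdater trait with its new &mut self receiver and usize lookahead. FixedLeaderUpdater and its behavior are invented for illustration (the crate's own PinnedLeaderUpdater above plays a similar role), and the example assumes the LeaderUpdater trait and the async_trait crate are in scope.

use async_trait::async_trait;
use std::net::SocketAddr;

/// Hypothetical test double: always reports the same fixed leader set,
/// truncated to the requested number of leaders.
struct FixedLeaderUpdater {
    leaders: Vec<SocketAddr>,
}

#[async_trait]
impl LeaderUpdater for FixedLeaderUpdater {
    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
        // One entry per NUM_CONSECUTIVE_LEADER_SLOTS group, capped at the
        // requested lookahead.
        self.leaders
            .iter()
            .take(lookahead_leaders)
            .copied()
            .collect()
    }

    async fn stop(&mut self) {}
}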