diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 44673c06f4d087..894ca0c7e19ccb 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -217,7 +217,8 @@ mod tests { crossbeam_channel::unbounded, solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + streamer::StakedNodes, }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, @@ -256,6 +257,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 028a88f416e1fe..edb69171f62c24 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,7 +33,7 @@ use { solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, @@ -163,6 +163,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) @@ -183,6 +184,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 7986e7a3b728d7..e6270421a93dc5 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -10,7 +10,8 @@ mod tests { }, solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate, }, std::{ @@ -79,6 +80,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -159,6 +161,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(1), // wait_for_chunk_timeout DEFAULT_TPU_COALESCE, ) @@ -217,6 +220,7 @@ mod tests { staked_nodes.clone(), 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -241,6 +245,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index ba567a7f3123fb..d0ecd1cd713ce3 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -53,9 +53,8 @@ use { }, }; -/// Limit to 500K PPS -const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10; const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; +const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100); const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100); pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); @@ -75,6 +74,9 @@ const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; const STREAM_STOP_CODE_THROTTLING: u32 = 15; +/// Limit to 500K PPS +pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500; + // A sequence of bytes that is part of a packet // along with where in the packet it is struct PacketChunk { @@ -111,6 +113,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { @@ -136,6 +139,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, coalesce, @@ -153,6 +157,7 @@ async fn run_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, @@ -192,6 +197,7 @@ async fn run_server( staked_nodes.clone(), max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, )); @@ -328,6 +334,7 @@ fn handle_and_cache_new_connection( params: &NewConnectionHandlerParams, wait_for_chunk_timeout: Duration, max_unstaked_connections: usize, + max_streams_per_ms: u64, ) -> Result<(), ConnectionHandlerError> { if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( connection_table_l.peer_type, @@ -379,6 +386,7 @@ fn handle_and_cache_new_connection( peer_type, wait_for_chunk_timeout, max_unstaked_connections, + max_streams_per_ms, )); Ok(()) } else { @@ -407,6 +415,7 @@ async fn prune_unstaked_connections_and_add_new_connection( max_connections: usize, params: &NewConnectionHandlerParams, wait_for_chunk_timeout: Duration, + max_streams_per_ms: u64, ) -> Result<(), ConnectionHandlerError> { let stats = params.stats.clone(); if max_connections > 0 { @@ -420,6 +429,7 @@ async fn prune_unstaked_connections_and_add_new_connection( params, wait_for_chunk_timeout, max_connections, + max_streams_per_ms, ) } else { connection.close( @@ -484,6 +494,7 @@ async fn setup_connection( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, ) { @@ -502,8 +513,9 @@ async fn setup_connection( ), |(pubkey, stake, total_stake, max_stake, min_stake)| { // The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle - // interval during which we allow max MAX_STREAMS_PER_100MS streams. - let min_stake_ratio = 1_f64 / MAX_STREAMS_PER_100MS as f64; + // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. + let min_stake_ratio = + 1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64; let stake_ratio = stake as f64 / total_stake as f64; let stake = if stake_ratio < min_stake_ratio { // If it is a staked connection with ultra low stake ratio, treat it as unstaked. @@ -541,6 +553,7 @@ async fn setup_connection( ¶ms, wait_for_chunk_timeout, max_unstaked_connections, + max_streams_per_ms, ) { stats .connection_added_from_staked_peer @@ -556,6 +569,7 @@ async fn setup_connection( max_unstaked_connections, ¶ms, wait_for_chunk_timeout, + max_streams_per_ms, ) .await { @@ -577,6 +591,7 @@ async fn setup_connection( max_unstaked_connections, ¶ms, wait_for_chunk_timeout, + max_streams_per_ms, ) .await { @@ -725,12 +740,14 @@ fn max_streams_for_connection_in_100ms( stake: u64, total_stake: u64, max_unstaked_connections: usize, + max_streams_per_ms: u64, ) -> u64 { + let max_streams_per_interval = max_streams_per_ms.saturating_mul(STREAM_THROTTLING_INTERVAL_MS); let max_unstaked_streams_per_100ms = if max_unstaked_connections == 0 { 0 } else { Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_100MS) + .apply_to(max_streams_per_interval) .saturating_div(MAX_UNSTAKED_CONNECTIONS as u64) }; @@ -744,8 +761,8 @@ fn max_streams_for_connection_in_100ms( if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 { max_unstaked_streams_per_100ms } else { - let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS - - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS); + let max_total_staked_streams: u64 = max_streams_per_interval + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_interval); std::cmp::max( min_staked_streams_per_100ms, ((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64, @@ -762,6 +779,7 @@ fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> } } +#[allow(clippy::too_many_arguments)] async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -772,6 +790,7 @@ async fn handle_connection( peer_type: ConnectionPeerType, wait_for_chunk_timeout: Duration, max_unstaked_connections: usize, + max_streams_per_ms: u64, ) { let stats = params.stats; debug!( @@ -787,6 +806,7 @@ async fn handle_connection( params.stake, params.total_stake, max_unstaked_connections, + max_streams_per_ms, ); let mut last_throttling_instant = tokio::time::Instant::now(); let mut streams_in_current_interval = 0; @@ -1307,6 +1327,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(2), DEFAULT_TPU_COALESCE, ) @@ -1743,6 +1764,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -1774,6 +1796,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -2120,7 +2143,8 @@ pub mod test { ConnectionPeerType::Unstaked, 0, 10000, - MAX_UNSTAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, ), 20 ); @@ -2131,7 +2155,8 @@ pub mod test { ConnectionPeerType::Unstaked, 10, 10000, - MAX_UNSTAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, ), 20 ); @@ -2143,7 +2168,8 @@ pub mod test { ConnectionPeerType::Staked, 0, 10000, - MAX_UNSTAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, ), 20 ); @@ -2155,7 +2181,8 @@ pub mod test { ConnectionPeerType::Staked, 15, 10000, - MAX_UNSTAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS ), 60 ); @@ -2167,7 +2194,8 @@ pub mod test { ConnectionPeerType::Staked, 1000, 10000, - MAX_UNSTAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS ), 4000 ); @@ -2179,7 +2207,8 @@ pub mod test { ConnectionPeerType::Staked, 1, 50000, - MAX_UNSTAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS ), 21 ); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 1f8286c9b3d816..9f2bf421c9b536 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -432,6 +432,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> { @@ -449,6 +450,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, wait_for_chunk_timeout, coalesce, ) @@ -468,7 +470,9 @@ pub fn spawn_server( mod test { use { super::*, - crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + crate::nonblocking::quic::{ + test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + }, crossbeam_channel::unbounded, solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, @@ -498,6 +502,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -554,6 +559,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -597,6 +603,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, )