From b94e0a6f8db2cb5a43e1c6ef41550de636c9a9af Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 11 Apr 2024 12:42:24 -0700 Subject: [PATCH 1/4] Addressed merge conflicts --- client/src/connection_cache.rs | 4 ++- core/src/tpu.rs | 8 ++++++ quic-client/tests/quic_client.rs | 10 ++++++-- streamer/src/nonblocking/quic.rs | 18 +++++++++++--- streamer/src/nonblocking/stream_throttle.rs | 27 +++++++++++++++------ streamer/src/quic.rs | 9 ++++++- 6 files changed, 61 insertions(+), 15 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a94bc7cd3d8ca8..74e1c8d344c618 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -227,7 +227,8 @@ mod tests { crossbeam_channel::unbounded, solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + quic::SpawnServerResult, streamer::StakedNodes, }, std::{ @@ -270,6 +271,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 640caf64544d45..2baab4095ef3a6 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -148,6 +148,9 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); + // 25K PPS + const MAX_STREAMS_PER_MS_TPU: u64 = 25; + let SpawnServerResult { endpoint: _, thread: tpu_quic_t, @@ -163,11 +166,15 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + MAX_STREAMS_PER_MS_TPU, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) .unwrap(); + // 5K PPS + const MAX_STREAMS_PER_MS_TPU_FORWARD: u64 = 5; + let SpawnServerResult { endpoint: _, thread: tpu_forwards_quic_t, @@ -183,6 +190,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions + MAX_STREAMS_PER_MS_TPU_FORWARD, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 0237fc21d098dc..34683b5fee225f 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -10,8 +10,10 @@ mod tests { }, solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult, - streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + quic::SpawnServerResult, + streamer::StakedNodes, + tls_certificates::new_dummy_x509_certificate, }, std::{ net::{SocketAddr, UdpSocket}, @@ -82,6 +84,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -161,6 +164,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(1), // wait_for_chunk_timeout DEFAULT_TPU_COALESCE, ) @@ -223,6 +227,7 @@ mod tests { staked_nodes.clone(), 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -251,6 +256,7 @@ mod tests { staked_nodes, 10, 10, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index f55776873d4e23..e79b4f0342ea50 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,8 +1,8 @@ use { crate::{ nonblocking::stream_throttle::{ - ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS, - STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS, + ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, + STREAM_THROTTLING_INTERVAL_MS, }, quic::{configure_server, QuicServerError, StreamStats}, streamer::StakedNodes, @@ -76,6 +76,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; +/// Limit to 250K PPS +pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250; + // A sequence of bytes that is part of a packet // along with where in the packet it is struct PacketChunk { @@ -124,6 +127,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { @@ -147,6 +151,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, coalesce, @@ -164,6 +169,7 @@ async fn run_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, @@ -176,6 +182,7 @@ async fn run_server( let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( stats.clone(), max_unstaked_connections, + max_streams_per_ms, )); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); @@ -206,6 +213,7 @@ async fn run_server( staked_nodes.clone(), max_staked_connections, max_unstaked_connections, + max_streams_per_ms, stats.clone(), wait_for_chunk_timeout, stream_load_ema.clone(), @@ -484,6 +492,7 @@ async fn setup_connection( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, stats: Arc, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, @@ -505,7 +514,7 @@ async fn setup_connection( // The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. let min_stake_ratio = - 1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64; + 1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64; let stake_ratio = stake as f64 / total_stake as f64; let peer_type = if stake_ratio < min_stake_ratio { // If it is a staked connection with ultra low stake ratio, treat it as unstaked. @@ -1327,6 +1336,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, Duration::from_secs(2), DEFAULT_TPU_COALESCE, ) @@ -1762,6 +1772,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -1791,6 +1802,7 @@ pub mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index e3da2be90ddfdc..801f9cd9f8d932 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -11,8 +11,6 @@ use { }, }; -/// Limit to 250K PPS -pub const MAX_STREAMS_PER_MS: u64 = 250; const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; pub const STREAM_STOP_CODE_THROTTLING: u32 = 15; @@ -35,14 +33,19 @@ pub(crate) struct StakedStreamLoadEMA { } impl StakedStreamLoadEMA { - pub(crate) fn new(stats: Arc, max_unstaked_connections: usize) -> Self { + pub(crate) fn new( + stats: Arc, + max_unstaked_connections: usize, + max_streams_per_ms: u64, + ) -> Self { + let max_streams_per_ms = max_streams_per_ms; let allow_unstaked_streams = max_unstaked_connections > 0; let max_staked_load_in_ema_window = if allow_unstaked_streams { - (MAX_STREAMS_PER_MS - - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS)) + (max_streams_per_ms + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms)) * EMA_WINDOW_MS } else { - MAX_STREAMS_PER_MS * EMA_WINDOW_MS + max_streams_per_ms * EMA_WINDOW_MS }; let max_num_unstaked_connections = @@ -56,7 +59,7 @@ impl StakedStreamLoadEMA { let max_unstaked_load_in_throttling_window = if allow_unstaked_streams { Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) + .apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) .saturating_div(max_num_unstaked_connections) } else { 0 @@ -228,7 +231,9 @@ pub mod test { use { super::*, crate::{ - nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + nonblocking::{ + quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, + }, quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, }, std::{ @@ -242,6 +247,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( @@ -258,6 +264,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); // EMA load is used for staked connections to calculate max number of allowed streams. @@ -349,6 +356,7 @@ pub mod test { let load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), 0, + DEFAULT_MAX_STREAMS_PER_MS, )); // EMA load is used for staked connections to calculate max number of allowed streams. @@ -436,6 +444,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval @@ -464,6 +473,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval @@ -483,6 +493,7 @@ pub mod test { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( Arc::new(StreamStats::default()), MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, )); stream_load_ema .load_in_recent_interval diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 9b68ab1eea01ef..ad6bace1b99a30 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -508,6 +508,7 @@ pub fn spawn_server( staked_nodes: Arc>, max_staked_connections: usize, max_unstaked_connections: usize, + max_streams_per_ms: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result { @@ -524,6 +525,7 @@ pub fn spawn_server( staked_nodes, max_staked_connections, max_unstaked_connections, + max_streams_per_ms, wait_for_chunk_timeout, coalesce, ) @@ -550,7 +552,9 @@ pub fn spawn_server( mod test { use { super::*, - crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, + crate::nonblocking::quic::{ + test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + }, crossbeam_channel::unbounded, solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, @@ -583,6 +587,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -642,6 +647,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) @@ -688,6 +694,7 @@ mod test { staked_nodes, MAX_STAKED_CONNECTIONS, 0, // Do not allow any connection from unstaked clients/nodes + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) From 79162ae2e532bc3ccd48a9476f07657ec67c531b Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 10 Apr 2024 01:53:38 -0700 Subject: [PATCH 2/4] Fixed a clippy issue --- streamer/src/nonblocking/stream_throttle.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 801f9cd9f8d932..0497c6993d12e2 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -38,7 +38,6 @@ impl StakedStreamLoadEMA { max_unstaked_connections: usize, max_streams_per_ms: u64, ) -> Self { - let max_streams_per_ms = max_streams_per_ms; let allow_unstaked_streams = max_unstaked_connections > 0; let max_staked_load_in_ema_window = if allow_unstaked_streams { (max_streams_per_ms From bdf8a484d9fb5cf1468c240b5affb7fe943a8b13 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 11 Apr 2024 10:36:01 -0700 Subject: [PATCH 3/4] Revert PPS change -- will be addressed later --- core/src/tpu.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 2baab4095ef3a6..0bd61d7b77e112 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,7 +33,7 @@ use { solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ - nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, + nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, @@ -148,8 +148,7 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - // 25K PPS - const MAX_STREAMS_PER_MS_TPU: u64 = 25; + const MAX_STREAMS_PER_MS_TPU: u64 = DEFAULT_MAX_STREAMS_PER_MS; let SpawnServerResult { endpoint: _, @@ -172,8 +171,7 @@ impl Tpu { ) .unwrap(); - // 5K PPS - const MAX_STREAMS_PER_MS_TPU_FORWARD: u64 = 5; + const MAX_STREAMS_PER_MS_TPU_FORWARD: u64 = DEFAULT_MAX_STREAMS_PER_MS; let SpawnServerResult { endpoint: _, From 6994d073ea9216921d72d7861a568b2317a23b6d Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 15 Apr 2024 11:11:13 -0700 Subject: [PATCH 4/4] removed some constants --- core/src/tpu.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0bd61d7b77e112..b594e04e5d18ad 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -148,8 +148,6 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); - const MAX_STREAMS_PER_MS_TPU: u64 = DEFAULT_MAX_STREAMS_PER_MS; - let SpawnServerResult { endpoint: _, thread: tpu_quic_t, @@ -165,14 +163,12 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, - MAX_STREAMS_PER_MS_TPU, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, ) .unwrap(); - const MAX_STREAMS_PER_MS_TPU_FORWARD: u64 = DEFAULT_MAX_STREAMS_PER_MS; - let SpawnServerResult { endpoint: _, thread: tpu_forwards_quic_t, @@ -188,7 +184,7 @@ impl Tpu { staked_nodes.clone(), MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS), 0, // Prevent unstaked nodes from forwarding transactions - MAX_STREAMS_PER_MS_TPU_FORWARD, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, tpu_coalesce, )