diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 89c3deb2e61ea7..bd0c352397eb52 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,7 +1,7 @@ use { crate::{ nonblocking::stream_throttle::{ - self, ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, + ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, }, quic::{configure_server, QuicServerError, StreamStats}, streamer::StakedNodes, @@ -90,7 +90,7 @@ pub enum ConnectionPeerType { } impl ConnectionPeerType { - fn is_staked(&self) -> bool { + pub(crate) fn is_staked(&self) -> bool { matches!(self, ConnectionPeerType::Staked(_)) } } @@ -156,7 +156,10 @@ async fn run_server( let mut last_datapoint = Instant::now(); let unstaked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); - let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone())); + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( + max_unstaked_connections > 0, + stats.clone(), + )); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); let (sender, receiver) = async_unbounded(); @@ -718,25 +721,17 @@ async fn handle_connection( ); let stable_id = connection.stable_id(); stats.total_connections.fetch_add(1, Ordering::Relaxed); - let mut max_streams_per_throttling_interval = - stream_throttle::max_streams_for_connection_in_throttling_duration( - params.peer_type, - params.total_stake, - stream_load_ema.clone(), - ); while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { - if let ConnectionPeerType::Staked(peer_stake) = params.peer_type { - max_streams_per_throttling_interval = stream_load_ema - .available_load_capacity_in_throttling_duration( - peer_stake, - params.total_stake, - ); - } + let max_streams_per_throttling_interval = stream_load_ema + .available_load_capacity_in_throttling_duration( + params.peer_type, + params.total_stake, + ); stream_counter.reset_throttling_params_if_needed(); if stream_counter.stream_count.load(Ordering::Relaxed) @@ -746,9 +741,7 @@ async fn handle_connection( let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } - if params.peer_type.is_staked() { - stream_load_ema.increment_load(); - } + stream_load_ema.increment_load(params.peer_type); stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); stats.total_streams.fetch_add(1, Ordering::Relaxed); stats.total_new_streams.fetch_add(1, Ordering::Relaxed); @@ -797,9 +790,7 @@ async fn handle_connection( } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); - if params.peer_type.is_staked() { - stream_load_ema.update_ema_if_needed(); - } + stream_load_ema.update_ema_if_needed(); }); } Err(e) => { diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index f7da3a17dcf178..aa5e53aa1b156b 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -21,22 +21,52 @@ const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; pub const STREAM_STOP_CODE_THROTTLING: u32 = 15; const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10; -const MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION: u64 = 8; +const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; pub(crate) struct StakedStreamLoadEMA { current_load_ema: AtomicU64, load_in_recent_interval: AtomicU64, last_update: RwLock, stats: Arc, + // Maximum number of streams for a staked connection in EMA window + // Note: EMA window can be different than stream throttling window. EMA is being calculated + // specifically for staked connections. Unstaked connections have fixed limit on + // stream load, which is tracked by `max_unstaked_load_in_throttling_window` field. + max_staked_load_in_ema_window: u64, + // Maximum number of streams for an unstaked connection in stream throttling window + max_unstaked_load_in_throttling_window: u64, } impl StakedStreamLoadEMA { - pub(crate) fn new(stats: Arc) -> Self { + pub(crate) fn new(allow_unstaked_streams: bool, stats: Arc) -> Self { + let max_staked_load_in_ema_window = if allow_unstaked_streams { + (MAX_STREAMS_PER_MS + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS)) + * EMA_WINDOW_MS + } else { + MAX_STREAMS_PER_MS * EMA_WINDOW_MS + }; + + let max_num_unstaked_connections = + u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { + error!( + "Failed to convert maximum number of unstaked connections {} to u64.", + MAX_UNSTAKED_CONNECTIONS + ); + 500 + }); + + let max_unstaked_load_in_throttling_window = Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) + .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) + .saturating_div(max_num_unstaked_connections); + Self { current_load_ema: AtomicU64::default(), load_in_recent_interval: AtomicU64::default(), last_update: RwLock::new(Instant::now()), stats, + max_staked_load_in_ema_window, + max_unstaked_load_in_throttling_window, } } @@ -106,76 +136,56 @@ impl StakedStreamLoadEMA { } } - pub(crate) fn increment_load(&self) { - self.load_in_recent_interval.fetch_add(1, Ordering::Relaxed); + pub(crate) fn increment_load(&self, peer_type: ConnectionPeerType) { + if peer_type.is_staked() { + self.load_in_recent_interval.fetch_add(1, Ordering::Relaxed); + } self.update_ema_if_needed(); } pub(crate) fn available_load_capacity_in_throttling_duration( &self, - stake: u64, + peer_type: ConnectionPeerType, total_stake: u64, ) -> u64 { - let ema_window_ms = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; - let max_load_in_ema_window = u128::from( - (MAX_STREAMS_PER_MS - - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS)) - * ema_window_ms, - ); - - // If the current load is low, cap it to 25% of max_load. - let current_load = cmp::max( - u128::from(self.current_load_ema.load(Ordering::Relaxed)), - max_load_in_ema_window / 4, - ); - - // Formula is (max_load ^ 2 / current_load) * (stake / total_stake) - let capacity_in_ema_window = - (max_load_in_ema_window * max_load_in_ema_window * u128::from(stake)) - / (current_load * u128::from(total_stake)); - - let calculated_capacity = capacity_in_ema_window - * u128::from(STREAM_THROTTLING_INTERVAL_MS) - / u128::from(ema_window_ms); - let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| { - error!( - "Failed to convert stream capacity {} to u64. Using minimum load capacity", - calculated_capacity - ); - self.stats - .stream_load_capacity_overflow - .fetch_add(1, Ordering::Relaxed); - MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION - }); - - cmp::max( - calculated_capacity, - MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, - ) - } -} - -pub(crate) fn max_streams_for_connection_in_throttling_duration( - peer_type: ConnectionPeerType, - total_stake: u64, - ema_load: Arc, -) -> u64 { - match peer_type { - ConnectionPeerType::Unstaked => { - let max_num_connections = - u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { + match peer_type { + ConnectionPeerType::Unstaked => self.max_unstaked_load_in_throttling_window, + ConnectionPeerType::Staked(stake) => { + // If the current load is low, cap it to 25% of max_load. + let current_load = u128::from(cmp::max( + self.current_load_ema.load(Ordering::Relaxed), + self.max_staked_load_in_ema_window / 4, + )); + + // Formula is (max_load ^ 2 / current_load) * (stake / total_stake) + let capacity_in_ema_window = (u128::from(self.max_staked_load_in_ema_window) + * u128::from(self.max_staked_load_in_ema_window) + * u128::from(stake)) + / (current_load * u128::from(total_stake)); + + let calculated_capacity = capacity_in_ema_window + * u128::from(STREAM_THROTTLING_INTERVAL_MS) + / u128::from(EMA_WINDOW_MS); + let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| { error!( - "Failed to convert maximum number of unstaked connections {} to u64.", - MAX_UNSTAKED_CONNECTIONS + "Failed to convert stream capacity {} to u64. Using minimum load capacity", + calculated_capacity ); - 500 + self.stats + .stream_load_capacity_overflow + .fetch_add(1, Ordering::Relaxed); + self.max_unstaked_load_in_throttling_window + .saturating_add(1) }); - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) - .saturating_div(max_num_connections) - } - ConnectionPeerType::Staked(stake) => { - ema_load.available_load_capacity_in_throttling_duration(stake, total_stake) + + // 1 is added to `max_unstaked_load_in_throttling_window` to guarantee that staked + // clients get at least 1 more number of streams than unstaked connections. + cmp::max( + calculated_capacity, + self.max_unstaked_load_in_throttling_window + .saturating_add(1), + ) + } } } } @@ -215,14 +225,7 @@ impl ConnectionStreamCounter { pub mod test { use { super::*, - crate::{ - nonblocking::stream_throttle::{ - max_streams_for_connection_in_throttling_duration, - MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, - STREAM_LOAD_EMA_INTERVAL_MS, - }, - quic::StreamStats, - }, + crate::{nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, quic::StreamStats}, std::{ sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, @@ -231,20 +234,26 @@ pub mod test { #[test] fn test_max_streams_for_unstaked_connection() { - let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + let load_ema = Arc::new(StakedStreamLoadEMA::new( + true, + Arc::new(StreamStats::default()), + )); // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Unstaked, 10000, - load_ema.clone(), ), 10 ); } + #[test] fn test_max_streams_for_staked_connection() { - let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + let load_ema = Arc::new(StakedStreamLoadEMA::new( + true, + Arc::new(StreamStats::default()), + )); // EMA load is used for staked connections to calculate max number of allowed streams. // EMA window = 5ms interval * 10 intervals = 50ms @@ -258,10 +267,9 @@ pub mod test { // ema_load = 10K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30 assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), 10000, - load_ema.clone(), ), 30 ); @@ -269,10 +277,9 @@ pub mod test { // ema_load = 10K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), 10000, - load_ema.clone(), ), 2000 ); @@ -281,10 +288,9 @@ pub mod test { // ema_load = 2.5K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120 assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), 10000, - load_ema.clone(), ), 120 ); @@ -292,10 +298,9 @@ pub mod test { // ema_load = 2.5K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000 assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), 10000, - load_ema.clone(), ), 8000 ); @@ -305,39 +310,128 @@ pub mod test { load_ema.current_load_ema.store(2000, Ordering::Relaxed); // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), 10000, - load_ema.clone(), ), 120 ); // function = ((10K * 10K) / 25% of 10K) * stake / total_stake assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), 10000, - load_ema.clone(), ), 8000 ); // At 1/40000 stake weight, and minimum load, it should still allow - // MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams. + // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( - max_streams_for_connection_in_throttling_duration( + load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1), 40000, - load_ema.clone(), ), - MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION + load_ema + .max_unstaked_load_in_throttling_window + .saturating_add(1) + ); + } + + #[test] + fn test_max_streams_for_staked_connection_with_no_unstaked_connections() { + let load_ema = Arc::new(StakedStreamLoadEMA::new( + false, + Arc::new(StreamStats::default()), + )); + + // EMA load is used for staked connections to calculate max number of allowed streams. + // EMA window = 5ms interval * 10 intervals = 50ms + // max streams per window = 250K streams/sec = 12.5K per 50ms + // max_streams in 50ms = ((12.5K * 12.5K) / ema_load) * stake / total_stake + // + // Stream throttling window is 100ms. So it'll double the amount of max streams. + // max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / ema_load) * stake / total_stake + + load_ema.current_load_ema.store(10000, Ordering::Relaxed); + // ema_load = 10K, stake = 15, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 10K) * 15 / 10K = 46.875 + assert!( + (46u64..=47).contains(&load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(15), + 10000 + )) + ); + + // ema_load = 10K, stake = 1K, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 10K) * 1K / 10K = 3125 + assert!((3124u64..=3125).contains( + &load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000 + ) + )); + + load_ema.current_load_ema.store(5000, Ordering::Relaxed); + // ema_load = 5K, stake = 15, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 5K) * 15 / 10K = 93.75 + assert!( + (92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(15), + 10000 + )) + ); + + // ema_load = 5K, stake = 1K, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 5K) * 1K / 10K = 6250 + assert!((6248u64..=6250).contains( + &load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000 + ) + )); + + // At 2000, the load is less than 25% of max_load (12.5K). + // Test that we cap it to 25%, yielding the same result as if load was 12.5K/4. + load_ema.current_load_ema.store(2000, Ordering::Relaxed); + // function = ((10K * 10K) / 25% of 12.5K) * stake / total_stake + assert_eq!( + load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(15), + 10000 + ), + 150 + ); + + // function = ((12.5K * 12.5K) / 25% of 12.5K) * stake / total_stake + assert_eq!( + load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000 + ), + 10000 + ); + + // At 1/40000 stake weight, and minimum load, it should still allow + // max_unstaked_load_in_throttling_window + 1 streams. + assert_eq!( + load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(1), + 40000 + ), + load_ema + .max_unstaked_load_in_throttling_window + .saturating_add(1) ); } #[test] fn test_update_ema() { - let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( + true, + Arc::new(StreamStats::default()), + )); stream_load_ema .load_in_recent_interval .store(2500, Ordering::Relaxed); @@ -362,7 +456,10 @@ pub mod test { #[test] fn test_update_ema_missing_interval() { - let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( + true, + Arc::new(StreamStats::default()), + )); stream_load_ema .load_in_recent_interval .store(2500, Ordering::Relaxed); @@ -378,7 +475,10 @@ pub mod test { #[test] fn test_update_ema_if_needed() { - let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( + true, + Arc::new(StreamStats::default()), + )); stream_load_ema .load_in_recent_interval .store(2500, Ordering::Relaxed);