Skip to content

Commit

Permalink
Parameterize max streams per ms (anza-xyz#707)
Browse files Browse the repository at this point in the history
Make PPS a parameter instead of the hard coded
  • Loading branch information
lijunwangs authored and michaelschem committed Apr 20, 2024
1 parent 92cc459 commit 87d6fc1
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 16 deletions.
4 changes: 3 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -270,6 +271,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
4 changes: 3 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -163,6 +163,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand All @@ -183,6 +184,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
10 changes: 8 additions & 2 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
std::{
net::{SocketAddr, UdpSocket},
Expand Down Expand Up @@ -82,6 +84,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -161,6 +164,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -223,6 +227,7 @@ mod tests {
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -251,6 +256,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
18 changes: 15 additions & 3 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use {
crate::{
nonblocking::stream_throttle::{
ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS,
STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS,
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
STREAM_THROTTLING_INTERVAL_MS,
},
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
Expand Down Expand Up @@ -76,6 +76,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

/// Limit to 250K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
struct PacketChunk {
Expand Down Expand Up @@ -124,6 +127,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
Expand All @@ -147,6 +151,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
coalesce,
Expand All @@ -164,6 +169,7 @@ async fn run_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
Expand All @@ -176,6 +182,7 @@ async fn run_server(
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
stats.clone(),
max_unstaked_connections,
max_streams_per_ms,
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
Expand Down Expand Up @@ -206,6 +213,7 @@ async fn run_server(
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
stream_load_ema.clone(),
Expand Down Expand Up @@ -484,6 +492,7 @@ async fn setup_connection(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
Expand All @@ -505,7 +514,7 @@ async fn setup_connection(
// The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle
// interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
let min_stake_ratio =
1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64;
1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64;
let stake_ratio = stake as f64 / total_stake as f64;
let peer_type = if stake_ratio < min_stake_ratio {
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
Expand Down Expand Up @@ -1327,6 +1336,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(2),
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1762,6 +1772,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1791,6 +1802,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
26 changes: 18 additions & 8 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use {
},
};

/// Limit to 250K PPS
pub const MAX_STREAMS_PER_MS: u64 = 250;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_STOP_CODE_THROTTLING: u32 = 15;
Expand All @@ -35,14 +33,18 @@ pub(crate) struct StakedStreamLoadEMA {
}

impl StakedStreamLoadEMA {
pub(crate) fn new(stats: Arc<StreamStats>, max_unstaked_connections: usize) -> Self {
pub(crate) fn new(
stats: Arc<StreamStats>,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> Self {
let allow_unstaked_streams = max_unstaked_connections > 0;
let max_staked_load_in_ema_window = if allow_unstaked_streams {
(MAX_STREAMS_PER_MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS))
(max_streams_per_ms
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms))
* EMA_WINDOW_MS
} else {
MAX_STREAMS_PER_MS * EMA_WINDOW_MS
max_streams_per_ms * EMA_WINDOW_MS
};

let max_num_unstaked_connections =
Expand All @@ -56,7 +58,7 @@ impl StakedStreamLoadEMA {

let max_unstaked_load_in_throttling_window = if allow_unstaked_streams {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
.apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS)
.saturating_div(max_num_unstaked_connections)
} else {
0
Expand Down Expand Up @@ -228,7 +230,9 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
},
std::{
Expand All @@ -242,6 +246,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
Expand All @@ -258,6 +263,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -349,6 +355,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
0,
DEFAULT_MAX_STREAMS_PER_MS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -436,6 +443,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand Down Expand Up @@ -464,6 +472,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand All @@ -483,6 +492,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<SpawnServerResult, QuicServerError> {
Expand All @@ -524,6 +525,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
wait_for_chunk_timeout,
coalesce,
)
Expand All @@ -550,7 +552,9 @@ pub fn spawn_server(
mod test {
use {
super::*,
crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
crate::nonblocking::quic::{
test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crossbeam_channel::unbounded,
solana_sdk::net::DEFAULT_TPU_COALESCE,
std::net::SocketAddr,
Expand Down Expand Up @@ -583,6 +587,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -642,6 +647,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -688,6 +694,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down

0 comments on commit 87d6fc1

Please sign in to comment.