Skip to content

Commit

Permalink
v1.17: Parameterize max streams per ms (backport of solana-labs#707) (s…
Browse files Browse the repository at this point in the history
…olana-labs#1105)

* Parameterize max streams per ms (solana-labs#707)

Make PPS a parameter instead of a hard-coded constant.


Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>
  • Loading branch information
mergify[bot] and lijunwangs authored Apr 30, 2024
1 parent a0020cf commit a7cfe9b
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
4 changes: 3 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
streamer::StakedNodes,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -256,6 +257,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
4 changes: 3 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -163,6 +163,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand All @@ -183,6 +184,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
7 changes: 6 additions & 1 deletion quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
},
std::{
Expand Down Expand Up @@ -79,6 +80,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -159,6 +161,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -217,6 +220,7 @@ mod tests {
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand All @@ -241,6 +245,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
55 changes: 42 additions & 13 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ use {
},
};

/// Limit to 500K PPS
const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100);
const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);
Expand All @@ -75,6 +74,9 @@ const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const STREAM_STOP_CODE_THROTTLING: u32 = 15;

/// Default value for the `max_streams_per_ms` argument of `spawn_server`.
///
/// 500 streams per millisecond is 500,000 per second, i.e. the "500K PPS" limit.
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
struct PacketChunk {
Expand Down Expand Up @@ -111,6 +113,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
Expand All @@ -136,6 +139,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
coalesce,
Expand All @@ -153,6 +157,7 @@ async fn run_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
Expand Down Expand Up @@ -192,6 +197,7 @@ async fn run_server(
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
));
Expand Down Expand Up @@ -328,6 +334,7 @@ fn handle_and_cache_new_connection(
params: &NewConnectionHandlerParams,
wait_for_chunk_timeout: Duration,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> Result<(), ConnectionHandlerError> {
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
connection_table_l.peer_type,
Expand Down Expand Up @@ -379,6 +386,7 @@ fn handle_and_cache_new_connection(
peer_type,
wait_for_chunk_timeout,
max_unstaked_connections,
max_streams_per_ms,
));
Ok(())
} else {
Expand Down Expand Up @@ -407,6 +415,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
max_connections: usize,
params: &NewConnectionHandlerParams,
wait_for_chunk_timeout: Duration,
max_streams_per_ms: u64,
) -> Result<(), ConnectionHandlerError> {
let stats = params.stats.clone();
if max_connections > 0 {
Expand All @@ -420,6 +429,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
params,
wait_for_chunk_timeout,
max_connections,
max_streams_per_ms,
)
} else {
connection.close(
Expand Down Expand Up @@ -484,6 +494,7 @@ async fn setup_connection(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
) {
Expand All @@ -502,8 +513,9 @@ async fn setup_connection(
),
|(pubkey, stake, total_stake, max_stake, min_stake)| {
// The heuristic is that the stake should be large enough to have 1 stream pass through within one throttle
// interval during which we allow max MAX_STREAMS_PER_100MS streams.
let min_stake_ratio = 1_f64 / MAX_STREAMS_PER_100MS as f64;
// interval during which we allow max (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) streams.
let min_stake_ratio =
1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64;
let stake_ratio = stake as f64 / total_stake as f64;
let stake = if stake_ratio < min_stake_ratio {
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
Expand Down Expand Up @@ -541,6 +553,7 @@ async fn setup_connection(
&params,
wait_for_chunk_timeout,
max_unstaked_connections,
max_streams_per_ms,
) {
stats
.connection_added_from_staked_peer
Expand All @@ -556,6 +569,7 @@ async fn setup_connection(
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
max_streams_per_ms,
)
.await
{
Expand All @@ -577,6 +591,7 @@ async fn setup_connection(
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
max_streams_per_ms,
)
.await
{
Expand Down Expand Up @@ -725,12 +740,14 @@ fn max_streams_for_connection_in_100ms(
stake: u64,
total_stake: u64,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> u64 {
let max_streams_per_interval = max_streams_per_ms.saturating_mul(STREAM_THROTTLING_INTERVAL_MS);
let max_unstaked_streams_per_100ms = if max_unstaked_connections == 0 {
0
} else {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_100MS)
.apply_to(max_streams_per_interval)
.saturating_div(MAX_UNSTAKED_CONNECTIONS as u64)
};

Expand All @@ -744,8 +761,8 @@ fn max_streams_for_connection_in_100ms(
if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
max_unstaked_streams_per_100ms
} else {
let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS);
let max_total_staked_streams: u64 = max_streams_per_interval
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_interval);
std::cmp::max(
min_staked_streams_per_100ms,
((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64,
Expand All @@ -762,6 +779,7 @@ fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) ->
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_connection(
connection: Connection,
remote_addr: SocketAddr,
Expand All @@ -772,6 +790,7 @@ async fn handle_connection(
peer_type: ConnectionPeerType,
wait_for_chunk_timeout: Duration,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) {
let stats = params.stats;
debug!(
Expand All @@ -787,6 +806,7 @@ async fn handle_connection(
params.stake,
params.total_stake,
max_unstaked_connections,
max_streams_per_ms,
);
let mut last_throttling_instant = tokio::time::Instant::now();
let mut streams_in_current_interval = 0;
Expand Down Expand Up @@ -1307,6 +1327,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(2),
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1743,6 +1764,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1774,6 +1796,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -2120,7 +2143,8 @@ pub mod test {
ConnectionPeerType::Unstaked,
0,
10000,
MAX_UNSTAKED_CONNECTIONS
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
),
20
);
Expand All @@ -2131,7 +2155,8 @@ pub mod test {
ConnectionPeerType::Unstaked,
10,
10000,
MAX_UNSTAKED_CONNECTIONS
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
),
20
);
Expand All @@ -2143,7 +2168,8 @@ pub mod test {
ConnectionPeerType::Staked,
0,
10000,
MAX_UNSTAKED_CONNECTIONS
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
),
20
);
Expand All @@ -2155,7 +2181,8 @@ pub mod test {
ConnectionPeerType::Staked,
15,
10000,
MAX_UNSTAKED_CONNECTIONS
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS
),
60
);
Expand All @@ -2167,7 +2194,8 @@ pub mod test {
ConnectionPeerType::Staked,
1000,
10000,
MAX_UNSTAKED_CONNECTIONS
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS
),
4000
);
Expand All @@ -2179,7 +2207,8 @@ pub mod test {
ConnectionPeerType::Staked,
1,
50000,
MAX_UNSTAKED_CONNECTIONS
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS
),
21
);
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> {
Expand All @@ -449,6 +450,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
wait_for_chunk_timeout,
coalesce,
)
Expand All @@ -468,7 +470,9 @@ pub fn spawn_server(
mod test {
use {
super::*,
crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
crate::nonblocking::quic::{
test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crossbeam_channel::unbounded,
solana_sdk::net::DEFAULT_TPU_COALESCE,
std::net::SocketAddr,
Expand Down Expand Up @@ -498,6 +502,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -554,6 +559,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -597,6 +603,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down

0 comments on commit a7cfe9b

Please sign in to comment.