Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameterize max streams per ms #707

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -270,6 +271,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
4 changes: 3 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -163,6 +163,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand All @@ -183,6 +184,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
10 changes: 8 additions & 2 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
std::{
net::{SocketAddr, UdpSocket},
Expand Down Expand Up @@ -82,6 +84,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -161,6 +164,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -223,6 +227,7 @@ mod tests {
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -251,6 +256,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
18 changes: 15 additions & 3 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use {
crate::{
nonblocking::stream_throttle::{
ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS,
STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS,
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
STREAM_THROTTLING_INTERVAL_MS,
},
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
Expand Down Expand Up @@ -76,6 +76,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

/// Limit to 250K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
struct PacketChunk {
Expand Down Expand Up @@ -124,6 +127,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
Expand All @@ -147,6 +151,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
coalesce,
Expand All @@ -164,6 +169,7 @@ async fn run_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
Expand All @@ -176,6 +182,7 @@ async fn run_server(
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
stats.clone(),
max_unstaked_connections,
max_streams_per_ms,
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
Expand Down Expand Up @@ -206,6 +213,7 @@ async fn run_server(
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
stream_load_ema.clone(),
Expand Down Expand Up @@ -484,6 +492,7 @@ async fn setup_connection(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
Expand All @@ -505,7 +514,7 @@ async fn setup_connection(
// The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle
// interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
let min_stake_ratio =
1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64;
1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64;
let stake_ratio = stake as f64 / total_stake as f64;
let peer_type = if stake_ratio < min_stake_ratio {
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
Expand Down Expand Up @@ -1327,6 +1336,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(2),
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1762,6 +1772,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1791,6 +1802,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
26 changes: 18 additions & 8 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use {
},
};

/// Limit to 250K PPS
pub const MAX_STREAMS_PER_MS: u64 = 250;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_STOP_CODE_THROTTLING: u32 = 15;
Expand All @@ -35,14 +33,18 @@ pub(crate) struct StakedStreamLoadEMA {
}

impl StakedStreamLoadEMA {
pub(crate) fn new(stats: Arc<StreamStats>, max_unstaked_connections: usize) -> Self {
pub(crate) fn new(
stats: Arc<StreamStats>,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> Self {
let allow_unstaked_streams = max_unstaked_connections > 0;
let max_staked_load_in_ema_window = if allow_unstaked_streams {
(MAX_STREAMS_PER_MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS))
(max_streams_per_ms
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms))
* EMA_WINDOW_MS
} else {
MAX_STREAMS_PER_MS * EMA_WINDOW_MS
max_streams_per_ms * EMA_WINDOW_MS
};

let max_num_unstaked_connections =
Expand All @@ -56,7 +58,7 @@ impl StakedStreamLoadEMA {

let max_unstaked_load_in_throttling_window = if allow_unstaked_streams {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
.apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS)
.saturating_div(max_num_unstaked_connections)
} else {
0
Expand Down Expand Up @@ -228,7 +230,9 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
},
std::{
Expand All @@ -242,6 +246,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
Expand All @@ -258,6 +263,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -349,6 +355,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
0,
DEFAULT_MAX_STREAMS_PER_MS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -436,6 +443,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand Down Expand Up @@ -464,6 +472,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand All @@ -483,6 +492,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<SpawnServerResult, QuicServerError> {
Expand All @@ -524,6 +525,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
wait_for_chunk_timeout,
coalesce,
)
Expand All @@ -550,7 +552,9 @@ pub fn spawn_server(
mod test {
use {
super::*,
crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
crate::nonblocking::quic::{
test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crossbeam_channel::unbounded,
solana_sdk::net::DEFAULT_TPU_COALESCE,
std::net::SocketAddr,
Expand Down Expand Up @@ -583,6 +587,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -642,6 +647,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -688,6 +694,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
Loading