Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

corrected to not use hardcoded connections count for unstaked #633

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ async fn run_server(
let unstaked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
max_unstaked_connections > 0,
stats.clone(),
max_unstaked_connections,
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
Expand Down
43 changes: 24 additions & 19 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use {
crate::{
nonblocking::quic::ConnectionPeerType,
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
},
crate::{nonblocking::quic::ConnectionPeerType, quic::StreamStats},
percentage::Percentage,
std::{
cmp,
Expand Down Expand Up @@ -38,7 +35,8 @@ pub(crate) struct StakedStreamLoadEMA {
}

impl StakedStreamLoadEMA {
pub(crate) fn new(allow_unstaked_streams: bool, stats: Arc<StreamStats>) -> Self {
pub(crate) fn new(stats: Arc<StreamStats>, max_unstaked_connections: usize) -> Self {
let allow_unstaked_streams = max_unstaked_connections > 0;
let max_staked_load_in_ema_window = if allow_unstaked_streams {
(MAX_STREAMS_PER_MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS))
Expand All @@ -48,17 +46,21 @@ impl StakedStreamLoadEMA {
};

let max_num_unstaked_connections =
u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| {
u64::try_from(max_unstaked_connections).unwrap_or_else(|_| {
error!(
"Failed to convert maximum number of unstaked connections {} to u64.",
MAX_UNSTAKED_CONNECTIONS
max_unstaked_connections
);
500
});

let max_unstaked_load_in_throttling_window = Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
.saturating_div(max_num_unstaked_connections);
let max_unstaked_load_in_throttling_window = if allow_unstaked_streams {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
.saturating_div(max_num_unstaked_connections)
} else {
0
};

Self {
current_load_ema: AtomicU64::default(),
Expand Down Expand Up @@ -225,7 +227,10 @@ impl ConnectionStreamCounter {
pub mod test {
use {
super::*,
crate::{nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, quic::StreamStats},
crate::{
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
},
std::{
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
Expand All @@ -235,8 +240,8 @@ pub mod test {
#[test]
fn test_max_streams_for_unstaked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
true,
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
Expand All @@ -251,8 +256,8 @@ pub mod test {
#[test]
fn test_max_streams_for_staked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
true,
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -342,8 +347,8 @@ pub mod test {
#[test]
fn test_max_streams_for_staked_connection_with_no_unstaked_connections() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
false,
Arc::new(StreamStats::default()),
0,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -413,12 +418,12 @@ pub mod test {
10000
);

// At 1/40000 stake weight, and minimum load, it should still allow
// At 1/400000 stake weight, and minimum load, it should still allow
// max_unstaked_load_in_throttling_window + 1 streams.
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1),
40000
400000
),
load_ema
.max_unstaked_load_in_throttling_window
Expand All @@ -429,8 +434,8 @@ pub mod test {
#[test]
fn test_update_ema() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
true,
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
));
stream_load_ema
.load_in_recent_interval
Expand All @@ -457,8 +462,8 @@ pub mod test {
#[test]
fn test_update_ema_missing_interval() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
true,
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
));
stream_load_ema
.load_in_recent_interval
Expand All @@ -476,8 +481,8 @@ pub mod test {
#[test]
fn test_update_ema_if_needed() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
true,
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
));
stream_load_ema
.load_in_recent_interval
Expand Down
Loading