Skip to content

Commit

Permalink
sleep instead of drop when stream rate exceeded limit; (anza-xyz#939)
Browse files Browse the repository at this point in the history
* sleep instead of drop when stream rate exceeded limit;

Consider connection count of staked nodes when calculating allowed PPS

remove rtt from throttle_duration calculation

removed connection count in StreamerCounter -- we do not need it at this point

* remove connection count related changes -- they are unrelated to this PR

* revert unintended changes
  • Loading branch information
lijunwangs authored Apr 23, 2024
1 parent a921457 commit 137a982
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
49 changes: 30 additions & 19 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
nonblocking::stream_throttle::{
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL,
STREAM_THROTTLING_INTERVAL_MS,
},
quic::{configure_server, QuicServerError, StreamStats},
Expand Down Expand Up @@ -55,7 +55,7 @@ use {
// introduce any other awaits while holding the RwLock.
sync::{Mutex, MutexGuard},
task::JoinHandle,
time::timeout,
time::{sleep, timeout},
},
};

Expand Down Expand Up @@ -825,25 +825,36 @@ async fn handle_connection(
params.total_stake,
);

stream_counter.reset_throttling_params_if_needed();
if stream_counter.stream_count.load(Ordering::Relaxed)
>= max_streams_per_throttling_interval
{
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
match params.peer_type {
ConnectionPeerType::Unstaked => {
stats
.throttled_unstaked_streams
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Staked(_) => {
stats
.throttled_staked_streams
.fetch_add(1, Ordering::Relaxed);
let throttle_interval_start =
stream_counter.reset_throttling_params_if_needed();
let streams_read_in_throttle_interval =
stream_counter.stream_count.load(Ordering::Relaxed);
if streams_read_in_throttle_interval >= max_streams_per_throttling_interval {
// The peer is sending faster than we're willing to read. Sleep for what's
// left of this read interval so the peer backs off.
let throttle_duration = STREAM_THROTTLING_INTERVAL
.saturating_sub(throttle_interval_start.elapsed());

if !throttle_duration.is_zero() {
debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \
max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \
throttle_duration: {throttle_duration:?}",
params.peer_type, params.total_stake);
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
match params.peer_type {
ConnectionPeerType::Unstaked => {
stats
.throttled_unstaked_streams
.fetch_add(1, Ordering::Relaxed);
}
ConnectionPeerType::Staked(_) => {
stats
.throttled_staked_streams
.fetch_add(1, Ordering::Relaxed);
}
}
sleep(throttle_duration).await;
}
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
continue;
}
stream_load_ema.increment_load(params.peer_type);
stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
Expand Down
18 changes: 12 additions & 6 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use {

const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_STOP_CODE_THROTTLING: u32 = 15;
pub const STREAM_THROTTLING_INTERVAL: Duration =
Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5;
const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10;
const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT;
Expand Down Expand Up @@ -208,19 +209,24 @@ impl ConnectionStreamCounter {
}
}

pub(crate) fn reset_throttling_params_if_needed(&self) {
const THROTTLING_INTERVAL: Duration = Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
if tokio::time::Instant::now().duration_since(*self.last_throttling_instant.read().unwrap())
> THROTTLING_INTERVAL
/// Reset the counter and last throttling instant and
/// return last_throttling_instant regardless it is reset or not.
pub(crate) fn reset_throttling_params_if_needed(&self) -> tokio::time::Instant {
let last_throttling_instant = *self.last_throttling_instant.read().unwrap();
if tokio::time::Instant::now().duration_since(last_throttling_instant)
> STREAM_THROTTLING_INTERVAL
{
let mut last_throttling_instant = self.last_throttling_instant.write().unwrap();
// Recheck as some other thread might have done throttling since this thread tried to acquire the write lock.
if tokio::time::Instant::now().duration_since(*last_throttling_instant)
> THROTTLING_INTERVAL
> STREAM_THROTTLING_INTERVAL
{
*last_throttling_instant = tokio::time::Instant::now();
self.stream_count.store(0, Ordering::Relaxed);
}
*last_throttling_instant
} else {
last_throttling_instant
}
}
}
Expand Down

0 comments on commit 137a982

Please sign in to comment.