Skip to content

Commit

Permalink
grin v5.3 (0035) add rate limiting to outbound p2p msg sending (mimbl…
Browse files Browse the repository at this point in the history
…ewimble#3560)

treat peers as abusive based on incoming msgs, not outgoing msg rates
  • Loading branch information
bayk committed Jun 11, 2024
1 parent 7eb94d9 commit 78ecd66
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 28 deletions.
13 changes: 12 additions & 1 deletion p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use crate::types::{
use crate::util::secp::pedersen::RangeProof;
use bytes::Bytes;
use num::FromPrimitive;
use std::fmt;
use std::fs::File;
use std::io::{Read, Write};
use std::sync::Arc;
use std::{fmt, thread, time::Duration};

/// Grin's user agent with current version
pub const USER_AGENT: &str = concat!("MW/MWC ", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -241,6 +241,17 @@ pub fn write_message<W: Write>(
msg: &Msg,
tracker: Arc<Tracker>,
) -> Result<(), Error> {
// Introduce a delay so messages are spaced at least 150ms apart.
// This gives a max msg rate of 60000/150 = 400 messages per minute.
// Exceeding 500 messages per minute will result in being banned as abusive.
if let Some(elapsed) = tracker.sent_bytes.read().elapsed_since_last_msg() {
let min_interval: u64 = 150;
let sleep_ms = min_interval.saturating_sub(elapsed);
if sleep_ms > 0 {
thread::sleep(Duration::from_millis(sleep_ms))
}
}

let mut buf = ser::ser_vec(&msg.header, msg.version)?;
buf.extend(&msg.body[..]);
stream.write_all(&buf[..])?;
Expand Down
24 changes: 5 additions & 19 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,27 +239,13 @@ impl Peer {

/// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool {
let rec = self.tracker.received_bytes.read();
let sent = self.tracker.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
let rec = self.tracker().received_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN
}

/// Number of bytes sent to the peer
pub fn last_min_sent_bytes(&self) -> Option<u64> {
let sent_bytes = self.tracker.sent_bytes.read();
Some(sent_bytes.bytes_per_min())
}

/// Number of bytes received from the peer
pub fn last_min_received_bytes(&self) -> Option<u64> {
let received_bytes = self.tracker.received_bytes.read();
Some(received_bytes.bytes_per_min())
}

pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
let received_bytes = self.tracker.received_bytes.read();
let sent_bytes = self.tracker.sent_bytes.read();
Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
/// Tracker tracks sent/received bytes and message counts per minute.
pub fn tracker(&self) -> &conn::Tracker {
&self.tracker
}

/// Set this peer status to banned
Expand Down
12 changes: 6 additions & 6 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,12 @@ impl Peers {
debug!("clean_peers {:?}, not connected", peer.info.addr);
rm.push(peer.info.addr.clone());
} else if peer.is_abusive() {
if let Some(counts) = peer.last_min_message_counts() {
debug!(
"clean_peers {:?}, abusive ({} sent, {} recv)",
peer.info.addr, counts.0, counts.1,
);
}
let received = peer.tracker().received_bytes.read().count_per_min();
let sent = peer.tracker().sent_bytes.read().count_per_min();
debug!(
"clean_peers {:?}, abusive ({} sent, {} recv)",
peer.info.addr, sent, received,
);
let _ = self.update_state(peer.info.addr.clone(), State::Banned);
rm.push(peer.info.addr.clone());
} else {
Expand Down
4 changes: 2 additions & 2 deletions servers/src/common/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ impl PeerStats {
height: peer.info.height(),
direction: direction.to_string(),
last_seen: peer.info.last_seen(),
sent_bytes_per_sec: peer.last_min_sent_bytes().unwrap_or(0) / 60,
received_bytes_per_sec: peer.last_min_received_bytes().unwrap_or(0) / 60,
sent_bytes_per_sec: peer.tracker().sent_bytes.read().bytes_per_min() / 60,
received_bytes_per_sec: peer.tracker().received_bytes.read().bytes_per_min() / 60,
capabilities: peer.info.capabilities,
}
}
Expand Down
8 changes: 8 additions & 0 deletions util/src/rate_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ impl RateCounter {
.filter(|x| !x.is_quiet())
.count() as u64
}

/// Elapsed time in ms since the last entry.
/// We use this to rate limit when sending.
pub fn elapsed_since_last_msg(&self) -> Option<u64> {
self.last_min_entries
.last()
.map(|x| millis_since_epoch().saturating_sub(x.timestamp))
}
}

// turns out getting the millisecs since epoch in Rust isn't as easy as it
Expand Down

0 comments on commit 78ecd66

Please sign in to comment.