From 9040d6c29a2fe54b69ee56424ae44a253798fcef Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 18 Jul 2024 11:19:39 +0200 Subject: [PATCH 1/5] feat: make KnownPeers operate on IPs w/o port and add timestamps Signed-off-by: ljedrz --- node/tcp/src/helpers/known_peers.rs | 33 ++++++++++++++++++++--------- node/tcp/src/helpers/stats.rs | 26 +++++++++++++++++++++-- node/tcp/src/protocols/reading.rs | 6 +++--- node/tcp/src/protocols/writing.rs | 4 ++-- node/tcp/src/tcp.rs | 17 +++++---------- 5 files changed, 57 insertions(+), 29 deletions(-) diff --git a/node/tcp/src/helpers/known_peers.rs b/node/tcp/src/helpers/known_peers.rs index dea035b5e0..4f7143c943 100644 --- a/node/tcp/src/helpers/known_peers.rs +++ b/node/tcp/src/helpers/known_peers.rs @@ -12,7 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{ + collections::{hash_map::Entry, HashMap}, + net::IpAddr, + sync::Arc, + time::Instant, +}; use parking_lot::RwLock; @@ -20,45 +25,53 @@ use crate::Stats; /// Contains statistics related to Tcp's peers, currently connected or not. #[derive(Default)] -pub struct KnownPeers(RwLock>>); +pub struct KnownPeers(RwLock>>); impl KnownPeers { /// Adds an address to the list of known peers. - pub fn add(&self, addr: SocketAddr) { - self.0.write().entry(addr).or_default(); + pub fn add(&self, addr: IpAddr) { + let timestamp = Instant::now(); + match self.0.write().entry(addr) { + Entry::Vacant(entry) => { + entry.insert(Arc::new(Stats::new(timestamp))); + } + Entry::Occupied(entry) => { + *entry.get().timestamp.write() = timestamp; + } + } } /// Returns the stats for the given peer. - pub fn get(&self, addr: SocketAddr) -> Option> { + pub fn get(&self, addr: IpAddr) -> Option> { self.0.read().get(&addr).map(Arc::clone) } /// Removes an address from the list of known peers. - pub fn remove(&self, addr: SocketAddr) -> Option> { + pub fn remove(&self, addr: IpAddr) -> Option> { self.0.write().remove(&addr) } /// Returns the list of all known peers and their stats. - pub fn snapshot(&self) -> HashMap> { + pub fn snapshot(&self) -> HashMap> { self.0.read().clone() } /// Registers a submission of a message to the given address. - pub fn register_sent_message(&self, to: SocketAddr, size: usize) { + pub fn register_sent_message(&self, to: IpAddr, size: usize) { if let Some(stats) = self.0.read().get(&to) { stats.register_sent_message(size); } } /// Registers a receipt of a message to the given address. - pub fn register_received_message(&self, from: SocketAddr, size: usize) { + pub fn register_received_message(&self, from: IpAddr, size: usize) { if let Some(stats) = self.0.read().get(&from) { stats.register_received_message(size); } } /// Registers a failure associated with the given address. - pub fn register_failure(&self, addr: SocketAddr) { + pub fn register_failure(&self, addr: IpAddr) { if let Some(stats) = self.0.read().get(&addr) { stats.register_failure(); } diff --git a/node/tcp/src/helpers/stats.rs b/node/tcp/src/helpers/stats.rs index 2889561b8e..acafbbe14b 100644 --- a/node/tcp/src/helpers/stats.rs +++ b/node/tcp/src/helpers/stats.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; +use parking_lot::RwLock; +use std::{ + sync::atomic::{AtomicU64, Ordering::Relaxed}, + time::Instant, +}; /// Contains statistics related to Tcp. -#[derive(Default)] pub struct Stats { + /// The timestamp of the creation (for the node) or connection (to a peer). + pub(crate) timestamp: RwLock, /// The number of all messages sent. msgs_sent: AtomicU64, /// The number of all messages received. @@ -30,6 +35,23 @@ pub struct Stats { } impl Stats { + /// Creates a new instance of the object. + pub fn new(timestamp: Instant) -> Self { + Self { + timestamp: RwLock::new(timestamp), + msgs_sent: Default::default(), + msgs_received: Default::default(), + bytes_sent: Default::default(), + bytes_received: Default::default(), + failures: Default::default(), + } + } + + /// Returns the creation or connection timestamp. + pub fn timestamp(&self) -> Instant { + *self.timestamp.read() + } + /// Returns the number of sent messages and their collective size in bytes. pub fn sent(&self) -> (u64, u64) { let msgs = self.msgs_sent.load(Relaxed); diff --git a/node/tcp/src/protocols/reading.rs b/node/tcp/src/protocols/reading.rs index 00f0907573..53668317d2 100644 --- a/node/tcp/src/protocols/reading.rs +++ b/node/tcp/src/protocols/reading.rs @@ -143,7 +143,7 @@ impl ReadingInternal for R { while let Some(msg) = inbound_message_receiver.recv().await { if let Err(e) = self_clone.process_message(addr, msg).await { error!(parent: node.span(), "can't process a message from {addr}: {e}"); - node.known_peers().register_failure(addr); + node.known_peers().register_failure(addr.ip()); } #[cfg(feature = "metrics")] metrics::decrement_gauge(metrics::tcp::TCP_TASKS, 1f64); @@ -178,7 +178,7 @@ impl ReadingInternal for R { } Err(e) => { error!(parent: node.span(), "can't read from {addr}: {e}"); - node.known_peers().register_failure(addr); + node.known_peers().register_failure(addr.ip()); if node.config().fatal_io_errors.contains(&e.kind()) { break; } @@ -229,7 +229,7 @@ impl Decoder for CountingCodec { if ret.is_some() { self.acc = 0; - self.node.known_peers().register_received_message(self.addr, read_len); + self.node.known_peers().register_received_message(self.addr.ip(), read_len); self.node.stats().register_received_message(read_len); } else { self.acc = read_len; diff --git a/node/tcp/src/protocols/writing.rs b/node/tcp/src/protocols/writing.rs index 5d15dbcda8..777fcb68d3 100644 --- a/node/tcp/src/protocols/writing.rs +++ b/node/tcp/src/protocols/writing.rs @@ -219,12 +219,12 @@ impl WritingInternal for W { match self_clone.write_to_stream(*msg, &mut framed).await { Ok(len) => { let _ = wrapped_msg.delivery_notification.send(Ok(())); - node.known_peers().register_sent_message(addr, len); + node.known_peers().register_sent_message(addr.ip(), len); node.stats().register_sent_message(len); trace!(parent: node.span(), "sent {}B to {}", len, addr); } Err(e) => { - node.known_peers().register_failure(addr); + node.known_peers().register_failure(addr.ip()); error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e); let is_fatal = node.config().fatal_io_errors.contains(&e.kind()); let _ = wrapped_msg.delivery_notification.send(Err(e)); diff --git a/node/tcp/src/tcp.rs b/node/tcp/src/tcp.rs index 47f3a2f822..1469d98e95 100644 --- a/node/tcp/src/tcp.rs +++ b/node/tcp/src/tcp.rs @@ -22,7 +22,7 @@ use std::{ atomic::{AtomicUsize, Ordering::*}, Arc, }, - time::Duration, + time::{Duration, Instant}, }; use once_cell::sync::OnceCell; @@ -101,7 +101,7 @@ impl Tcp { connecting: Default::default(), connections: Default::default(), known_peers: Default::default(), - stats: Default::default(), + stats: Stats::new(Instant::now()), tasks: Default::default(), })); @@ -255,7 +255,7 @@ impl Tcp { if let Err(ref e) = ret { self.connecting.lock().remove(&addr); - self.known_peers().register_failure(addr); + self.known_peers().register_failure(addr.ip()); error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}"); } @@ -282,13 +282,6 @@ impl Tcp { task.abort(); } - // If the (owning) Tcp was not the initiator of the connection, it doesn't know the listening address - // of the associated peer, so the related stats are unreliable; the next connection initiated by the - // peer could be bound to an entirely different port number - if conn.side() == ConnectionSide::Initiator { - self.known_peers().remove(conn.addr()); - } - debug!(parent: self.span(), "Disconnected from {}", conn.addr()); } else { warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}"); @@ -386,7 +379,7 @@ impl Tcp { tokio::spawn(async move { if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await { tcp.connecting.lock().remove(&addr); - tcp.known_peers().register_failure(addr); + tcp.known_peers().register_failure(addr.ip()); error!(parent: tcp.span(), "Failed to connect with {addr}: {e}"); } }); @@ -426,7 +419,7 @@ impl Tcp { /// Prepares the freshly acquired connection to handle the protocols the Tcp implements. async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> { - self.known_peers.add(peer_addr); + self.known_peers.add(peer_addr.ip()); // Register the port seen by the peer. if own_side == ConnectionSide::Initiator { From 057e089b302726c8c7cc0e54cdf89a363075abd0 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 18 Jul 2024 11:39:43 +0200 Subject: [PATCH 2/5] feat: introduce IP-level bans and apply them to connection spammers Signed-off-by: ljedrz --- node/bft/src/gateway.rs | 60 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 60af4c788f..ac1794fef4 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -64,7 +64,14 @@ use futures::SinkExt; use indexmap::{IndexMap, IndexSet}; use parking_lot::{Mutex, RwLock}; use rand::seq::{IteratorRandom, SliceRandom}; -use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + future::Future, + io, + net::{IpAddr, SocketAddr}, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::{ net::TcpStream, sync::{oneshot, OnceCell}, @@ -88,6 +95,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175; /// The maximum number of validators to send in a validators response event. const MAX_VALIDATORS_TO_SEND: usize = 200; +/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. +#[cfg(not(any(test, feature = "test")))] +const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10; +/// The amount of time an IP address is prohibited from connecting. +const IP_BAN_TIME_IN_SECS: u64 = 30; + /// Part of the Gateway API that deals with networking. /// This is a separate trait to allow for easier testing/mocking. #[async_trait] @@ -119,6 +132,8 @@ pub struct Gateway { /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously /// attempt to connect to each other). This set is used to prevent this from happening. connecting_peers: Arc>>, + /// The set of banned IP addresses. + banned_ips: Arc>>, /// The primary sender. primary_sender: Arc>>, /// The worker senders. @@ -160,6 +175,7 @@ impl Gateway { trusted_validators: trusted_validators.iter().copied().collect(), connected_peers: Default::default(), connecting_peers: Default::default(), + banned_ips: Default::default(), primary_sender: Default::default(), worker_senders: Default::default(), sync_sender: Default::default(), @@ -459,6 +475,19 @@ impl Gateway { Ok(()) } + /// Check whether the given IP address is currently banned. + #[cfg(not(any(test, feature = "test")))] + fn is_ip_banned(&self, ip: IpAddr) -> bool { + self.banned_ips.read().contains_key(&ip) + } + + /// Insert or update a banned IP. + #[cfg(not(any(test, feature = "test")))] + fn update_ip_ban(&self, ip: IpAddr) { + let timestamp = Instant::now(); + self.banned_ips.write().insert(ip, timestamp); + } + #[cfg(feature = "metrics")] fn update_metrics(&self) { metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64); @@ -875,6 +904,8 @@ impl Gateway { self.handle_unauthorized_validators(); // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`. self.handle_min_connected_validators(); + // Unban any addresses whose ban time has expired. + self.handle_banned_ips(); } /// Logs the connected validators. @@ -955,6 +986,11 @@ impl Gateway { } } } + + // Remove addresses whose ban time has expired. + fn handle_banned_ips(&self) { + self.banned_ips.write().retain(|_, timestamp| timestamp.elapsed().as_secs() < IP_BAN_TIME_IN_SECS); + } } #[async_trait] @@ -1105,9 +1141,29 @@ impl OnConnect for Gateway { impl Handshake for Gateway { /// Performs the handshake protocol. async fn perform_handshake(&self, mut connection: Connection) -> io::Result { - // Perform the handshake. let peer_addr = connection.addr(); let peer_side = connection.side(); + + // Check (or impose) IP-level bans. + #[cfg(not(any(test, feature = "test")))] + if self.dev().is_none() && peer_side == ConnectionSide::Initiator { + // If the IP is already banned, update the ban timestamp and reject the connection. + if self.is_ip_banned(peer_addr.ip()) { + self.update_ip_ban(peer_addr.ip()); + trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); + } + + // Check the previous low-level connection timestamp. + if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) { + if peer_stats.timestamp().elapsed().as_secs() <= MIN_CONNECTION_INTERVAL_IN_SECS { + self.update_ip_ban(peer_addr.ip()); + trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); + } + } + } + let stream = self.borrow_stream(&mut connection); // If this is an inbound connection, we log it, but don't know the listening address yet. From ec1ffee1725e81cbfd4505e2331d524f2df3a424 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 18 Jul 2024 16:21:44 +0200 Subject: [PATCH 3/5] fix: automatically use a test feature in node-bft Signed-off-by: ljedrz --- node/bft/Cargo.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml index 8b2c5ed86c..4a1ba078d6 100644 --- a/node/bft/Cargo.toml +++ b/node/bft/Cargo.toml @@ -19,6 +19,7 @@ edition = "2021" [features] default = [ ] metrics = [ "dep:metrics", "snarkos-node-bft-events/metrics", "snarkos-node-bft-ledger-service/metrics" ] +test = [ ] [dependencies.aleo-std] workspace = true @@ -152,6 +153,10 @@ version = "0.4" [dev-dependencies.rayon] version = "1" +[dev-dependencies.snarkos-node-bft] +path = "." +features = [ "test" ] + [dev-dependencies.snarkos-node-bft-ledger-service] path = "./ledger-service" default-features = false From a921563381ed36acb24fa96d58c810b590d62651 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 22 Jul 2024 14:05:00 +0200 Subject: [PATCH 4/5] refactor: move the IP ban set down to the Tcp Signed-off-by: ljedrz --- node/bft/src/gateway.rs | 21 +++++---------- node/tcp/src/helpers/banned_peers.rs | 39 ++++++++++++++++++++++++++++ node/tcp/src/helpers/mod.rs | 3 +++ node/tcp/src/tcp.rs | 10 +++++++ 4 files changed, 58 insertions(+), 15 deletions(-) create mode 100644 node/tcp/src/helpers/banned_peers.rs diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index ac1794fef4..16de897e50 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -64,14 +64,9 @@ use futures::SinkExt; use indexmap::{IndexMap, IndexSet}; use parking_lot::{Mutex, RwLock}; use rand::seq::{IteratorRandom, SliceRandom}; -use std::{ - collections::{HashMap, HashSet}, - future::Future, - io, - net::{IpAddr, SocketAddr}, - sync::Arc, - time::{Duration, Instant}, -}; +#[cfg(not(any(test, feature = "test")))] +use std::net::IpAddr; +use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{ net::TcpStream, sync::{oneshot, OnceCell}, @@ -132,8 +127,6 @@ pub struct Gateway { /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously /// attempt to connect to each other). This set is used to prevent this from happening. connecting_peers: Arc>>, - /// The set of banned IP addresses. - banned_ips: Arc>>, /// The primary sender. primary_sender: Arc>>, /// The worker senders. @@ -175,7 +168,6 @@ impl Gateway { trusted_validators: trusted_validators.iter().copied().collect(), connected_peers: Default::default(), connecting_peers: Default::default(), - banned_ips: Default::default(), primary_sender: Default::default(), worker_senders: Default::default(), sync_sender: Default::default(), @@ -478,14 +470,13 @@ impl Gateway { /// Check whether the given IP address is currently banned. #[cfg(not(any(test, feature = "test")))] fn is_ip_banned(&self, ip: IpAddr) -> bool { - self.banned_ips.read().contains_key(&ip) + self.tcp.banned_peers().is_ip_banned(ip) } /// Insert or update a banned IP. #[cfg(not(any(test, feature = "test")))] fn update_ip_ban(&self, ip: IpAddr) { - let timestamp = Instant::now(); - self.banned_ips.write().insert(ip, timestamp); + self.tcp.banned_peers().update_ip_ban(ip); } #[cfg(feature = "metrics")] @@ -989,7 +980,7 @@ impl Gateway { // Remove addresses whose ban time has expired. fn handle_banned_ips(&self) { - self.banned_ips.write().retain(|_, timestamp| timestamp.elapsed().as_secs() < IP_BAN_TIME_IN_SECS); + self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS); } } diff --git a/node/tcp/src/helpers/banned_peers.rs b/node/tcp/src/helpers/banned_peers.rs new file mode 100644 index 0000000000..3ace384071 --- /dev/null +++ b/node/tcp/src/helpers/banned_peers.rs @@ -0,0 +1,39 @@ +// Copyright (C) 2019-2023 Aleo Systems Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::HashMap, net::IpAddr, time::Instant}; + +use parking_lot::RwLock; + +/// Contains the set of peers currently banned by IP. +#[derive(Default)] +pub struct BannedPeers(RwLock>); + +impl BannedPeers { + /// Check whether the given IP address is currently banned. + pub fn is_ip_banned(&self, ip: IpAddr) -> bool { + self.0.read().contains_key(&ip) + } + + /// Insert or update a banned IP. + pub fn update_ip_ban(&self, ip: IpAddr) { + let timestamp = Instant::now(); + self.0.write().insert(ip, timestamp); + } + + /// Remove the expired entries. + pub fn remove_old_bans(&self, ban_time_in_secs: u64) { + self.0.write().retain(|_, timestamp| timestamp.elapsed().as_secs() < ban_time_in_secs); + } +} diff --git a/node/tcp/src/helpers/mod.rs b/node/tcp/src/helpers/mod.rs index 05e73f6886..22d3ef9b18 100644 --- a/node/tcp/src/helpers/mod.rs +++ b/node/tcp/src/helpers/mod.rs @@ -15,6 +15,9 @@ mod config; pub use config::Config; +mod banned_peers; +pub use banned_peers::BannedPeers; + pub mod connections; pub use connections::{Connection, ConnectionSide}; diff --git a/node/tcp/src/tcp.rs b/node/tcp/src/tcp.rs index 1469d98e95..d4c0f0cff2 100644 --- a/node/tcp/src/tcp.rs +++ b/node/tcp/src/tcp.rs @@ -39,6 +39,7 @@ use tracing::*; use crate::{ connections::{Connection, ConnectionSide, Connections}, protocols::{Protocol, Protocols}, + BannedPeers, Config, KnownPeers, Stats, @@ -75,6 +76,8 @@ pub struct InnerTcp { connections: Connections, /// Collects statistics related to the node's peers. known_peers: KnownPeers, + /// Contains the set of currently banned peers. + banned_peers: BannedPeers, /// Collects statistics related to the node itself. stats: Stats, /// The node's tasks. @@ -101,6 +104,7 @@ impl Tcp { connecting: Default::default(), connections: Default::default(), known_peers: Default::default(), + banned_peers: Default::default(), stats: Stats::new(Instant::now()), tasks: Default::default(), })); @@ -165,6 +169,12 @@ impl Tcp { &self.known_peers } + /// Returns a reference to the set of currently banned peers. + #[inline] + pub fn banned_peers(&self) -> &BannedPeers { + &self.banned_peers + } + /// Returns a reference to the statistics. #[inline] pub fn stats(&self) -> &Stats { From 7efde52bb29977e9ca71e543ce96eb0e655fc2bc Mon Sep 17 00:00:00 2001 From: ljedrz Date: Fri, 30 Aug 2024 14:25:01 +0200 Subject: [PATCH 5/5] feat: extend the IP ban to the Router Signed-off-by: ljedrz --- node/router/src/handshake.rs | 20 ++++++++++++++++++++ node/router/src/heartbeat.rs | 9 +++++++++ node/router/src/lib.rs | 17 +++++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 015162beac..587e594c48 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -100,6 +100,26 @@ impl Router { Some(peer_addr) }; + // Check (or impose) IP-level bans. + #[cfg(not(any(test, feature = "test")))] + if !self.is_dev() && peer_side == ConnectionSide::Initiator { + // If the IP is already banned, update the ban timestamp and reject the connection. + if self.is_ip_banned(peer_addr.ip()) { + self.update_ip_ban(peer_addr.ip()); + trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' is a banned IP address", peer_addr.ip()))); + } + + // Check the previous low-level connection timestamp. + if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) { + if peer_stats.timestamp().elapsed().as_secs() <= Router::::MIN_CONNECTION_INTERVAL_IN_SECS { + self.update_ip_ban(peer_addr.ip()); + trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip()); + return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip()))); + } + } + } + // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time. let handshake_result = if peer_side == ConnectionSide::Responder { self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index dca8c6d225..7276bc8415 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -42,6 +42,8 @@ pub trait Heartbeat: Outbound { const MAXIMUM_NUMBER_OF_PEERS: usize = 21; /// The maximum number of provers to maintain connections with. const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4; + /// The amount of time an IP address is prohibited from connecting. + const IP_BAN_TIME_IN_SECS: u64 = 30; /// Handles the heartbeat request. fn heartbeat(&self) { @@ -60,6 +62,8 @@ pub trait Heartbeat: Outbound { self.handle_trusted_peers(); // Keep the puzzle request up to date. self.handle_puzzle_request(); + // Unban any addresses whose ban time has expired. + self.handle_banned_ips(); } /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers. @@ -283,4 +287,9 @@ pub trait Heartbeat: Outbound { fn handle_puzzle_request(&self) { // No-op } + + // Remove addresses whose ban time has expired. + fn handle_banned_ips(&self) { + self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS); + } } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 45e6470e8a..4c9c877dbe 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -45,6 +45,8 @@ use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; use anyhow::{bail, Result}; use parking_lot::{Mutex, RwLock}; +#[cfg(not(any(test, feature = "test")))] +use std::net::IpAddr; use std::{ collections::{HashMap, HashSet}, future::Future, @@ -107,6 +109,9 @@ impl Router { /// The duration in seconds after which a connected peer is considered inactive or /// disconnected if no message has been received in the meantime. const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes + /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. + #[cfg(not(any(test, feature = "test")))] + const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10; } impl Router { @@ -427,6 +432,18 @@ impl Router { self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect() } + /// Check whether the given IP address is currently banned. + #[cfg(not(any(test, feature = "test")))] + fn is_ip_banned(&self, ip: IpAddr) -> bool { + self.tcp.banned_peers().is_ip_banned(ip) + } + + /// Insert or update a banned IP. + #[cfg(not(any(test, feature = "test")))] + fn update_ip_ban(&self, ip: IpAddr) { + self.tcp.banned_peers().update_ip_ban(ip); + } + #[cfg(feature = "metrics")] fn update_metrics(&self) { metrics::gauge(metrics::router::CONNECTED, self.connected_peers.read().len() as f64);