Skip to content

Commit

Permalink
Merge 96227a4 into bbec1ce
Browse files Browse the repository at this point in the history
  • Loading branch information
tiangong3624749 authored Apr 28, 2021
2 parents bbec1ce + 96227a4 commit 06af017
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
61 changes: 59 additions & 2 deletions network-p2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use libp2p::swarm::{
ProtocolsHandler,
};
use log::{debug, info, trace, warn};
use std::net::IpAddr;
use std::task::{Context, Poll};
use std::{
cmp,
Expand Down Expand Up @@ -98,6 +99,7 @@ pub struct DiscoveryConfig {
enable_mdns: bool,
kademlia_disjoint_query_paths: bool,
protocol_ids: HashSet<ProtocolId>,
max_connections_per_address: u32,
}

impl DiscoveryConfig {
Expand All @@ -112,6 +114,7 @@ impl DiscoveryConfig {
enable_mdns: false,
kademlia_disjoint_query_paths: false,
protocol_ids: HashSet::new(),
max_connections_per_address: 1,
}
}

Expand Down Expand Up @@ -163,6 +166,11 @@ impl DiscoveryConfig {
self
}

pub fn max_connections_per_address(&mut self, max_connections_per_address: u32) -> &mut Self {
self.max_connections_per_address = std::cmp::max(1, max_connections_per_address);
self
}

/// Require iterative Kademlia DHT queries to use disjoint paths for increased resiliency in the
/// presence of potentially adversarial nodes.
pub fn use_kademlia_disjoint_query_paths(&mut self, value: bool) -> &mut Self {
Expand All @@ -181,6 +189,7 @@ impl DiscoveryConfig {
enable_mdns,
kademlia_disjoint_query_paths,
protocol_ids,
max_connections_per_address,
} = self;

let kademlias = protocol_ids
Expand Down Expand Up @@ -215,6 +224,8 @@ impl DiscoveryConfig {
pending_events: VecDeque::new(),
local_peer_id,
num_connections: 0,
max_connections_per_address,
connections: HashMap::new(),
allow_private_ipv4,
discovery_only_if_under_num,
#[cfg(not(target_os = "unknown"))]
Expand Down Expand Up @@ -252,6 +263,10 @@ pub struct DiscoveryBehaviour {
local_peer_id: PeerId,
/// Number of nodes we're currently connected to.
num_connections: u64,
/// Maximum number of connections from any single IP address
max_connections_per_address: u32,
/// Number of connections from any single IP address
connections: HashMap<IpAddr, u8>,
/// If false, `addresses_of_peer` won't return any private IPv4 address, except for the ones
/// stored in `user_defined`.
allow_private_ipv4: bool,
Expand Down Expand Up @@ -527,7 +542,20 @@ impl NetworkBehaviour for DiscoveryBehaviour {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
) {
self.num_connections += 1;
if endpoint.is_listener() {
if let Some(ip) = multiaddr_to_ip_address(endpoint.get_remote_address()) {
if let Some(connections) = self.connections.get_mut(&ip) {
if self.max_connections_per_address <= (*connections as u32) {
debug!("[network]Threshold reached for single address {:}, max connections {:?}, real connections {:?}", ip, self.max_connections_per_address, *connections);
return;
}
let _ = connections.saturating_add(1_u8);
} else {
self.connections.insert(ip, 1);
};
}
}
let _ = self.num_connections.saturating_add(1);
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_established(k, peer_id, conn, endpoint)
}
Expand All @@ -545,7 +573,18 @@ impl NetworkBehaviour for DiscoveryBehaviour {
conn: &ConnectionId,
endpoint: &ConnectedPoint,
) {
self.num_connections -= 1;
let _ = self.num_connections.saturating_sub(1);
if endpoint.is_listener() {
if let Some(ip) = multiaddr_to_ip_address(endpoint.get_remote_address()) {
if let Some(connections) = self.connections.get_mut(&ip) {
if *connections <= 1_u8 {
self.connections.remove(&ip);
} else {
let _ = connections.saturating_sub(1_u8);
}
};
}
}
for k in self.kademlias.values_mut() {
NetworkBehaviour::inject_connection_closed(k, peer_id, conn, endpoint)
}
Expand Down Expand Up @@ -1144,4 +1183,22 @@ mod tests {
"Expected remote peer not to be added to `protocol_b` Kademlia instance.",
);
}

#[test]
fn test_multiaddr_to_ip_address() {
use super::multiaddr_to_ip_address;

assert!(
multiaddr_to_ip_address(&"/ip4/127.0.0.1/udt/sctp/5678".parse().unwrap()).is_some()
);
assert!(multiaddr_to_ip_address(&"/memory/111".parse().unwrap()).is_none());
}
}

fn multiaddr_to_ip_address(multiaddr: &Multiaddr) -> Option<IpAddr> {
match multiaddr.iter().collect::<Vec<_>>()[0] {
Protocol::Ip4(ip4) => Some(ip4.into()),
Protocol::Ip6(ip6) => Some(ip6.into()),
_ => None,
}
}
1 change: 1 addition & 0 deletions network-p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ impl NetworkWorker {
config.discovery_limit(u64::from(params.network_config.out_peers) + 15);
config.add_protocol(params.protocol_id.clone());
config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht);
config.max_connections_per_address(params.network_config.in_peers / 2);

match params.network_config.transport {
TransportConfig::MemoryOnly => {
Expand Down

0 comments on commit 06af017

Please sign in to comment.