Skip to content

P2p crawler peers ban #1244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dns_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chainstate = { path = "../chainstate" }
common = { path = "../common" }
crypto = { path = "../crypto" }
logging = { path = "../logging" }
Expand Down
170 changes: 151 additions & 19 deletions dns_server/src/crawler_p2p/crawler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ use std::{
time::Duration,
};

use chainstate::ban_score::BanScore;
use common::{chain::ChainConfig, primitives::time::Time};
use crypto::random::{seq::IteratorRandom, Rng};
use logging::log;
use p2p::{
config::{BanDuration, BanThreshold},
error::P2pError,
net::types::PeerInfo,
peer_manager::{ADDR_RATE_BUCKET_SIZE, ADDR_RATE_INITIAL_SIZE, MAX_ADDR_RATE_PER_SECOND},
types::{peer_id::PeerId, socket_address::SocketAddress},
types::{bannable_address::BannableAddress, peer_id::PeerId, socket_address::SocketAddress},
utils::rate_limiter::RateLimiter,
};

Expand All @@ -52,22 +54,35 @@ use self::address_data::{AddressData, AddressState};
/// How many outbound connection attempts can be made per heartbeat
const MAX_CONNECTS_PER_HEARTBEAT: usize = 25;

#[derive(Clone)]
pub struct CrawlerConfig {
pub ban_threshold: BanThreshold,
pub ban_duration: BanDuration,
}

/// The `Crawler` is the component that communicates with Mintlayer peers using p2p,
/// and based on the results, commands the DNS server to add/remove ip addresses.
/// The `Crawler` emits events that communicate whether addresses were reached or,
/// are unreachable anymore.
pub struct Crawler {
/// Current time. This value is advanced explicitly by the caller code.
now: Time,

/// Chain config
chain_config: Arc<ChainConfig>,

/// Crawler config
config: CrawlerConfig,

/// Map of all known addresses (including currently unreachable); these addresses
/// will be periodically tested, and reachable addresses will be handed
/// to the DNS server to be returned to the user on DNS queries,
/// and unreachable addresses will be removed from the DNS server
addresses: BTreeMap<SocketAddress, AddressData>,

/// Banned addresses.
banned_addresses: BTreeMap<BannableAddress, Time>,

/// Map of all currently connected outbound peers that we successfully
/// reached and are still connected to (generally speaking,
/// we don't have to stay connected to those peers, but this is an implementation detail)
Expand All @@ -77,6 +92,7 @@ pub struct Crawler {
struct Peer {
address: SocketAddress,
address_rate_limiter: RateLimiter,
ban_score: u32,
}

pub enum CrawlerEvent {
Expand All @@ -98,6 +114,10 @@ pub enum CrawlerEvent {
address: SocketAddress,
error: P2pError,
},
Misbehaved {
peer_id: PeerId,
error: P2pError,
},
}

pub enum CrawlerCommand {
Expand All @@ -112,16 +132,24 @@ pub enum CrawlerCommand {
old_state: AddressState,
new_state: AddressState,
},
MarkAsBanned {
address: BannableAddress,
ban_until: Time,
},
RemoveBannedStatus {
address: BannableAddress,
},
}

impl Crawler {
pub fn new(
now: Time,
chain_config: Arc<ChainConfig>,
config: CrawlerConfig,
loaded_addresses: BTreeSet<SocketAddress>,
loaded_banned_addresses: BTreeMap<BannableAddress, Time>,
reserved_addresses: BTreeSet<SocketAddress>,
) -> Self {
let now = common::primitives::time::get_time();

let addresses = loaded_addresses
.union(&reserved_addresses)
.map(|addr| {
Expand All @@ -142,7 +170,9 @@ impl Crawler {
Self {
now,
chain_config,
config,
addresses,
banned_addresses: loaded_banned_addresses,
outbound_peers: BTreeMap::new(),
}
}
Expand All @@ -153,7 +183,7 @@ impl Crawler {
peer_info: PeerInfo,
callback: &mut impl FnMut(CrawlerCommand),
) {
log::info!("connected open, peer_id: {}", peer_info.peer_id);
log::info!("connection opened, peer_id: {}", peer_info.peer_id);
self.create_outbound_peer(peer_info.peer_id, address, peer_info, callback);
}

Expand All @@ -177,6 +207,77 @@ impl Crawler {
AddressStateTransitionTo::Disconnected,
callback,
);

self.handle_new_ban_score(&address, error.ban_score(), callback);
}

fn handle_misbehaved_peer(
&mut self,
peer_id: PeerId,
error: P2pError,
callback: &mut impl FnMut(CrawlerCommand),
) {
let ban_score = error.ban_score();

if ban_score > 0 {
log::debug!("handling misbehaved peer, peer_id: {peer_id}");

let peer = self
.outbound_peers
.get_mut(&peer_id)
.expect("peer must be known (handle_misbehaved_peer)");
peer.ban_score = peer.ban_score.saturating_add(ban_score);

log::info!(
"Adjusting peer ban score for peer {peer_id}, adjustment: {ban_score}, new score: {}",
peer.ban_score
);

let address = peer.address;
let new_score = peer.ban_score;
self.handle_new_ban_score(&address, new_score, callback);
}
}

fn handle_new_ban_score(
&mut self,
address: &SocketAddress,
new_ban_score: u32,
callback: &mut impl FnMut(CrawlerCommand),
) {
let ban_until = (self.now + *self.config.ban_duration).expect("Unexpected ban duration");

if new_ban_score >= *self.config.ban_threshold {
let address = address.as_bannable();

log::info!("Ban threshold for address {address} reached");

self.disconnect_all(&address, callback);
callback(CrawlerCommand::MarkAsBanned { address, ban_until });
self.banned_addresses.insert(address, ban_until);
}
}

fn disconnect_all(
&mut self,
address: &BannableAddress,
callback: &mut impl FnMut(CrawlerCommand),
) {
let to_disconnect = self
.outbound_peers
.iter()
.filter_map(|(peer_id, peer)| {
if peer.address.as_bannable() == *address {
Some((*peer_id, peer.address))
} else {
None
}
})
.collect::<Vec<_>>();

for (peer_id, peer_address) in to_disconnect {
self.disconnect_peer(peer_id, &peer_address, callback);
}
}

fn handle_disconnected(&mut self, peer_id: PeerId, callback: &mut impl FnMut(CrawlerCommand)) {
Expand Down Expand Up @@ -252,6 +353,7 @@ impl Crawler {
let peer = Peer {
address,
address_rate_limiter,
ban_score: 0,
};

let old_peer = self.outbound_peers.insert(peer_id, peer);
Expand All @@ -266,12 +368,12 @@ impl Crawler {
is_compatible
);

let address_data = self
.addresses
.get_mut(&address)
.expect("address must be known (create_outbound_peer)");

if is_compatible {
let address_data = self
.addresses
.get_mut(&address)
.expect("address must be known (create_outbound_peer)");

Self::change_address_state(
self.now,
&address,
Expand All @@ -280,18 +382,32 @@ impl Crawler {
callback,
);
} else {
callback(CrawlerCommand::Disconnect { peer_id });

Self::change_address_state(
self.now,
&address,
address_data,
AddressStateTransitionTo::Disconnecting,
callback,
);
self.disconnect_peer(peer_id, &address, callback);
}
}

fn disconnect_peer(
&mut self,
peer_id: PeerId,
address: &SocketAddress,
callback: &mut impl FnMut(CrawlerCommand),
) {
let address_data = self
.addresses
.get_mut(address)
.expect("address must be known (disconnect_peer)");

callback(CrawlerCommand::Disconnect { peer_id });

Self::change_address_state(
self.now,
address,
address_data,
AddressStateTransitionTo::Disconnecting,
callback,
);
}

/// Remove existing outbound peer
fn remove_outbound_peer(&mut self, peer_id: PeerId, callback: &mut impl FnMut(CrawlerCommand)) {
log::debug!("outbound peer removed, peer_id: {}", peer_id);
Expand Down Expand Up @@ -319,10 +435,23 @@ impl Crawler {
///
/// Select random addresses to connect to, delete old addresses from memory and DB.
fn heartbeat(&mut self, callback: &mut impl FnMut(CrawlerCommand), rng: &mut impl Rng) {
self.banned_addresses.retain(|address, banned_until| {
let banned = self.now < *banned_until;

if !banned {
callback(CrawlerCommand::RemoveBannedStatus { address: *address });
}

banned
});

let connecting_addresses = self
.addresses
.iter_mut()
.filter(|(_address, address_data)| address_data.connect_now(self.now))
.filter(|(address, address_data)| {
address_data.connect_now(self.now)
&& self.banned_addresses.get(&address.as_bannable()).is_none()
})
.choose_multiple(rng, MAX_CONNECTS_PER_HEARTBEAT);

for (address, address_data) in connecting_addresses {
Expand Down Expand Up @@ -370,6 +499,9 @@ impl Crawler {
CrawlerEvent::ConnectionError { address, error } => {
self.handle_connection_error(address, error, callback);
}
CrawlerEvent::Misbehaved { peer_id, error } => {
self.handle_misbehaved_peer(peer_id, error, callback)
}
}
}
}
Expand Down
Loading