Skip to content

Commit 26eec72

Browse files
authored
Merge pull request #1244 from mintlayer/p2p_crawler_peers_ban
P2p crawler peers ban
2 parents e3c8196 + fbe581c commit 26eec72

File tree

18 files changed

+1016
-188
lines changed

18 files changed

+1016
-188
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dns_server/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ rust-version.workspace = true
88
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
99

1010
[dependencies]
11+
chainstate = { path = "../chainstate" }
1112
common = { path = "../common" }
1213
crypto = { path = "../crypto" }
1314
logging = { path = "../logging" }

dns_server/src/crawler_p2p/crawler/mod.rs

+151-19
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,16 @@ use std::{
3434
time::Duration,
3535
};
3636

37+
use chainstate::ban_score::BanScore;
3738
use common::{chain::ChainConfig, primitives::time::Time};
3839
use crypto::random::{seq::IteratorRandom, Rng};
3940
use logging::log;
4041
use p2p::{
42+
config::{BanDuration, BanThreshold},
4143
error::P2pError,
4244
net::types::PeerInfo,
4345
peer_manager::{ADDR_RATE_BUCKET_SIZE, ADDR_RATE_INITIAL_SIZE, MAX_ADDR_RATE_PER_SECOND},
44-
types::{peer_id::PeerId, socket_address::SocketAddress},
46+
types::{bannable_address::BannableAddress, peer_id::PeerId, socket_address::SocketAddress},
4547
utils::rate_limiter::RateLimiter,
4648
};
4749

@@ -52,22 +54,35 @@ use self::address_data::{AddressData, AddressState};
5254
/// How many outbound connection attempts can be made per heartbeat
5355
const MAX_CONNECTS_PER_HEARTBEAT: usize = 25;
5456

57+
#[derive(Clone)]
58+
pub struct CrawlerConfig {
59+
pub ban_threshold: BanThreshold,
60+
pub ban_duration: BanDuration,
61+
}
62+
5563
/// The `Crawler` is the component that communicates with Mintlayer peers using p2p,
5664
/// and based on the results, commands the DNS server to add/remove ip addresses.
5765
/// The `Crawler` emits events that communicate whether addresses were reached or,
5866
/// are unreachable anymore.
5967
pub struct Crawler {
68+
/// Current time. This value is advanced explicitly by the caller code.
6069
now: Time,
6170

6271
/// Chain config
6372
chain_config: Arc<ChainConfig>,
6473

74+
/// Crawler config
75+
config: CrawlerConfig,
76+
6577
/// Map of all known addresses (including currently unreachable); these addresses
6678
/// will be periodically tested, and reachable addresses will be handed
6779
/// to the DNS server to be returned to the user on DNS queries,
6880
/// and unreachable addresses will be removed from the DNS server
6981
addresses: BTreeMap<SocketAddress, AddressData>,
7082

83+
/// Banned addresses.
84+
banned_addresses: BTreeMap<BannableAddress, Time>,
85+
7186
/// Map of all currently connected outbound peers that we successfully
7287
/// reached and are still connected to (generally speaking,
7388
/// we don't have to stay connected to those peers, but this is an implementation detail)
@@ -77,6 +92,7 @@ pub struct Crawler {
7792
struct Peer {
7893
address: SocketAddress,
7994
address_rate_limiter: RateLimiter,
95+
ban_score: u32,
8096
}
8197

8298
pub enum CrawlerEvent {
@@ -98,6 +114,10 @@ pub enum CrawlerEvent {
98114
address: SocketAddress,
99115
error: P2pError,
100116
},
117+
Misbehaved {
118+
peer_id: PeerId,
119+
error: P2pError,
120+
},
101121
}
102122

103123
pub enum CrawlerCommand {
@@ -112,16 +132,24 @@ pub enum CrawlerCommand {
112132
old_state: AddressState,
113133
new_state: AddressState,
114134
},
135+
MarkAsBanned {
136+
address: BannableAddress,
137+
ban_until: Time,
138+
},
139+
RemoveBannedStatus {
140+
address: BannableAddress,
141+
},
115142
}
116143

117144
impl Crawler {
118145
pub fn new(
146+
now: Time,
119147
chain_config: Arc<ChainConfig>,
148+
config: CrawlerConfig,
120149
loaded_addresses: BTreeSet<SocketAddress>,
150+
loaded_banned_addresses: BTreeMap<BannableAddress, Time>,
121151
reserved_addresses: BTreeSet<SocketAddress>,
122152
) -> Self {
123-
let now = common::primitives::time::get_time();
124-
125153
let addresses = loaded_addresses
126154
.union(&reserved_addresses)
127155
.map(|addr| {
@@ -142,7 +170,9 @@ impl Crawler {
142170
Self {
143171
now,
144172
chain_config,
173+
config,
145174
addresses,
175+
banned_addresses: loaded_banned_addresses,
146176
outbound_peers: BTreeMap::new(),
147177
}
148178
}
@@ -153,7 +183,7 @@ impl Crawler {
153183
peer_info: PeerInfo,
154184
callback: &mut impl FnMut(CrawlerCommand),
155185
) {
156-
log::info!("connected open, peer_id: {}", peer_info.peer_id);
186+
log::info!("connection opened, peer_id: {}", peer_info.peer_id);
157187
self.create_outbound_peer(peer_info.peer_id, address, peer_info, callback);
158188
}
159189

@@ -177,6 +207,77 @@ impl Crawler {
177207
AddressStateTransitionTo::Disconnected,
178208
callback,
179209
);
210+
211+
self.handle_new_ban_score(&address, error.ban_score(), callback);
212+
}
213+
214+
fn handle_misbehaved_peer(
215+
&mut self,
216+
peer_id: PeerId,
217+
error: P2pError,
218+
callback: &mut impl FnMut(CrawlerCommand),
219+
) {
220+
let ban_score = error.ban_score();
221+
222+
if ban_score > 0 {
223+
log::debug!("handling misbehaved peer, peer_id: {peer_id}");
224+
225+
let peer = self
226+
.outbound_peers
227+
.get_mut(&peer_id)
228+
.expect("peer must be known (handle_misbehaved_peer)");
229+
peer.ban_score = peer.ban_score.saturating_add(ban_score);
230+
231+
log::info!(
232+
"Adjusting peer ban score for peer {peer_id}, adjustment: {ban_score}, new score: {}",
233+
peer.ban_score
234+
);
235+
236+
let address = peer.address;
237+
let new_score = peer.ban_score;
238+
self.handle_new_ban_score(&address, new_score, callback);
239+
}
240+
}
241+
242+
fn handle_new_ban_score(
243+
&mut self,
244+
address: &SocketAddress,
245+
new_ban_score: u32,
246+
callback: &mut impl FnMut(CrawlerCommand),
247+
) {
248+
let ban_until = (self.now + *self.config.ban_duration).expect("Unexpected ban duration");
249+
250+
if new_ban_score >= *self.config.ban_threshold {
251+
let address = address.as_bannable();
252+
253+
log::info!("Ban threshold for address {address} reached");
254+
255+
self.disconnect_all(&address, callback);
256+
callback(CrawlerCommand::MarkAsBanned { address, ban_until });
257+
self.banned_addresses.insert(address, ban_until);
258+
}
259+
}
260+
261+
fn disconnect_all(
262+
&mut self,
263+
address: &BannableAddress,
264+
callback: &mut impl FnMut(CrawlerCommand),
265+
) {
266+
let to_disconnect = self
267+
.outbound_peers
268+
.iter()
269+
.filter_map(|(peer_id, peer)| {
270+
if peer.address.as_bannable() == *address {
271+
Some((*peer_id, peer.address))
272+
} else {
273+
None
274+
}
275+
})
276+
.collect::<Vec<_>>();
277+
278+
for (peer_id, peer_address) in to_disconnect {
279+
self.disconnect_peer(peer_id, &peer_address, callback);
280+
}
180281
}
181282

182283
fn handle_disconnected(&mut self, peer_id: PeerId, callback: &mut impl FnMut(CrawlerCommand)) {
@@ -252,6 +353,7 @@ impl Crawler {
252353
let peer = Peer {
253354
address,
254355
address_rate_limiter,
356+
ban_score: 0,
255357
};
256358

257359
let old_peer = self.outbound_peers.insert(peer_id, peer);
@@ -266,12 +368,12 @@ impl Crawler {
266368
is_compatible
267369
);
268370

269-
let address_data = self
270-
.addresses
271-
.get_mut(&address)
272-
.expect("address must be known (create_outbound_peer)");
273-
274371
if is_compatible {
372+
let address_data = self
373+
.addresses
374+
.get_mut(&address)
375+
.expect("address must be known (create_outbound_peer)");
376+
275377
Self::change_address_state(
276378
self.now,
277379
&address,
@@ -280,18 +382,32 @@ impl Crawler {
280382
callback,
281383
);
282384
} else {
283-
callback(CrawlerCommand::Disconnect { peer_id });
284-
285-
Self::change_address_state(
286-
self.now,
287-
&address,
288-
address_data,
289-
AddressStateTransitionTo::Disconnecting,
290-
callback,
291-
);
385+
self.disconnect_peer(peer_id, &address, callback);
292386
}
293387
}
294388

389+
fn disconnect_peer(
390+
&mut self,
391+
peer_id: PeerId,
392+
address: &SocketAddress,
393+
callback: &mut impl FnMut(CrawlerCommand),
394+
) {
395+
let address_data = self
396+
.addresses
397+
.get_mut(address)
398+
.expect("address must be known (disconnect_peer)");
399+
400+
callback(CrawlerCommand::Disconnect { peer_id });
401+
402+
Self::change_address_state(
403+
self.now,
404+
address,
405+
address_data,
406+
AddressStateTransitionTo::Disconnecting,
407+
callback,
408+
);
409+
}
410+
295411
/// Remove existing outbound peer
296412
fn remove_outbound_peer(&mut self, peer_id: PeerId, callback: &mut impl FnMut(CrawlerCommand)) {
297413
log::debug!("outbound peer removed, peer_id: {}", peer_id);
@@ -319,10 +435,23 @@ impl Crawler {
319435
///
320436
/// Select random addresses to connect to, delete old addresses from memory and DB.
321437
fn heartbeat(&mut self, callback: &mut impl FnMut(CrawlerCommand), rng: &mut impl Rng) {
438+
self.banned_addresses.retain(|address, banned_until| {
439+
let banned = self.now < *banned_until;
440+
441+
if !banned {
442+
callback(CrawlerCommand::RemoveBannedStatus { address: *address });
443+
}
444+
445+
banned
446+
});
447+
322448
let connecting_addresses = self
323449
.addresses
324450
.iter_mut()
325-
.filter(|(_address, address_data)| address_data.connect_now(self.now))
451+
.filter(|(address, address_data)| {
452+
address_data.connect_now(self.now)
453+
&& self.banned_addresses.get(&address.as_bannable()).is_none()
454+
})
326455
.choose_multiple(rng, MAX_CONNECTS_PER_HEARTBEAT);
327456

328457
for (address, address_data) in connecting_addresses {
@@ -370,6 +499,9 @@ impl Crawler {
370499
CrawlerEvent::ConnectionError { address, error } => {
371500
self.handle_connection_error(address, error, callback);
372501
}
502+
CrawlerEvent::Misbehaved { peer_id, error } => {
503+
self.handle_misbehaved_peer(peer_id, error, callback)
504+
}
373505
}
374506
}
375507
}

0 commit comments

Comments
 (0)