Skip to content

Commit d96dc62

Browse files
committed
Use ConnectivityEvent::Misbehaved to report unexpected handshakes; some cleanup
1 parent 6443562 commit d96dc62

File tree

11 files changed

+317
-38
lines changed

11 files changed

+317
-38
lines changed

Diff for: p2p/src/net/default_backend/backend.rs

+8-9
Original file line numberDiff line numberDiff line change
@@ -560,17 +560,10 @@ where
560560
if let Some(pending_peer) = self.pending.get(&peer_id) {
561561
log::debug!("Sending ConnectivityEvent::HandshakeFailed for peer {peer_id}");
562562

563-
let send_result = self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed {
563+
self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed {
564564
address: pending_peer.address,
565565
error,
566-
});
567-
if let Err(send_error) = send_result {
568-
log::error!(
569-
"Unable to report a failed handshake for peer {} to the front end: {}",
570-
peer_id,
571-
send_error
572-
);
573-
}
566+
})?;
574567
} else {
575568
log::error!("Cannot find pending peer for peer id {peer_id}");
576569
}
@@ -607,6 +600,12 @@ where
607600

608601
Ok(())
609602
}
603+
604+
PeerEvent::Misbehaved { error } => {
605+
self.conn_event_tx.send(ConnectivityEvent::Misbehaved { peer_id, error })?;
606+
607+
Ok(())
608+
}
610609
}
611610
}
612611

Diff for: p2p/src/net/default_backend/peer.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,15 @@ where
270270
) -> crate::Result<()> {
271271
match msg.categorize() {
272272
CategorizedMessage::Handshake(_) => {
273-
// TODO: this must be reported to the peer manager, so that it can adjust
274-
// the peer's ban score. (We may add a separate PeerEvent for this and Backend
275-
// can then use the now unused ConnectivityEvent::Misbehaved to forward the error
276-
// to the peer manager.)
277273
log::error!("Peer {peer_id} sent unexpected handshake message");
274+
275+
peer_event_tx
276+
.send(PeerEvent::Misbehaved {
277+
error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage(
278+
"Unexpected handshake message".to_owned(),
279+
)),
280+
})
281+
.await?;
278282
}
279283
CategorizedMessage::PeerManagerMessage(msg) => {
280284
peer_event_tx.send(PeerEvent::MessageReceived { message: msg }).await?

Diff for: p2p/src/net/default_backend/types.rs

+3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ pub enum PeerEvent {
104104

105105
/// Message received from remote
106106
MessageReceived { message: PeerManagerMessage },
107+
108+
/// Protocol violation
109+
Misbehaved { error: P2pError },
107110
}
108111

109112
/// Events sent by Backend to Peer

Diff for: p2p/src/peer_manager/mod.rs

+46
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ where
173173
subscribed_to_peer_addresses: BTreeSet<PeerId>,
174174

175175
peer_eviction_random_state: peers_eviction::RandomState,
176+
177+
/// PeerManager's observer for use by tests.
178+
observer: Option<Box<dyn Observer + Send>>,
176179
}
177180

178181
/// Takes IP or socket address and converts it to socket address (adding the default peer port if IP address is used)
@@ -196,6 +199,26 @@ where
196199
peer_mgr_event_rx: mpsc::UnboundedReceiver<PeerManagerEvent>,
197200
time_getter: TimeGetter,
198201
peerdb_storage: S,
202+
) -> crate::Result<Self> {
203+
Self::new_with_observer(
204+
chain_config,
205+
p2p_config,
206+
handle,
207+
peer_mgr_event_rx,
208+
time_getter,
209+
peerdb_storage,
210+
None,
211+
)
212+
}
213+
214+
pub fn new_with_observer(
215+
chain_config: Arc<ChainConfig>,
216+
p2p_config: Arc<P2pConfig>,
217+
handle: T::ConnectivityHandle,
218+
peer_mgr_event_rx: mpsc::UnboundedReceiver<PeerManagerEvent>,
219+
time_getter: TimeGetter,
220+
peerdb_storage: S,
221+
observer: Option<Box<dyn Observer + Send>>,
199222
) -> crate::Result<Self> {
200223
let mut rng = make_pseudo_rng();
201224
let peerdb = peerdb::PeerDb::new(
@@ -218,6 +241,7 @@ where
218241
peerdb,
219242
subscribed_to_peer_addresses: BTreeSet::new(),
220243
peer_eviction_random_state: peers_eviction::RandomState::new(&mut rng),
244+
observer,
221245
})
222246
}
223247

@@ -367,6 +391,10 @@ where
367391
peer.score
368392
);
369393

394+
if let Some(o) = self.observer.as_mut() {
395+
o.on_peer_ban_score_adjustment(peer.address, peer.score)
396+
}
397+
370398
if peer.score >= *self.p2p_config.ban_threshold {
371399
let address = peer.address.as_bannable();
372400
self.ban(address);
@@ -391,6 +419,10 @@ where
391419
return;
392420
}
393421

422+
if let Some(o) = self.observer.as_mut() {
423+
o.on_peer_ban_score_adjustment(peer_address, score);
424+
}
425+
394426
if score >= *self.p2p_config.ban_threshold {
395427
let address = peer_address.as_bannable();
396428
self.ban(address);
@@ -414,6 +446,10 @@ where
414446

415447
self.peerdb.ban(address);
416448

449+
if let Some(o) = self.observer.as_mut() {
450+
o.on_peer_ban(address);
451+
}
452+
417453
for peer_id in to_disconnect {
418454
self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None);
419455
}
@@ -1538,11 +1574,21 @@ where
15381574
self.run_internal(None).await
15391575
}
15401576

1577+
#[cfg(test)]
1578+
pub fn peers(&self) -> &BTreeMap<PeerId, PeerContext> {
1579+
&self.peers
1580+
}
1581+
15411582
#[cfg(test)]
15421583
pub fn peerdb(&self) -> &peerdb::PeerDb<S> {
15431584
&self.peerdb
15441585
}
15451586
}
15461587

1588+
pub trait Observer {
1589+
fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32);
1590+
fn on_peer_ban(&mut self, address: BannableAddress);
1591+
}
1592+
15471593
#[cfg(test)]
15481594
mod tests;

Diff for: p2p/src/peer_manager/tests/ping.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async fn ping_timeout() {
110110
})
111111
.unwrap();
112112

113-
let event = expect_recv!(&mut cmd_rx);
113+
let event = expect_recv!(cmd_rx);
114114
match event {
115115
Command::Accept { peer_id: _ } => {}
116116
_ => panic!("unexpected event: {event:?}"),
@@ -120,7 +120,7 @@ async fn ping_timeout() {
120120
for _ in 0..5 {
121121
time_getter.advance_time(ping_check_period);
122122

123-
let cmd = expect_recv!(&mut cmd_rx);
123+
let cmd = expect_recv!(cmd_rx);
124124
let (peer_id, peer_msg) = cmd_to_peer_man_msg(cmd);
125125
let nonce = assert_matches_return_val!(
126126
peer_msg,
@@ -138,7 +138,7 @@ async fn ping_timeout() {
138138

139139
// Receive one more ping request but do not send a ping response
140140
time_getter.advance_time(ping_check_period);
141-
let cmd = expect_recv!(&mut cmd_rx);
141+
let cmd = expect_recv!(cmd_rx);
142142
let (_, peer_msg) = cmd_to_peer_man_msg(cmd);
143143
assert_matches!(
144144
peer_msg,
@@ -148,7 +148,7 @@ async fn ping_timeout() {
148148
time_getter.advance_time(ping_timeout);
149149

150150
// PeerManager should ask backend to close connection
151-
let event = expect_recv!(&mut cmd_rx);
151+
let event = expect_recv!(cmd_rx);
152152
match event {
153153
Command::Disconnect { peer_id } => {
154154
conn_tx.send(ConnectivityEvent::ConnectionClosed { peer_id }).unwrap();

Diff for: p2p/src/sync/tests/helpers/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl TestNode {
212212

213213
/// Receives a message from the sync manager.
214214
pub async fn message(&mut self) -> (PeerId, SyncMessage) {
215-
expect_recv!(&mut self.sync_msg_receiver)
215+
expect_recv!(self.sync_msg_receiver)
216216
}
217217

218218
/// Try to receive a message from the sync manager.

Diff for: p2p/src/tests/correct_handshake.rs

+24-10
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ where
4040
let chain_config = Arc::new(common::chain::config::create_unit_test_config());
4141
let p2p_config = Arc::new(test_p2p_config());
4242

43-
let test_node = TestNode::<TTM>::start(
43+
let mut test_node = TestNode::<TTM>::start(
4444
Arc::clone(&chain_config),
4545
Arc::clone(&p2p_config),
4646
TTM::make_address(),
@@ -82,11 +82,18 @@ where
8282
// which one it is though).
8383
let _msg = msg_stream.recv().await.unwrap();
8484

85+
// This is mainly needed to ensure that the corresponding events, if any, reach
86+
// peer manager before we end the test.
87+
test_node.expect_no_banning().await;
88+
8589
let test_node_remnants = test_node.join().await;
86-
assert_eq!(
87-
test_node_remnants.peer_mgr.peerdb().list_banned().count(),
88-
0
89-
);
90+
91+
let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count();
92+
assert_eq!(bans_count, 0);
93+
94+
assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1);
95+
let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score;
96+
assert_eq!(peer_score, 0);
9097
}
9198

9299
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -112,7 +119,7 @@ where
112119
let chain_config = Arc::new(common::chain::config::create_unit_test_config());
113120
let p2p_config = Arc::new(test_p2p_config());
114121

115-
let test_node = TestNode::<TTM>::start(
122+
let mut test_node = TestNode::<TTM>::start(
116123
Arc::clone(&chain_config),
117124
Arc::clone(&p2p_config),
118125
TTM::make_address(),
@@ -149,11 +156,18 @@ where
149156
// which one it is though).
150157
let _msg = msg_stream.recv().await.unwrap();
151158

159+
// This is mainly needed to ensure that the corresponding events, if any, reach
160+
// peer manager before we end the test.
161+
test_node.expect_no_banning().await;
162+
152163
let test_node_remnants = test_node.join().await;
153-
assert_eq!(
154-
test_node_remnants.peer_mgr.peerdb().list_banned().count(),
155-
0
156-
);
164+
165+
let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count();
166+
assert_eq!(bans_count, 0);
167+
168+
assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1);
169+
let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score;
170+
assert_eq!(peer_score, 0);
157171
}
158172

159173
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

0 commit comments

Comments
 (0)