diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 8baf49a9df..3ad99549a4 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -560,17 +560,10 @@ where if let Some(pending_peer) = self.pending.get(&peer_id) { log::debug!("Sending ConnectivityEvent::HandshakeFailed for peer {peer_id}"); - let send_result = self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed { + self.conn_event_tx.send(ConnectivityEvent::HandshakeFailed { address: pending_peer.address, error, - }); - if let Err(send_error) = send_result { - log::error!( - "Unable to report a failed handshake for peer {} to the front end: {}", - peer_id, - send_error - ); - } + })?; } else { log::error!("Cannot find pending peer for peer id {peer_id}"); } @@ -607,6 +600,12 @@ where Ok(()) } + + PeerEvent::Misbehaved { error } => { + self.conn_event_tx.send(ConnectivityEvent::Misbehaved { peer_id, error })?; + + Ok(()) + } } } diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index 7510ed2a9d..07cd29d561 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -270,11 +270,15 @@ where ) -> crate::Result<()> { match msg.categorize() { CategorizedMessage::Handshake(_) => { - // TODO: this must be reported to the peer manager, so that it can adjust - // the peer's ban score. (We may add a separate PeerEvent for this and Backend - // can then use the now unused ConnectivityEvent::Misbehaved to forward the error - // to the peer manager.) log::error!("Peer {peer_id} sent unexpected handshake message"); + + peer_event_tx + .send(PeerEvent::Misbehaved { + error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage( + "Unexpected handshake message".to_owned(), + )), + }) + .await?; } CategorizedMessage::PeerManagerMessage(msg) => { peer_event_tx.send(PeerEvent::MessageReceived { message: msg }).await? diff --git a/p2p/src/net/default_backend/types.rs b/p2p/src/net/default_backend/types.rs index d5ba89fe41..db311c403b 100644 --- a/p2p/src/net/default_backend/types.rs +++ b/p2p/src/net/default_backend/types.rs @@ -104,6 +104,9 @@ pub enum PeerEvent { /// Message received from remote MessageReceived { message: PeerManagerMessage }, + + /// Protocol violation + Misbehaved { error: P2pError }, } /// Events sent by Backend to Peer diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index 156d872cd5..73264dcb62 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -173,6 +173,9 @@ where subscribed_to_peer_addresses: BTreeSet, peer_eviction_random_state: peers_eviction::RandomState, + + /// PeerManager's observer for use by tests. + observer: Option>, } /// Takes IP or socket address and converts it to socket address (adding the default peer port if IP address is used) @@ -196,6 +199,26 @@ where peer_mgr_event_rx: mpsc::UnboundedReceiver, time_getter: TimeGetter, peerdb_storage: S, + ) -> crate::Result { + Self::new_with_observer( + chain_config, + p2p_config, + handle, + peer_mgr_event_rx, + time_getter, + peerdb_storage, + None, + ) + } + + pub fn new_with_observer( + chain_config: Arc, + p2p_config: Arc, + handle: T::ConnectivityHandle, + peer_mgr_event_rx: mpsc::UnboundedReceiver, + time_getter: TimeGetter, + peerdb_storage: S, + observer: Option>, ) -> crate::Result { let mut rng = make_pseudo_rng(); let peerdb = peerdb::PeerDb::new( @@ -218,6 +241,7 @@ where peerdb, subscribed_to_peer_addresses: BTreeSet::new(), peer_eviction_random_state: peers_eviction::RandomState::new(&mut rng), + observer, }) } @@ -367,6 +391,10 @@ where peer.score ); + if let Some(o) = self.observer.as_mut() { + o.on_peer_ban_score_adjustment(peer.address, peer.score) + } + if peer.score >= *self.p2p_config.ban_threshold { let address = peer.address.as_bannable(); self.ban(address); @@ -391,6 +419,10 @@ where return; } + if let Some(o) = self.observer.as_mut() { + o.on_peer_ban_score_adjustment(peer_address, score); + } + if score >= *self.p2p_config.ban_threshold { let address = peer_address.as_bannable(); self.ban(address); @@ -414,6 +446,10 @@ where self.peerdb.ban(address); + if let Some(o) = self.observer.as_mut() { + o.on_peer_ban(address); + } + for peer_id in to_disconnect { self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None); } @@ -1538,11 +1574,21 @@ where self.run_internal(None).await } + #[cfg(test)] + pub fn peers(&self) -> &BTreeMap { + &self.peers + } + #[cfg(test)] pub fn peerdb(&self) -> &peerdb::PeerDb { &self.peerdb } } +pub trait Observer { + fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32); + fn on_peer_ban(&mut self, address: BannableAddress); +} + #[cfg(test)] mod tests; diff --git a/p2p/src/peer_manager/tests/ping.rs b/p2p/src/peer_manager/tests/ping.rs index 990c950d68..d69d4c7049 100644 --- a/p2p/src/peer_manager/tests/ping.rs +++ b/p2p/src/peer_manager/tests/ping.rs @@ -110,7 +110,7 @@ async fn ping_timeout() { }) .unwrap(); - let event = expect_recv!(&mut cmd_rx); + let event = expect_recv!(cmd_rx); match event { Command::Accept { peer_id: _ } => {} _ => panic!("unexpected event: {event:?}"), @@ -120,7 +120,7 @@ async fn ping_timeout() { for _ in 0..5 { time_getter.advance_time(ping_check_period); - let cmd = expect_recv!(&mut cmd_rx); + let cmd = expect_recv!(cmd_rx); let (peer_id, peer_msg) = cmd_to_peer_man_msg(cmd); let nonce = assert_matches_return_val!( peer_msg, @@ -138,7 +138,7 @@ async fn ping_timeout() { // Receive one more ping request but do not send a ping response time_getter.advance_time(ping_check_period); - let cmd = expect_recv!(&mut cmd_rx); + let cmd = expect_recv!(cmd_rx); let (_, peer_msg) = cmd_to_peer_man_msg(cmd); assert_matches!( peer_msg, @@ -148,7 +148,7 @@ async fn ping_timeout() { time_getter.advance_time(ping_timeout); // PeerManager should ask backend to close connection - let event = expect_recv!(&mut cmd_rx); + let event = expect_recv!(cmd_rx); match event { Command::Disconnect { peer_id } => { conn_tx.send(ConnectivityEvent::ConnectionClosed { peer_id }).unwrap(); diff --git a/p2p/src/sync/tests/helpers/mod.rs b/p2p/src/sync/tests/helpers/mod.rs index 74a32bf277..0a3415ba64 100644 --- a/p2p/src/sync/tests/helpers/mod.rs +++ b/p2p/src/sync/tests/helpers/mod.rs @@ -212,7 +212,7 @@ impl TestNode { /// Receives a message from the sync manager. pub async fn message(&mut self) -> (PeerId, SyncMessage) { - expect_recv!(&mut self.sync_msg_receiver) + expect_recv!(self.sync_msg_receiver) } /// Try to receive a message from the sync manager. diff --git a/p2p/src/tests/correct_handshake.rs b/p2p/src/tests/correct_handshake.rs index ac66d18ea8..d7c9b23c3f 100644 --- a/p2p/src/tests/correct_handshake.rs +++ b/p2p/src/tests/correct_handshake.rs @@ -40,7 +40,7 @@ where let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::::start( + let mut test_node = TestNode::::start( Arc::clone(&chain_config), Arc::clone(&p2p_config), TTM::make_address(), @@ -82,11 +82,18 @@ where // which one it is though). let _msg = msg_stream.recv().await.unwrap(); + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_banning().await; + let test_node_remnants = test_node.join().await; - assert_eq!( - test_node_remnants.peer_mgr.peerdb().list_banned().count(), - 0 - ); + + let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count(); + assert_eq!(bans_count, 0); + + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1); + let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score; + assert_eq!(peer_score, 0); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -112,7 +119,7 @@ where let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::::start( + let mut test_node = TestNode::::start( Arc::clone(&chain_config), Arc::clone(&p2p_config), TTM::make_address(), @@ -149,11 +156,18 @@ where // which one it is though). let _msg = msg_stream.recv().await.unwrap(); + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_banning().await; + let test_node_remnants = test_node.join().await; - assert_eq!( - test_node_remnants.peer_mgr.peerdb().list_banned().count(), - 0 - ); + + let bans_count = test_node_remnants.peer_mgr.peerdb().list_banned().count(); + assert_eq!(bans_count, 0); + + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1); + let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score; + assert_eq!(peer_score, 0); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/p2p/src/tests/helpers.rs b/p2p/src/tests/helpers.rs index 14b06d8d60..9e44cf00ba 100644 --- a/p2p/src/tests/helpers.rs +++ b/p2p/src/tests/helpers.rs @@ -16,15 +16,20 @@ //! A module for tests that behave like integration tests but still need access to private data //! via methods under #[cfg(test)], -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use futures::Future; -use p2p_test_utils::P2pBasicTestTimeGetter; -use p2p_types::{p2p_event::P2pEventHandler, socket_address::SocketAddress}; +use p2p_test_utils::{expect_recv, P2pBasicTestTimeGetter, LONG_TIMEOUT, SHORT_TIMEOUT}; +use p2p_types::{ + bannable_address::BannableAddress, p2p_event::P2pEventHandler, socket_address::SocketAddress, +}; use storage_inmemory::InMemory; use subsystem::manager::ShutdownTrigger; use tokio::{ - sync::{mpsc, oneshot}, + sync::{ + mpsc::{self, UnboundedSender}, + oneshot, + }, task::JoinHandle, time, }; @@ -36,7 +41,7 @@ use crate::{ default_backend::{transport::TransportSocket, DefaultNetworkingService}, ConnectivityService, }, - peer_manager::{peerdb::storage_impl::PeerDbStorageImpl, PeerManager}, + peer_manager::{self, peerdb::storage_impl::PeerDbStorageImpl, PeerManager}, protocol::ProtocolVersion, sync::BlockSyncManager, testing_utils::{peerdb_inmemory_store, test_p2p_config, TestTransportMaker}, @@ -67,6 +72,7 @@ where sync_mgr_join_handle: JoinHandle, shutdown_trigger: ShutdownTrigger, subsystem_mgr_join_handle: subsystem::manager::ManagerJoinHandle, + peer_mgr_notification_rx: mpsc::UnboundedReceiver, } // This is what's left of a test node after it has been stopped. @@ -118,13 +124,17 @@ where let local_address = conn_handle.local_addresses()[0]; - let peer_mgr = PeerMgr::::new( + let (peer_mgr_notification_tx, peer_mgr_notification_rx) = mpsc::unbounded_channel(); + let peer_mgr_observer = Box::new(PeerManagerObserver::new(peer_mgr_notification_tx)); + + let peer_mgr = PeerMgr::::new_with_observer( Arc::clone(&chain_config), Arc::clone(&p2p_config), conn_handle, peer_mgr_event_rx, time_getter.get_time_getter(), peerdb_inmemory_store(), + Some(peer_mgr_observer), ) .unwrap(); let peer_mgr_join_handle = logging::spawn_in_current_span(async move { @@ -165,6 +175,7 @@ where sync_mgr_join_handle, shutdown_trigger, subsystem_mgr_join_handle, + peer_mgr_notification_rx, } } @@ -192,6 +203,31 @@ where connect_result_rx } + pub async fn expect_peer_mgr_notification(&mut self) -> PeerManagerNotification { + expect_recv!(self.peer_mgr_notification_rx) + } + + pub async fn expect_no_banning(&mut self) { + // Note: at the moment the loop is useless, because all existing notification types + // are related to banning, but it may change in the future. + time::timeout(SHORT_TIMEOUT, async { + #[allow(clippy::never_loop)] + loop { + match self.peer_mgr_notification_rx.recv().await.unwrap() { + PeerManagerNotification::BanScoreAdjustment { + address: _, + new_score: _, + } + | PeerManagerNotification::Ban { address: _ } => { + break; + } + } + } + }) + .await + .unwrap_err(); + } + pub async fn join(self) -> TestNodeRemnants { self.shutdown.store(true); let _ = self.shutdown_sender.send(()); @@ -215,5 +251,38 @@ where // TODO: in the case of timeout, a panic is likely to occur in an unrelated place, // e.g. "subsystem manager's handle hasn't been joined" is a common one. This can be // confusing, so we need a way to abort the test before some unrelated code decides to panic. - time::timeout(Duration::from_secs(30), future).await.unwrap(); + time::timeout(LONG_TIMEOUT, future).await.unwrap(); +} + +#[derive(Debug)] +pub enum PeerManagerNotification { + BanScoreAdjustment { + address: SocketAddress, + new_score: u32, + }, + Ban { + address: BannableAddress, + }, +} + +pub struct PeerManagerObserver { + event_tx: UnboundedSender, +} + +impl PeerManagerObserver { + pub fn new(event_tx: UnboundedSender) -> Self { + Self { event_tx } + } +} + +impl peer_manager::Observer for PeerManagerObserver { + fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32) { + self.event_tx + .send(PeerManagerNotification::BanScoreAdjustment { address, new_score }) + .unwrap(); + } + + fn on_peer_ban(&mut self, address: BannableAddress) { + self.event_tx.send(PeerManagerNotification::Ban { address }).unwrap(); + } } diff --git a/p2p/src/tests/incorrect_handshake.rs b/p2p/src/tests/incorrect_handshake.rs index 8d540197ff..b395886c5d 100644 --- a/p2p/src/tests/incorrect_handshake.rs +++ b/p2p/src/tests/incorrect_handshake.rs @@ -27,7 +27,7 @@ use crate::{ test_p2p_config, TestTransportChannel, TestTransportMaker, TestTransportNoise, TestTransportTcp, TEST_PROTOCOL_VERSION, }, - tests::helpers::{timeout, TestNode}, + tests::helpers::{timeout, PeerManagerNotification, TestNode}, }; async fn incorrect_handshake_outgoing() @@ -38,7 +38,7 @@ where let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::::start( + let mut test_node = TestNode::::start( Arc::clone(&chain_config), Arc::clone(&p2p_config), TTM::make_address(), @@ -69,6 +69,10 @@ where // The connection should be closed. msg_stream.recv().await.unwrap_err(); + // This is mainly needed to ensure that the corresponding events, if any, reach + // peer manager before we end the test. + test_node.expect_no_banning().await; + // Note: no peer ban here, because peers are not banned during "manual outbound" connections. let test_node_remnants = test_node.join().await; assert_eq!( @@ -100,7 +104,7 @@ where let chain_config = Arc::new(common::chain::config::create_unit_test_config()); let p2p_config = Arc::new(test_p2p_config()); - let test_node = TestNode::::start( + let mut test_node = TestNode::::start( Arc::clone(&chain_config), Arc::clone(&p2p_config), TTM::make_address(), @@ -120,6 +124,16 @@ where // The connection should be closed. msg_stream.recv().await.unwrap_err(); + // This is mainly needed to ensure that the corresponding event reaches peer manager before + // we end the test. + assert_matches!( + test_node.expect_peer_mgr_notification().await, + PeerManagerNotification::BanScoreAdjustment { + address: _, + new_score: _ + } + ); + // The peer address should be banned. let test_node_remnants = test_node.join().await; // TODO: check the actual address instead of the count, same in other places. diff --git a/p2p/src/tests/misbehavior.rs b/p2p/src/tests/misbehavior.rs new file mode 100644 index 0000000000..9c09eb611b --- /dev/null +++ b/p2p/src/tests/misbehavior.rs @@ -0,0 +1,129 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chainstate::ban_score::BanScore; +use test_utils::assert_matches; + +use crate::{ + error::{P2pError, ProtocolError}, + net::default_backend::{ + transport::{BufferedTranscoder, TransportSocket}, + types::{HandshakeMessage, Message, P2pTimestamp}, + }, + testing_utils::{ + test_p2p_config, TestTransportChannel, TestTransportMaker, TestTransportNoise, + TestTransportTcp, TEST_PROTOCOL_VERSION, + }, + tests::helpers::{timeout, PeerManagerNotification, TestNode}, +}; + +async fn unexpected_handshake_message() +where + TTM: TestTransportMaker, + TTM::Transport: TransportSocket, +{ + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let mut test_node = TestNode::::start( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + TTM::make_address(), + TEST_PROTOCOL_VERSION.into(), + ) + .await; + + let transport = TTM::make_transport(); + + let stream = transport.connect(*test_node.local_address()).await.unwrap(); + + let mut msg_stream = BufferedTranscoder::new(stream, *p2p_config.max_message_size); + + // Send the normal Hello + msg_stream + .send(Message::Handshake(HandshakeMessage::Hello { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time( + test_node.time_getter().get_time_getter().get_time(), + ), + handshake_nonce: 0, + })) + .await + .unwrap(); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::HelloAck { .. })); + + // Check that the connection is still up and we can receive the next message (we don't care + // which one it is though). + let _msg = msg_stream.recv().await.unwrap(); + + // Send an unexpected Hello + msg_stream + .send(Message::Handshake(HandshakeMessage::Hello { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time( + test_node.time_getter().get_time_getter().get_time(), + ), + handshake_nonce: 0, + })) + .await + .unwrap(); + + // This is mainly needed to ensure that the corresponding event reaches peer manager before + // we end the test. + assert_matches!( + test_node.expect_peer_mgr_notification().await, + PeerManagerNotification::BanScoreAdjustment { + address: _, + new_score: _ + } + ); + + let test_node_remnants = test_node.join().await; + + assert_eq!(test_node_remnants.peer_mgr.peers().len(), 1); + let peer_score = test_node_remnants.peer_mgr.peers().first_key_value().unwrap().1.score; + let expected_score = + P2pError::ProtocolError(ProtocolError::UnexpectedMessage("".to_owned())).ban_score(); + assert_eq!(peer_score, expected_score); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unexpected_handshake_message_tcp() { + timeout(unexpected_handshake_message::()).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unexpected_handshake_message_channels() { + timeout(unexpected_handshake_message::()).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unexpected_handshake_message_noise() { + timeout(unexpected_handshake_message::()).await; +} diff --git a/p2p/src/tests/mod.rs b/p2p/src/tests/mod.rs index c912a5a472..80bcd3316c 100644 --- a/p2p/src/tests/mod.rs +++ b/p2p/src/tests/mod.rs @@ -18,6 +18,7 @@ mod correct_handshake; mod incorrect_handshake; +mod misbehavior; mod unsupported_version; pub mod helpers;