From 59be58eb5edd24a8cd8c396d44191641197c9788 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Tue, 12 Sep 2023 18:57:29 +0300 Subject: [PATCH 1/2] Use bounded channels for peer events --- .../src/transaction_verifier/mod.rs | 2 + p2p/src/net/default_backend/backend.rs | 21 ++-- p2p/src/net/default_backend/peer.rs | 113 ++++++++++-------- 3 files changed, 81 insertions(+), 55 deletions(-) diff --git a/chainstate/tx-verifier/src/transaction_verifier/mod.rs b/chainstate/tx-verifier/src/transaction_verifier/mod.rs index 350eff5004..81a62efe89 100644 --- a/chainstate/tx-verifier/src/transaction_verifier/mod.rs +++ b/chainstate/tx-verifier/src/transaction_verifier/mod.rs @@ -163,6 +163,8 @@ where accounting: A, verifier_config: TransactionVerifierConfig, ) -> Self { + // TODO: both "expect"s in this function may fire when exiting the node-gui app; + // get rid of them and return a proper Result. let best_block = storage .get_best_block_for_utxos() .expect("Database error while reading utxos best block"); diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 552cde0fed..e0d694c693 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -57,10 +57,15 @@ use super::{ types::{HandshakeNonce, Message, P2pTimestamp}, }; -/// Buffer size of the channel to the SyncManager peer task. -/// How many unprocessed messages can be sent before the peer's event loop is blocked. -// TODO: Decide what the optimal value is (for example, by comparing the initial block download time) -const SYNC_CHAN_BUF_SIZE: usize = 20; +/// Buffer sizes for the channels used by Peer to send peer messages to other parts of p2p. +/// +/// If the number of unprocessed messages exceeds this limit, the peer's event loop will be +/// blocked; this is needed to prevent DoS attacks where a peer would overload the node with +/// requests, which may lead to memory exhaustion. 
+/// Note: the value was chosen pretty much arbitrarily, but judging by the initial block download +/// time there is no difference between 20 and 10000, so the former seems reasonable enough. +const SYNC_MSG_CHAN_BUF_SIZE: usize = 20; +const PEER_EVENT_CHAN_BUF_SIZE: usize = 20; /// Active peer data struct PeerContext { @@ -125,10 +130,10 @@ pub struct Backend { /// Channel sender for sending events from Peers to Backend; this will be passed to each /// Peer upon its creation. - peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>, + peer_event_tx: mpsc::Sender<(PeerId, PeerEvent)>, /// Channel receiver for receiving events from Peers - peer_event_rx: mpsc::UnboundedReceiver<(PeerId, PeerEvent)>, + peer_event_rx: mpsc::Receiver<(PeerId, PeerEvent)>, /// Channel sender for sending connectivity events to the frontend conn_event_tx: mpsc::UnboundedSender, @@ -171,7 +176,7 @@ where subscribers_receiver: mpsc::UnboundedReceiver, node_protocol_version: ProtocolVersion, ) -> Self { - let (peer_event_tx, peer_event_rx) = mpsc::unbounded_channel(); + let (peer_event_tx, peer_event_rx) = mpsc::channel(PEER_EVENT_CHAN_BUF_SIZE); Self { transport, socket, @@ -234,7 +239,7 @@ where .get_mut(&peer_id) .ok_or(P2pError::PeerError(PeerError::PeerDoesntExist))?; - let (sync_msg_tx, sync_msg_rx) = mpsc::channel(SYNC_CHAN_BUF_SIZE); + let (sync_msg_tx, sync_msg_rx) = mpsc::channel(SYNC_MSG_CHAN_BUF_SIZE); peer.backend_event_tx.send(BackendEvent::Accepted { sync_msg_tx })?; let old_value = peer.was_accepted.test_and_set(); diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index 01d868a8d1..d62ea801b3 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -16,10 +16,7 @@ use std::{sync::Arc, time::Duration}; use p2p_types::services::Services; -use tokio::{ - sync::mpsc::{self, Sender}, - time::timeout, -}; +use tokio::{sync::mpsc, time::timeout}; use common::chain::ChainConfig; use logging::log; @@ -85,7 
+82,7 @@ pub struct Peer { receiver_address: Option, /// Channel sender for sending events to Backend - peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>, + peer_event_tx: mpsc::Sender<(PeerId, PeerEvent)>, /// Channel receiver for receiving events from Backend. backend_event_rx: mpsc::UnboundedReceiver, @@ -108,7 +105,7 @@ where p2p_config: Arc, socket: T::Stream, receiver_address: Option, - peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>, + peer_event_tx: mpsc::Sender<(PeerId, PeerEvent)>, backend_event_rx: mpsc::UnboundedReceiver, node_protocol_version: ProtocolVersion, ) -> Self { @@ -178,18 +175,20 @@ where // Send PeerInfoReceived before sending handshake to remote peer! // Backend is expected to receive PeerInfoReceived before outgoing connection has chance to complete handshake, // It's required to reliably detect self-connects. - self.peer_event_tx.send(( - self.peer_id, - PeerEvent::PeerInfoReceived { - protocol_version: common_protocol_version, - network, - common_services, - user_agent, - software_version, - receiver_address, - handshake_nonce, - }, - ))?; + self.peer_event_tx + .send(( + self.peer_id, + PeerEvent::PeerInfoReceived { + protocol_version: common_protocol_version, + network, + common_services, + user_agent, + software_version, + receiver_address, + handshake_nonce, + }, + )) + .await?; self.socket .send(Message::Handshake(HandshakeMessage::HelloAck { @@ -247,31 +246,34 @@ where let common_protocol_version = choose_common_protocol_version(protocol_version, self.node_protocol_version)?; - self.peer_event_tx.send(( - self.peer_id, - PeerEvent::PeerInfoReceived { - protocol_version: common_protocol_version, - network, - common_services, - user_agent, - software_version, - receiver_address, - handshake_nonce, - }, - ))?; + self.peer_event_tx + .send(( + self.peer_id, + PeerEvent::PeerInfoReceived { + protocol_version: common_protocol_version, + network, + common_services, + user_agent, + software_version, + receiver_address, + 
handshake_nonce, + }, + )) + .await?; } } Ok(()) } + // Note: the channels used by this function to propagate messages to other parts of p2p + // must be bounded; this is important to prevent DoS attacks. async fn handle_socket_msg( peer: PeerId, msg: Message, - peer_event_tx: &mut mpsc::UnboundedSender<(PeerId, PeerEvent)>, - sync_msg_tx: &mut Sender, + peer_event_tx: &mut mpsc::Sender<(PeerId, PeerEvent)>, + sync_msg_tx: &mut mpsc::Sender, ) -> crate::Result<()> { - // TODO: Use a bounded channel to send messages to the peer manager match msg.categorize() { CategorizedMessage::Handshake(_) => { // TODO: this must be reported to the peer manager, so that it can adjust @@ -281,7 +283,7 @@ where log::error!("Peer {peer} sent unexpected handshake message"); } CategorizedMessage::PeerManagerMessage(msg) => { - peer_event_tx.send((peer, PeerEvent::MessageReceived { message: msg }))? + peer_event_tx.send((peer, PeerEvent::MessageReceived { message: msg })).await? } CategorizedMessage::SyncMessage(msg) => sync_msg_tx.send(msg).await?, } @@ -289,7 +291,7 @@ where Ok(()) } - pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> { + async fn run_impl(&mut self, local_time: P2pTimestamp) -> crate::Result<()> { // handshake with remote peer and send peer's info to backend let handshake_res = timeout(PEER_HANDSHAKE_TIMEOUT, self.handshake(local_time)).await; match handshake_res { @@ -297,10 +299,13 @@ where Ok(Err(err)) => { log::debug!("handshake failed for peer {}: {err}", self.peer_id); - let send_result = self.peer_event_tx.send(( - self.peer_id, - PeerEvent::HandshakeFailed { error: err.clone() }, - )); + let send_result = self + .peer_event_tx + .send(( + self.peer_id, + PeerEvent::HandshakeFailed { error: err.clone() }, + )) + .await; if let Err(send_error) = send_result { log::error!( "Cannot send PeerEvent::HandshakeFailed to peer {}: {}", @@ -348,11 +353,23 @@ where } } } -} -impl Drop for Peer { - fn drop(&mut self) { - let _ = 
self.peer_event_tx.send((self.peer_id, PeerEvent::ConnectionClosed)); + pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> { + let run_result = self.run_impl(local_time).await; + let send_result = + self.peer_event_tx.send((self.peer_id, PeerEvent::ConnectionClosed)).await; + + if let Err(send_error) = send_result { + // Note: this situation is likely to happen if the connection is already closed, + // so it's not really an error. + log::debug!( + "Unable to send PeerEvent::ConnectionClosed to Backend for peer {}: {}", + self.peer_id, + send_error + ); + } + + run_result } } @@ -375,6 +392,8 @@ mod tests { use chainstate::Locator; use futures::FutureExt; + const TEST_CHAN_BUF_SIZE: usize = 100; + async fn handshake_inbound() where A: TestTransportMaker, @@ -383,7 +402,7 @@ mod tests { let (socket1, socket2) = get_two_connected_sockets::().await; let chain_config = Arc::new(common::chain::config::create_mainnet()); let p2p_config = Arc::new(test_p2p_config()); - let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx1, mut rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE); let (_tx2, rx2) = mpsc::unbounded_channel(); let peer_id2 = PeerId::new(); @@ -458,7 +477,7 @@ mod tests { let (socket1, socket2) = get_two_connected_sockets::().await; let chain_config = Arc::new(common::chain::config::create_mainnet()); let p2p_config = Arc::new(test_p2p_config()); - let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx1, mut rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE); let (_tx2, rx2) = mpsc::unbounded_channel(); let peer_id3 = PeerId::new(); @@ -538,7 +557,7 @@ mod tests { let (socket1, socket2) = get_two_connected_sockets::().await; let chain_config = Arc::new(common::chain::config::create_mainnet()); let p2p_config = Arc::new(test_p2p_config()); - let (tx1, _rx1) = mpsc::unbounded_channel(); + let (tx1, _rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE); let (_tx2, rx2) = mpsc::unbounded_channel(); let peer_id3 = PeerId::new(); @@ -599,7 +618,7 @@ mod tests { let 
(socket1, socket2) = get_two_connected_sockets::().await; let chain_config = Arc::new(common::chain::config::create_mainnet()); let p2p_config = Arc::new(test_p2p_config()); - let (tx1, _rx1) = mpsc::unbounded_channel(); + let (tx1, _rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE); let (_tx2, rx2) = mpsc::unbounded_channel(); let peer_id2 = PeerId::new(); From b2e912211ed29abfe70b936129210488094b3978 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Mon, 18 Sep 2023 20:48:40 +0300 Subject: [PATCH 2/2] Use separate PeerEvent channels for each peer in the backend --- Cargo.lock | 1 + Cargo.toml | 1 + .../src/transaction_verifier/mod.rs | 1 + p2p/Cargo.toml | 1 + p2p/src/net/default_backend/backend.rs | 39 ++++----- p2p/src/net/default_backend/peer.rs | 85 ++++++++----------- 6 files changed, 60 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9357b416c5..f112099854 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4430,6 +4430,7 @@ dependencies = [ "thiserror", "tokio", "tokio-socks", + "tokio-stream", "tokio-util", "utils", ] diff --git a/Cargo.toml b/Cargo.toml index 91ef40e927..ecf71d4486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -196,6 +196,7 @@ testing_logger = "0.1" thiserror = "1.0" tokio = { version = "1.27", default-features = false } tokio-socks = "0.5" +tokio-stream = "0.1" tokio-util = { version = "0.7", default-features = false } toml = "0.8" tower = "0.4" diff --git a/chainstate/tx-verifier/src/transaction_verifier/mod.rs b/chainstate/tx-verifier/src/transaction_verifier/mod.rs index 81a62efe89..6276bc0e44 100644 --- a/chainstate/tx-verifier/src/transaction_verifier/mod.rs +++ b/chainstate/tx-verifier/src/transaction_verifier/mod.rs @@ -165,6 +165,7 @@ where ) -> Self { // TODO: both "expect"s in this function may fire when exiting the node-gui app; // get rid of them and return a proper Result. 
+ // See https://github.com/mintlayer/mintlayer-core/issues/1221 let best_block = storage .get_best_block_for_utxos() .expect("Database error while reading utxos best block"); diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 776d838b12..da051f203f 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -41,6 +41,7 @@ sscanf.workspace = true tap.workspace = true thiserror.workspace = true tokio = { workspace = true, default-features = false, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time"] } +tokio-stream.workspace = true tokio-socks.workspace = true tokio-util = { workspace = true, default-features = false, features = ["codec"] } diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index e0d694c693..a4e59b41d5 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -19,12 +19,13 @@ use std::{collections::HashMap, sync::Arc}; -use futures::{future::BoxFuture, never::Never, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{future::BoxFuture, never::Never, stream::FuturesUnordered, FutureExt}; use p2p_types::socket_address::SocketAddress; use tokio::{ sync::{mpsc, oneshot}, time::timeout, }; +use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap}; use common::{ chain::ChainConfig, @@ -62,10 +63,14 @@ use super::{ /// If the number of unprocessed messages exceeds this limit, the peer's event loop will be /// blocked; this is needed to prevent DoS attacks where a peer would overload the node with /// requests, which may lead to memory exhaustion. -/// Note: the value was chosen pretty much arbitrarily, but judging by the initial block download -/// time there is no difference between 20 and 10000, so the former seems reasonable enough. 
-const SYNC_MSG_CHAN_BUF_SIZE: usize = 20; -const PEER_EVENT_CHAN_BUF_SIZE: usize = 20; +/// Note: the values were chosen pretty much arbitrarily; the sync messages channel has a lower +/// limit because it's used to send blocks, so its messages can be up to 1 MB in size; peer events, +/// on the other hand, are small. +/// Also note that basic tests of initial block download time showed that there is no real +/// difference between 20 and 10000 for both of the limits here. These results, of course, depend +/// on the hardware and internet connection, so we've chosen larger limits. +const SYNC_MSG_CHAN_BUF_SIZE: usize = 100; +const PEER_EVENT_CHAN_BUF_SIZE: usize = 1000; /// Active peer data struct PeerContext { @@ -128,12 +133,8 @@ pub struct Backend { /// Pending connections pending: HashMap, - /// Channel sender for sending events from Peers to Backend; this will be passed to each - /// Peer upon its creation. - peer_event_tx: mpsc::Sender<(PeerId, PeerEvent)>, - - /// Channel receiver for receiving events from Peers - peer_event_rx: mpsc::Receiver<(PeerId, PeerEvent)>, + /// Map of streams for receiving events from peers. 
+ peer_event_stream_map: StreamMap>, /// Channel sender for sending connectivity events to the frontend conn_event_tx: mpsc::UnboundedSender, @@ -176,7 +177,6 @@ where subscribers_receiver: mpsc::UnboundedReceiver, node_protocol_version: ProtocolVersion, ) -> Self { - let (peer_event_tx, peer_event_rx) = mpsc::channel(PEER_EVENT_CHAN_BUF_SIZE); Self { transport, socket, @@ -188,8 +188,7 @@ where syncing_event_tx, peers: HashMap::new(), pending: HashMap::new(), - peer_event_tx, - peer_event_rx, + peer_event_stream_map: StreamMap::new(), command_queue: FuturesUnordered::new(), shutdown, shutdown_receiver, @@ -297,13 +296,12 @@ where self.handle_command(command.ok_or(P2pError::ChannelClosed)?); }, // Process pending commands - callback = self.command_queue.select_next_some(), if !self.command_queue.is_empty() => { + Some(callback) = self.command_queue.next() => { callback(&mut self)?; }, // Handle peer events. - event = self.peer_event_rx.recv() => { - let (peer, event) = event.ok_or(P2pError::ChannelClosed)?; - self.handle_peer_event(peer, event)?; + Some((peer_id, event)) = self.peer_event_stream_map.next() => { + self.handle_peer_event(peer_id, event)?; }, // Accept a new peer connection. 
res = self.socket.accept() => { @@ -355,7 +353,10 @@ where Some(address.as_peer_address()) }; - let peer_event_tx = self.peer_event_tx.clone(); + let (peer_event_tx, peer_event_rx) = mpsc::channel(PEER_EVENT_CHAN_BUF_SIZE); + let peer_event_stream = ReceiverStream::new(peer_event_rx); + + self.peer_event_stream_map.insert(remote_peer_id, peer_event_stream); let peer = peer::Peer::::new( remote_peer_id, diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index d62ea801b3..83dec5d512 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -82,7 +82,7 @@ pub struct Peer { receiver_address: Option, /// Channel sender for sending events to Backend - peer_event_tx: mpsc::Sender<(PeerId, PeerEvent)>, + peer_event_tx: mpsc::Sender, /// Channel receiver for receiving events from Backend. backend_event_rx: mpsc::UnboundedReceiver, @@ -105,7 +105,7 @@ where p2p_config: Arc, socket: T::Stream, receiver_address: Option, - peer_event_tx: mpsc::Sender<(PeerId, PeerEvent)>, + peer_event_tx: mpsc::Sender, backend_event_rx: mpsc::UnboundedReceiver, node_protocol_version: ProtocolVersion, ) -> Self { @@ -176,18 +176,15 @@ where // Backend is expected to receive PeerInfoReceived before outgoing connection has chance to complete handshake, // It's required to reliably detect self-connects. 
self.peer_event_tx - .send(( - self.peer_id, - PeerEvent::PeerInfoReceived { - protocol_version: common_protocol_version, - network, - common_services, - user_agent, - software_version, - receiver_address, - handshake_nonce, - }, - )) + .send(PeerEvent::PeerInfoReceived { + protocol_version: common_protocol_version, + network, + common_services, + user_agent, + software_version, + receiver_address, + handshake_nonce, + }) .await?; self.socket @@ -247,18 +244,15 @@ where choose_common_protocol_version(protocol_version, self.node_protocol_version)?; self.peer_event_tx - .send(( - self.peer_id, - PeerEvent::PeerInfoReceived { - protocol_version: common_protocol_version, - network, - common_services, - user_agent, - software_version, - receiver_address, - handshake_nonce, - }, - )) + .send(PeerEvent::PeerInfoReceived { + protocol_version: common_protocol_version, + network, + common_services, + user_agent, + software_version, + receiver_address, + handshake_nonce, + }) .await?; } } @@ -269,9 +263,9 @@ where // Note: the channels used by this function to propagate messages to other parts of p2p // must be bounded; this is important to prevent DoS attacks. async fn handle_socket_msg( - peer: PeerId, + peer_id: PeerId, msg: Message, - peer_event_tx: &mut mpsc::Sender<(PeerId, PeerEvent)>, + peer_event_tx: &mut mpsc::Sender, sync_msg_tx: &mut mpsc::Sender, ) -> crate::Result<()> { match msg.categorize() { @@ -280,10 +274,10 @@ where // the peer's ban score. (We may add a separate PeerEvent for this and Backend // can then use the now unused ConnectivityEvent::Misbehaved to forward the error // to the peer manager.) - log::error!("Peer {peer} sent unexpected handshake message"); + log::error!("Peer {peer_id} sent unexpected handshake message"); } CategorizedMessage::PeerManagerMessage(msg) => { - peer_event_tx.send((peer, PeerEvent::MessageReceived { message: msg })).await? + peer_event_tx.send(PeerEvent::MessageReceived { message: msg }).await? 
} CategorizedMessage::SyncMessage(msg) => sync_msg_tx.send(msg).await?, } @@ -301,10 +295,7 @@ where let send_result = self .peer_event_tx - .send(( - self.peer_id, - PeerEvent::HandshakeFailed { error: err.clone() }, - )) + .send(PeerEvent::HandshakeFailed { error: err.clone() }) .await; if let Err(send_error) = send_result { log::error!( @@ -356,8 +347,7 @@ where pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> { let run_result = self.run_impl(local_time).await; - let send_result = - self.peer_event_tx.send((self.peer_id, PeerEvent::ConnectionClosed)).await; + let send_result = self.peer_event_tx.send(PeerEvent::ConnectionClosed).await; if let Err(send_error) = send_result { // Note: this situation is likely to happen if the connection is already closed, @@ -441,7 +431,7 @@ mod tests { let _peer = handle.await.unwrap(); assert_eq!( - rx1.try_recv().unwrap().1, + rx1.try_recv().unwrap(), PeerEvent::PeerInfoReceived { protocol_version: TEST_PROTOCOL_VERSION, network: *chain_config.magic_bytes(), @@ -519,18 +509,15 @@ mod tests { let _peer = handle.await.unwrap(); assert_eq!( rx1.try_recv(), - Ok(( - peer_id3, - PeerEvent::PeerInfoReceived { - protocol_version: TEST_PROTOCOL_VERSION, - network: *chain_config.magic_bytes(), - common_services: [Service::Blocks, Service::Transactions].as_slice().into(), - user_agent: p2p_config.user_agent.clone(), - software_version: *chain_config.software_version(), - receiver_address: None, - handshake_nonce: 1, - } - )) + Ok(PeerEvent::PeerInfoReceived { + protocol_version: TEST_PROTOCOL_VERSION, + network: *chain_config.magic_bytes(), + common_services: [Service::Blocks, Service::Transactions].as_slice().into(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + receiver_address: None, + handshake_nonce: 1, + }) ); }