Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bounded channels for peer events #1186

Merged
merged 2 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ testing_logger = "0.1"
thiserror = "1.0"
tokio = { version = "1.27", default-features = false }
tokio-socks = "0.5"
tokio-stream = "0.1"
tokio-util = { version = "0.7", default-features = false }
toml = "0.8"
tower = "0.4"
Expand Down
3 changes: 3 additions & 0 deletions chainstate/tx-verifier/src/transaction_verifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ where
accounting: A,
verifier_config: TransactionVerifierConfig,
) -> Self {
// TODO: both "expect"s in this function may fire when exiting the node-gui app;
// get rid of them and return a proper Result.
// See https://github.com/mintlayer/mintlayer-core/issues/1221
let best_block = storage
.get_best_block_for_utxos()
.expect("Database error while reading utxos best block");
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sscanf.workspace = true
tap.workspace = true
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
tokio-stream.workspace = true
tokio-socks.workspace = true
tokio-util = { workspace = true, default-features = false, features = ["codec"] }

Expand Down
46 changes: 26 additions & 20 deletions p2p/src/net/default_backend/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

use std::{collections::HashMap, sync::Arc};

use futures::{future::BoxFuture, never::Never, stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{future::BoxFuture, never::Never, stream::FuturesUnordered, FutureExt};
use p2p_types::socket_address::SocketAddress;
use tokio::{
sync::{mpsc, oneshot},
time::timeout,
};
use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap};

use common::{
chain::ChainConfig,
Expand Down Expand Up @@ -57,10 +58,19 @@ use super::{
types::{HandshakeNonce, Message, P2pTimestamp},
};

/// Buffer size of the channel to the SyncManager peer task.
/// How many unprocessed messages can be sent before the peer's event loop is blocked.
// TODO: Decide what the optimal value is (for example, by comparing the initial block download time)
const SYNC_CHAN_BUF_SIZE: usize = 20;
/// Buffer sizes for the channels used by Peer to send peer messages to other parts of p2p.
///
/// If the number of unprocessed messages exceeds this limit, the peer's event loop will be
/// blocked; this is needed to prevent DoS attacks where a peer would overload the node with
/// requests, which may lead to memory exhaustion.
/// Note: the values were chosen pretty much arbitrarily; the sync messages channel has a lower
/// limit because it's used to send blocks, so its messages can be up to 1Mb in size; peer events,
/// on the other hand, are small.
/// Also note that basic tests of initial block download time showed that there is no real
/// difference between 20 and 10000 for both of the limits here. These results, of course, depend
/// on the hardware and internet connection, so we've chosen larger limits.
const SYNC_MSG_CHAN_BUF_SIZE: usize = 100;
const PEER_EVENT_CHAN_BUF_SIZE: usize = 1000;

/// Active peer data
struct PeerContext {
Expand Down Expand Up @@ -123,12 +133,8 @@ pub struct Backend<T: TransportSocket> {
/// Pending connections
pending: HashMap<PeerId, PendingPeerContext>,

/// Channel sender for sending events from Peers to Backend; this will be passed to each
/// Peer upon its creation.
peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>,

/// Channel receiver for receiving events from Peers
peer_event_rx: mpsc::UnboundedReceiver<(PeerId, PeerEvent)>,
/// Map of streams for receiving events from peers.
peer_event_stream_map: StreamMap<PeerId, ReceiverStream<PeerEvent>>,

/// Channel sender for sending connectivity events to the frontend
conn_event_tx: mpsc::UnboundedSender<ConnectivityEvent>,
Expand Down Expand Up @@ -171,7 +177,6 @@ where
subscribers_receiver: mpsc::UnboundedReceiver<P2pEventHandler>,
node_protocol_version: ProtocolVersion,
) -> Self {
let (peer_event_tx, peer_event_rx) = mpsc::unbounded_channel();
Self {
transport,
socket,
Expand All @@ -183,8 +188,7 @@ where
syncing_event_tx,
peers: HashMap::new(),
pending: HashMap::new(),
peer_event_tx,
peer_event_rx,
peer_event_stream_map: StreamMap::new(),
command_queue: FuturesUnordered::new(),
shutdown,
shutdown_receiver,
Expand Down Expand Up @@ -234,7 +238,7 @@ where
.get_mut(&peer_id)
.ok_or(P2pError::PeerError(PeerError::PeerDoesntExist))?;

let (sync_msg_tx, sync_msg_rx) = mpsc::channel(SYNC_CHAN_BUF_SIZE);
let (sync_msg_tx, sync_msg_rx) = mpsc::channel(SYNC_MSG_CHAN_BUF_SIZE);
peer.backend_event_tx.send(BackendEvent::Accepted { sync_msg_tx })?;

let old_value = peer.was_accepted.test_and_set();
Expand Down Expand Up @@ -292,13 +296,12 @@ where
self.handle_command(command.ok_or(P2pError::ChannelClosed)?);
},
// Process pending commands
callback = self.command_queue.select_next_some(), if !self.command_queue.is_empty() => {
Some(callback) = self.command_queue.next() => {
callback(&mut self)?;
},
// Handle peer events.
event = self.peer_event_rx.recv() => {
let (peer, event) = event.ok_or(P2pError::ChannelClosed)?;
self.handle_peer_event(peer, event)?;
Some((peer_id, event)) = self.peer_event_stream_map.next() => {
self.handle_peer_event(peer_id, event)?;
},
// Accept a new peer connection.
res = self.socket.accept() => {
Expand Down Expand Up @@ -350,7 +353,10 @@ where
Some(address.as_peer_address())
};

let peer_event_tx = self.peer_event_tx.clone();
let (peer_event_tx, peer_event_rx) = mpsc::channel(PEER_EVENT_CHAN_BUF_SIZE);
let peer_event_stream = ReceiverStream::new(peer_event_rx);

self.peer_event_stream_map.insert(remote_peer_id, peer_event_stream);

let peer = peer::Peer::<T>::new(
remote_peer_id,
Expand Down
102 changes: 54 additions & 48 deletions p2p/src/net/default_backend/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
use std::{sync::Arc, time::Duration};

use p2p_types::services::Services;
use tokio::{
sync::mpsc::{self, Sender},
time::timeout,
};
use tokio::{sync::mpsc, time::timeout};

use common::chain::ChainConfig;
use logging::log;
Expand Down Expand Up @@ -85,7 +82,7 @@ pub struct Peer<T: TransportSocket> {
receiver_address: Option<PeerAddress>,

/// Channel sender for sending events to Backend
peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>,
peer_event_tx: mpsc::Sender<PeerEvent>,

/// Channel receiver for receiving events from Backend.
backend_event_rx: mpsc::UnboundedReceiver<BackendEvent>,
Expand All @@ -108,7 +105,7 @@ where
p2p_config: Arc<P2pConfig>,
socket: T::Stream,
receiver_address: Option<PeerAddress>,
peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>,
peer_event_tx: mpsc::Sender<PeerEvent>,
backend_event_rx: mpsc::UnboundedReceiver<BackendEvent>,
node_protocol_version: ProtocolVersion,
) -> Self {
Expand Down Expand Up @@ -178,18 +175,17 @@ where
// Send PeerInfoReceived before sending handshake to remote peer!
// Backend is expected to receive PeerInfoReceived before outgoing connection has chance to complete handshake,
// It's required to reliably detect self-connects.
self.peer_event_tx.send((
self.peer_id,
PeerEvent::PeerInfoReceived {
self.peer_event_tx
.send(PeerEvent::PeerInfoReceived {
protocol_version: common_protocol_version,
network,
common_services,
user_agent,
software_version,
receiver_address,
handshake_nonce,
},
))?;
})
.await?;

self.socket
.send(Message::Handshake(HandshakeMessage::HelloAck {
Expand Down Expand Up @@ -247,60 +243,60 @@ where
let common_protocol_version =
choose_common_protocol_version(protocol_version, self.node_protocol_version)?;

self.peer_event_tx.send((
self.peer_id,
PeerEvent::PeerInfoReceived {
self.peer_event_tx
.send(PeerEvent::PeerInfoReceived {
protocol_version: common_protocol_version,
network,
common_services,
user_agent,
software_version,
receiver_address,
handshake_nonce,
},
))?;
})
.await?;
}
}

Ok(())
}

// Note: the channels used by this function to propagate messages to other parts of p2p
// must be bounded; this is important to prevent DoS attacks.
async fn handle_socket_msg(
peer: PeerId,
peer_id: PeerId,
msg: Message,
peer_event_tx: &mut mpsc::UnboundedSender<(PeerId, PeerEvent)>,
sync_msg_tx: &mut Sender<SyncMessage>,
peer_event_tx: &mut mpsc::Sender<PeerEvent>,
sync_msg_tx: &mut mpsc::Sender<SyncMessage>,
) -> crate::Result<()> {
// TODO: Use a bounded channel to send messages to the peer manager
match msg.categorize() {
CategorizedMessage::Handshake(_) => {
// TODO: this must be reported to the peer manager, so that it can adjust
// the peer's ban score. (We may add a separate PeerEvent for this and Backend
// can then use the now unused ConnectivityEvent::Misbehaved to forward the error
// to the peer manager.)
log::error!("Peer {peer} sent unexpected handshake message");
log::error!("Peer {peer_id} sent unexpected handshake message");
}
CategorizedMessage::PeerManagerMessage(msg) => {
peer_event_tx.send((peer, PeerEvent::MessageReceived { message: msg }))?
peer_event_tx.send(PeerEvent::MessageReceived { message: msg }).await?
}
CategorizedMessage::SyncMessage(msg) => sync_msg_tx.send(msg).await?,
}

Ok(())
}

pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> {
async fn run_impl(&mut self, local_time: P2pTimestamp) -> crate::Result<()> {
// handshake with remote peer and send peer's info to backend
let handshake_res = timeout(PEER_HANDSHAKE_TIMEOUT, self.handshake(local_time)).await;
match handshake_res {
Ok(Ok(())) => {}
Ok(Err(err)) => {
log::debug!("handshake failed for peer {}: {err}", self.peer_id);

let send_result = self.peer_event_tx.send((
self.peer_id,
PeerEvent::HandshakeFailed { error: err.clone() },
));
let send_result = self
.peer_event_tx
.send(PeerEvent::HandshakeFailed { error: err.clone() })
.await;
if let Err(send_error) = send_result {
log::error!(
"Cannot send PeerEvent::HandshakeFailed to peer {}: {}",
Expand Down Expand Up @@ -348,11 +344,22 @@ where
}
}
}
}

impl<T: TransportSocket> Drop for Peer<T> {
fn drop(&mut self) {
let _ = self.peer_event_tx.send((self.peer_id, PeerEvent::ConnectionClosed));
pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> {
let run_result = self.run_impl(local_time).await;
let send_result = self.peer_event_tx.send(PeerEvent::ConnectionClosed).await;

if let Err(send_error) = send_result {
// Note: this situation is likely to happen if the connection is already closed,
// so it's not really an error.
log::debug!(
"Unable to send PeerEvent::ConnectionClosed to Backend for peer {}: {}",
self.peer_id,
send_error
);
}

run_result
}
}

Expand All @@ -375,6 +382,8 @@ mod tests {
use chainstate::Locator;
use futures::FutureExt;

const TEST_CHAN_BUF_SIZE: usize = 100;

async fn handshake_inbound<A, T>()
where
A: TestTransportMaker<Transport = T>,
Expand All @@ -383,7 +392,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx1, mut rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id2 = PeerId::new();

Expand Down Expand Up @@ -422,7 +431,7 @@ mod tests {

let _peer = handle.await.unwrap();
assert_eq!(
rx1.try_recv().unwrap().1,
rx1.try_recv().unwrap(),
PeerEvent::PeerInfoReceived {
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
Expand Down Expand Up @@ -458,7 +467,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx1, mut rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id3 = PeerId::new();

Expand Down Expand Up @@ -500,18 +509,15 @@ mod tests {
let _peer = handle.await.unwrap();
assert_eq!(
rx1.try_recv(),
Ok((
peer_id3,
PeerEvent::PeerInfoReceived {
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
common_services: [Service::Blocks, Service::Transactions].as_slice().into(),
user_agent: p2p_config.user_agent.clone(),
software_version: *chain_config.software_version(),
receiver_address: None,
handshake_nonce: 1,
}
))
Ok(PeerEvent::PeerInfoReceived {
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
common_services: [Service::Blocks, Service::Transactions].as_slice().into(),
user_agent: p2p_config.user_agent.clone(),
software_version: *chain_config.software_version(),
receiver_address: None,
handshake_nonce: 1,
})
);
}

Expand All @@ -538,7 +544,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, _rx1) = mpsc::unbounded_channel();
let (tx1, _rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id3 = PeerId::new();

Expand Down Expand Up @@ -599,7 +605,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, _rx1) = mpsc::unbounded_channel();
let (tx1, _rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id2 = PeerId::new();

Expand Down