Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Attempt to relieve pressure on mpsc_network_worker (#13725)
Browse files Browse the repository at this point in the history
* Attempt to relieve pressure on `mpsc_network_worker`

`SyncingEngine` interacting with `NetworkWorker` can put a lot of strain
on the channel if the number of inbound connections is high. This is
because `SyncingEngine` is notified of each inbound substream which it
then can either accept or reject and this causes a lot of message
exchange on the already busy channel.

Use a direct channel pair between `Protocol` and `SyncingEngine`
to exchange notification events. It is a temporary change to alleviate
the problems caused by syncing being an independent protocol and the
fix will be removed once `NotificationService` is implemented.

* Apply review comments

* fixes

* trigger ci

* Fix tests

Verify that both peers have a connection now that the validation goes
through `SyncingEngine`. Depending on how the tasks are scheduled,
one of them might not have the peer registered in `SyncingEngine` at which
point the test won't make any progress because block announcement received
from an unknown peer is discarded.

Move polling of `ChainSync` at the end of the function so that if a block
announcement causes a block request to be sent, that can be sent in the
same call to `SyncingEngine::poll()`.

---------

Co-authored-by: parity-processbot <>
  • Loading branch information
altonen authored and pgherveou committed Apr 4, 2023
1 parent 27aa298 commit f1e3fe5
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! See the documentation of [`Params`].
pub use crate::{
protocol::NotificationsSink,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
Expand All @@ -31,7 +32,12 @@ pub use crate::{
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT};
pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use zeroize::Zeroize;

use sp_runtime::traits::Block as BlockT;
Expand Down Expand Up @@ -714,6 +720,9 @@ pub struct Params<Block: BlockT> {
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
47 changes: 45 additions & 2 deletions client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.
use crate::types::ProtocolName;
use crate::{types::ProtocolName, NotificationsSink};

use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{core::PeerId, kad::record::Key};

use sc_network_common::role::ObservedRole;
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -90,3 +92,44 @@ pub enum Event {
messages: Vec<(ProtocolName, Bytes)>,
},
}

/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
},

/// Notification sink was replaced.
NotificationSinkReplaced {
/// Node we closed the substream with.
remote: PeerId,
/// Notification sink.
sink: NotificationsSink,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<Bytes>,
},
}
6 changes: 3 additions & 3 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub mod request_responses;
pub mod types;
pub mod utils;

pub use event::{DhtEvent, Event};
pub use event::{DhtEvent, Event, SyncEvent};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig};
Expand All @@ -278,8 +278,8 @@ pub use service::{
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
};
pub use types::ProtocolName;

Expand Down
108 changes: 90 additions & 18 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{

use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use libp2p::{
core::connection::ConnectionId,
swarm::{
Expand All @@ -35,11 +36,14 @@ use libp2p::{
use log::{debug, error, warn};

use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use std::{
collections::{HashMap, HashSet, VecDeque},
future::Future,
iter,
pin::Pin,
task::Poll,
};

Expand Down Expand Up @@ -68,6 +72,9 @@ mod rep {
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}

type PendingSyncSubstreamValidation =
Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>;

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Pending list of messages to return from `poll` as a priority.
Expand All @@ -87,6 +94,8 @@ pub struct Protocol<B: BlockT> {
bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>,
/// Connected peers.
peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
_marker: std::marker::PhantomData<B>,
}

Expand All @@ -96,6 +105,7 @@ impl<B: BlockT> Protocol<B> {
roles: Roles,
network_config: &config::NetworkConfiguration,
block_announces_protocol: config::NonDefaultSetConfig,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let mut known_addresses = Vec::new();

Expand Down Expand Up @@ -179,6 +189,8 @@ impl<B: BlockT> Protocol<B> {
.collect(),
bad_handshake_substreams: Default::default(),
peers: HashMap::new(),
sync_substream_validations: FuturesUnordered::new(),
tx,
// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
_marker: Default::default(),
};
Expand Down Expand Up @@ -418,6 +430,23 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }),
};

while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}

let outcome = match event {
NotificationsOut::CustomProtocolOpen {
peer_id,
Expand All @@ -440,16 +469,29 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
received_handshake: handshake.encode(),
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));

CustomMessageOutcome::None
},
Ok(msg) => {
debug!(
Expand All @@ -469,15 +511,27 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
let roles = handshake.roles;
self.peers.insert(peer_id, roles);

CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)]
.clone(),
negotiated_fallback,
received_handshake,
roles,
notifications_sink,
}
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));
CustomMessageOutcome::None
},
Err(err2) => {
log::debug!(
Expand Down Expand Up @@ -535,6 +589,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced {
remote: peer_id,
sink: notifications_sink,
});
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
Expand All @@ -548,6 +608,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
// handshake. The outer layers have never received an opening event about this
// substream, and consequently shouldn't receive a closing event either.
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed {
remote: peer_id,
});
self.peers.remove(&peer_id);
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
Expand All @@ -558,6 +624,12 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::Notification { peer_id, set_id, message } => {
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived {
remote: peer_id,
messages: vec![message.freeze()],
});
CustomMessageOutcome::None
} else {
let protocol_name = self.notification_protocols[usize::from(set_id)].clone();
CustomMessageOutcome::NotificationsReceived {
Expand Down
6 changes: 4 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready},
protocol::{self, NotifsHandlerError, Protocol, Ready},
request_responses::{IfDisconnected, RequestFailure},
service::{
signature::{Signature, SigningError},
Expand Down Expand Up @@ -91,6 +91,7 @@ use std::{

pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey};
pub use protocol::NotificationsSink;

mod metrics;
mod out_events;
Expand Down Expand Up @@ -146,7 +147,7 @@ where
/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new<Block: BlockT>(mut params: Params<Block>) -> Result<Self, Error> {
pub fn new(mut params: Params<B>) -> Result<Self, Error> {
// Private and public keys configuration.
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
Expand Down Expand Up @@ -227,6 +228,7 @@ where
From::from(&params.role),
&params.network_config,
params.block_announce_config,
params.tx,
)?;

// List of multiaddresses that we know in the network.
Expand Down
Loading

0 comments on commit f1e3fe5

Please sign in to comment.