diff --git a/prdoc/pr_3983.prdoc b/prdoc/pr_3983.prdoc
new file mode 100644
index 0000000000000..755979f6c431f
--- /dev/null
+++ b/prdoc/pr_3983.prdoc
@@ -0,0 +1,20 @@
+# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
+# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
+
+title: Detect closed notification substreams instead of evicting all peers
+
+doc:
+  - audience: Node Operator
+    description: |
+      Replace eviction of all peers when syncing has completely stalled with a more granular
+      detection of notification substreams closed by remote peers.
+
+      This change is expected to make the _reported_ peer count lower, as before this change
+      the number also included already closed substreams. Nevertheless, the currently reported
+      number should be seen as the real number of working connections.
+
+crates:
+  - name: sc-network-sync
+    bump: minor
+  - name: sc-network
+    bump: minor
diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs
index 28662be29feed..2a99aaad62684 100644
--- a/substrate/client/network/src/protocol/notifications/handler.rs
+++ b/substrate/client/network/src/protocol/notifications/handler.rs
@@ -799,6 +799,9 @@ impl ConnectionHandler for NotifsHandler {
 		// performed before the code paths that can produce `Ready` (with some rare exceptions).
 		// Importantly, however, the flush is performed *after* notifications are queued with
 		// `Sink::start_send`.
+		// Note that we must call `poll_flush` on all substreams and not only on those we
+		// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
+		// also reports the substream termination (even if no data was written into it).
 		for protocol_index in 0..self.protocols.len() {
 			match &mut self.protocols[protocol_index].state {
 				State::Open { out_substream: out_substream @ Some(_), .. } => {
@@ -841,7 +844,7 @@ impl ConnectionHandler for NotifsHandler {
 				State::OpenDesiredByRemote { in_substream, pending_opening } =>
 					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
 						Poll::Pending => {},
-						Poll::Ready(Ok(void)) => match void {},
+						Poll::Ready(Ok(())) => {},
 						Poll::Ready(Err(_)) => {
 							self.protocols[protocol_index].state =
 								State::Closed { pending_opening: *pending_opening };
@@ -857,7 +860,7 @@ impl ConnectionHandler for NotifsHandler {
 						cx,
 					) {
 						Poll::Pending => {},
-						Poll::Ready(Ok(void)) => match void {},
+						Poll::Ready(Ok(())) => {},
 						Poll::Ready(Err(_)) => *in_substream = None,
 					},
 				}
diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
index 4e1c033f33b68..b86a863337231 100644
--- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
+++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs
@@ -44,7 +44,6 @@ use log::{error, warn};
 use unsigned_varint::codec::UviBytes;

 use std::{
-	convert::Infallible,
 	io, mem,
 	pin::Pin,
 	task::{Context, Poll},
@@ -223,10 +222,7 @@ where
	/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
	/// guaranteed to not generate any notification.
-	pub fn poll_process(
-		self: Pin<&mut Self>,
-		cx: &mut Context,
-	) -> Poll<Result<Infallible, io::Error>> {
+	pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
 		let mut this = self.project();

 		loop {
@@ -248,8 +244,10 @@ where
 				},
 				NotificationsInSubstreamHandshake::Flush => {
 					match Sink::poll_flush(this.socket.as_mut(), cx)? {
-						Poll::Ready(()) =>
-							*this.handshake = NotificationsInSubstreamHandshake::Sent,
+						Poll::Ready(()) => {
+							*this.handshake = NotificationsInSubstreamHandshake::Sent;
+							return Poll::Ready(Ok(()));
+						},
 						Poll::Pending => {
 							*this.handshake = NotificationsInSubstreamHandshake::Flush;
 							return Poll::Pending
@@ -262,7 +260,7 @@ where
 				st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
 				st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
 					*this.handshake = st;
-					return Poll::Pending
+					return Poll::Ready(Ok(()));
 				},
 			}
 		}
@@ -445,6 +443,21 @@ where
	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
 		let mut this = self.project();
+
+		// `Sink::poll_flush` does not expose stream closed error until we write something into
+		// the stream, so the code below makes sure we detect that the substream was closed
+		// even if we don't write anything into it.
+		match Stream::poll_next(this.socket.as_mut(), cx) {
+			Poll::Pending => {},
+			Poll::Ready(Some(_)) => {
+				error!(
+					target: "sub-libp2p",
+					"Unexpected incoming data in `NotificationsOutSubstream`",
+				);
+			},
+			Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
+		}
+
 		Sink::poll_flush(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
	}
@@ -494,13 +507,19 @@ pub enum NotificationsOutError {
	/// I/O error on the substream.
	#[error(transparent)]
	Io(#[from] io::Error),
+	#[error("substream was closed/reset")]
+	Terminated,
 }

 #[cfg(test)]
 mod tests {
-	use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
-	use futures::{channel::oneshot, prelude::*};
+	use super::{
+		NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutError,
+		NotificationsOutOpen,
+	};
+	use futures::{channel::oneshot, future, prelude::*};
 	use libp2p::core::upgrade;
+	use std::{pin::Pin, task::Poll};
 	use tokio::net::{TcpListener, TcpStream};
 	use tokio_util::compat::TokioAsyncReadCompatExt;
@@ -693,4 +712,95 @@ mod tests {
 		client.await.unwrap();
	}
+
+	#[tokio::test]
+	async fn send_handshake_without_polling_for_incoming_data() {
+		const PROTO_NAME: &str = "/test/proto/1";
+		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
+
+		let client = tokio::spawn(async move {
+			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
+			let NotificationsOutOpen { handshake, .. } = upgrade::apply_outbound(
+				socket.compat(),
+				NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
+				upgrade::Version::V1,
+			)
+			.await
+			.unwrap();
+
+			assert_eq!(handshake, b"hello world");
+		});
+
+		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+		listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
+
+		let (socket, _) = listener.accept().await.unwrap();
+		let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
+			socket.compat(),
+			NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
+		)
+		.await
+		.unwrap();
+
+		assert_eq!(handshake, b"initial message");
+		substream.send_handshake(&b"hello world"[..]);
+
+		// Actually send the handshake.
+		future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();
+
+		client.await.unwrap();
+	}
+
+	#[tokio::test]
+	async fn can_detect_dropped_out_substream_without_writing_data() {
+		const PROTO_NAME: &str = "/test/proto/1";
+		let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
+
+		let client = tokio::spawn(async move {
+			let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
+			let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
+				socket.compat(),
+				NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
+				upgrade::Version::V1,
+			)
+			.await
+			.unwrap();
+
+			assert_eq!(handshake, b"hello world");
+
+			future::poll_fn(|cx| match Pin::new(&mut substream).poll_flush(cx) {
+				Poll::Pending => Poll::Pending,
+				Poll::Ready(Ok(())) => {
+					cx.waker().wake_by_ref();
+					Poll::Pending
+				},
+				Poll::Ready(Err(e)) => {
+					assert!(matches!(e, NotificationsOutError::Terminated));
+					Poll::Ready(())
+				},
+			})
+			.await;
+		});
+
+		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+		listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
+
+		let (socket, _) = listener.accept().await.unwrap();
+		let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
+			socket.compat(),
+			NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
+		)
+		.await
+		.unwrap();
+
+		assert_eq!(handshake, b"initial message");
+
+		// Send the handshake.
+		substream.send_handshake(&b"hello world"[..]);
+		future::poll_fn(|cx| Pin::new(&mut substream).poll_process(cx)).await.unwrap();
+
+		drop(substream);
+
+		client.await.unwrap();
+	}
 }
diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs
index 952300a14d891..b986b73e79b86 100644
--- a/substrate/client/network/sync/src/engine.rs
+++ b/substrate/client/network/sync/src/engine.rs
@@ -88,7 +88,6 @@ use std::{
 		atomic::{AtomicBool, AtomicUsize, Ordering},
 		Arc,
 	},
-	time::{Duration, Instant},
 };

 /// Interval at which we perform time based maintenance
@@ -97,26 +96,6 @@ const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100)
 /// Maximum number of known block hashes to keep for a peer.
 const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

-/// Logging target for the file.
-const LOG_TARGET: &str = "sync";
-
-/// If the block announces stream to peer has been inactive for 30 seconds meaning local node
-/// has not sent or received block announcements to/from the peer, report the node for inactivity,
-/// disconnect it and attempt to establish connection to some other peer.
-const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);
-
-/// When `SyncingEngine` is started, wait two minutes before actually staring to count peers as
-/// evicted.
-///
-/// Parachain collator may incorrectly get evicted because it's waiting to receive a number of
-/// relaychain blocks before it can start creating parachain blocks. During this wait,
-/// `SyncingEngine` still counts it as active and as the peer is not sending blocks, it may get
-/// evicted if a block is not received within the first 30 secons since the peer connected.
-///
-/// To prevent this from happening, define a threshold for how long `SyncingEngine` should wait
-/// before it starts evicting peers.
-const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60);
-
 /// Maximum allowed size for a block announce.
 const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;

@@ -126,8 +105,6 @@ mod rep {
	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
	/// Peer send us a block announcement that failed at validation.
	pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
-	/// Block announce substream with the peer has been inactive too long
-	pub const INACTIVE_SUBSTREAM: Rep = Rep::new(-(1 << 10), "Inactive block announce substream");
	/// We received a message that failed to decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
	/// Peer is on unsupported protocol version.
@@ -313,18 +290,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
	/// Handle that is used to communicate with `sc_network::Notifications`.
	notification_service: Box<dyn NotificationService>,

-	/// When the syncing was started.
-	///
-	/// Stored as an `Option` so once the initial wait has passed, `SyncingEngine`
-	/// can reset the peer timers and continue with the normal eviction process.
-	syncing_started: Option<Instant>,
-
	/// Handle to `PeerStore`.
	peer_store_handle: PeerStoreHandle,

-	/// Instant when the last notification was sent or received.
-	last_notification_io: Instant,
-
	/// Pending responses
	pending_responses: PendingResponses<B>,
@@ -513,9 +481,7 @@
 			event_streams: Vec::new(),
 			notification_service,
 			tick_timeout,
-			syncing_started: None,
 			peer_store_handle,
-			last_notification_io: Instant::now(),
 			metrics: if let Some(r) = metrics_registry {
 				match Metrics::register(r, is_major_syncing.clone()) {
 					Ok(metrics) => Some(metrics),
@@ -677,15 +643,12 @@ where
 					data: Some(data.clone()),
 				};

-				self.last_notification_io = Instant::now();
 				let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
 			}
 		}
	}

	pub async fn run(mut self) {
-		self.syncing_started = Some(Instant::now());
-
 		loop {
 			tokio::select! {
 				_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
@@ -786,39 +749,6 @@ fn perform_periodic_actions(&mut self) {
 		self.report_metrics();
-
-		// if `SyncingEngine` has just started, don't evict seemingly inactive peers right away
-		// as they may not have produced blocks not because they've disconnected but because
-		// they're still waiting to receive enough relaychain blocks to start producing blocks.
-		if let Some(started) = self.syncing_started {
-			if started.elapsed() < INITIAL_EVICTION_WAIT_PERIOD {
-				return
-			}
-
-			self.syncing_started = None;
-			self.last_notification_io = Instant::now();
-		}
-
-		// if syncing hasn't sent or received any blocks within `INACTIVITY_EVICT_THRESHOLD`,
-		// it means the local node has stalled and is connected to peers who either don't
-		// consider it connected or are also all stalled. In order to unstall the node,
-		// disconnect all peers and allow `ProtocolController` to establish new connections.
-		if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
-			log::debug!(
-				target: LOG_TARGET,
-				"syncing has halted due to inactivity, evicting all peers",
-			);
-
-			for peer in self.peers.keys() {
-				self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
-				self.network_service
-					.disconnect_peer(*peer, self.block_announce_protocol_name.clone());
-			}
-
-			// after all the peers have been evicted, start timer again to prevent evicting
-			// new peers that join after the old peer have been evicted
-			self.last_notification_io = Instant::now();
-		}
	}

	fn process_service_command(&mut self, command: ToServiceCommand<B>) {
@@ -953,7 +883,6 @@ where
 				return
 			};

-			self.last_notification_io = Instant::now();
 			self.push_block_announce_validation(peer, announce);
 		},
	}