From ef495b8ccd3db79bda767b7e050e748351f5a067 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:41:48 +0200 Subject: [PATCH] mdns/fix: Failed to register opened substream (#301) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR ensures that when MDNS encounters an error it does not terminate other litep2p components. Previously, if MDNS failed to send a query or to handle incoming packets, it would exit. The exit is evidenced by the following log line, observed on a Kusama validator: ``` tokio-runtime-worker litep2p::mdns: failed to send mdns query error=IoError(NetworkUnreachable) ``` This causes the Substrate discovery mechanism to exit as well, which in turn propagates to the litep2p Kademlia handler, which also exits. This leaves the node unable to discover the network or handle incoming substreams. ### Testing Done The issue was reproduced locally with a tokio interval patch that makes the MDNS component exit after connectivity to the Kusama network has been established: ``` 2024-12-11 12:50:34.425 ERROR tokio-runtime-worker litep2p::mdns: interval tick MDNS 2024-12-11 12:50:34.425 ERROR tokio-runtime-worker litep2p::mdns: interval tick expired, closing MDNS 2024-12-11 12:50:35.111 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWEwh9AwKFUJKPFqmJXWByH7JKYRcfAUfPvp9f3xzj3ibJ") endpoint=Dialer { address: "/ip4/3.96.91.180/tcp/30333", connection_id: ConnectionId(200) } error=ConnectionClosed ... 
2024-12-11 12:50:38.753 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWJb1W7jmqDCaU3Hsh6NRfDo12gnj8hnKfGwA77vRE4jBv") endpoint=Dialer { address: "/ip4/51.38.63.126/tcp/30333", connection_id: ConnectionId(294) } error=ConnectionClosed 2024-12-11 12:50:40.389 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWGXXuap75AN24aA5XP9S1X3BKqdDbYyHwBTJakMyv1P5V") endpoint=Dialer { address: "/ip4/104.243.41.217/tcp/30330", connection_id: ConnectionId(29) } error=ConnectionClosed ... 2024-12-11 12:53:15.690 ERROR tokio-runtime-worker litep2p::tcp: connection exited with error connection_id=ConnectionId(29) error=EssentialTaskClosed 2024-12-11 12:53:40.071 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWGphqiEqsfR5ZnV7R2Lgubxi7eAo6MTx3tVmso8oCkvJn") endpoint=Dialer { address: "/ip4/51.163.1.153/tcp/30003", connection_id: ConnectionId(51) } error=ConnectionClosed 2024-12-11 12:53:40.233 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWM5mnupyiDGtdN6qm3riQDjBbAZfFqAJfMbcbPQbkEn8u") endpoint=Dialer { address: "/ip4/168.119.149.170/tcp/30333", connection_id: ConnectionId(28) } error=ConnectionClosed 2024-12-11 12:53:41.060 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") 
peer=PeerId("12D3KooWGphqiEqsfR5ZnV7R2Lgubxi7eAo6MTx3tVmso8oCkvJn") endpoint=Dialer { address: "/ip4/51.163.1.153/tcp/30003", connection_id: ConnectionId(51) } error=ConnectionClosed 2024-12-11 12:53:42.766 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWM5mnupyiDGtdN6qm3riQDjBbAZfFqAJfMbcbPQbkEn8u") endpoint=Dialer { address: "/ip4/168.119.149.170/tcp/30333", connection_id: ConnectionId(28) } error=ConnectionClosed ``` Closes: https://github.com/paritytech/litep2p/issues/300 Thanks @dmitry-markin for also confirming this 🙏 cc @paritytech/networking --------- Signed-off-by: Alexandru Vasile --- src/protocol/mdns.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/protocol/mdns.rs b/src/protocol/mdns.rs index d305a3f7..1fb3dc2e 100644 --- a/src/protocol/mdns.rs +++ b/src/protocol/mdns.rs @@ -21,7 +21,7 @@ //! [Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) implementation. -use crate::{error::Error, transport::manager::TransportManagerHandle, DEFAULT_CHANNEL_SIZE}; +use crate::{transport::manager::TransportManagerHandle, DEFAULT_CHANNEL_SIZE}; use futures::Stream; use multiaddr::Multiaddr; @@ -95,7 +95,7 @@ pub(crate) struct Mdns { socket: UdpSocket, /// Query interval. - query_interval: Duration, + query_interval: tokio::time::Interval, /// TX channel for sending events to user. 
event_tx: Sender, @@ -138,12 +138,15 @@ impl Mdns { socket.join_multicast_v4(&IPV4_MULTICAST_ADDRESS, &Ipv4Addr::UNSPECIFIED)?; socket.set_nonblocking(true)?; + let mut query_interval = tokio::time::interval(config.query_interval); + query_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + Ok(Self { _transport_handle, event_tx: config.tx, next_query_id: 1337u16, discovered: HashSet::new(), - query_interval: config.query_interval, + query_interval, receive_buffer: vec![0u8; 4096], username: rand::thread_rng() .sample_iter(&Alphanumeric) @@ -276,24 +279,19 @@ impl Mdns { } /// Event loop for [`Mdns`]. - pub(crate) async fn start(mut self) -> crate::Result<()> { + pub(crate) async fn start(mut self) { tracing::debug!(target: LOG_TARGET, "starting mdns event loop"); - // before starting the loop, make an initial query to the network - // - // bail early if the socket is not working - self.on_outbound_request().await?; - loop { tokio::select! { - _ = tokio::time::sleep(self.query_interval) => { - tracing::trace!(target: LOG_TARGET, "timeout expired"); + _ = self.query_interval.tick() => { + tracing::trace!(target: LOG_TARGET, "query interval ticked"); if let Err(error) = self.on_outbound_request().await { tracing::error!(target: LOG_TARGET, ?error, "failed to send mdns query"); - return Err(error); } - } + }, + result = self.socket.recv_from(&mut self.receive_buffer) => match result { Ok((nread, address)) => match Packet::parse(&self.receive_buffer[..nread]) { Ok(packet) => match packet.has_flags(PacketFlag::RESPONSE) { @@ -308,9 +306,11 @@ impl Mdns { } } false => if let Some(response) = self.on_inbound_request(packet) { - self.socket + if let Err(error) = self.socket .send_to(&response, (IPV4_MULTICAST_ADDRESS, IPV4_MULTICAST_PORT)) - .await?; + .await { + tracing::error!(target: LOG_TARGET, ?error, "failed to send mdns response"); + } } } Err(error) => tracing::debug!( @@ -323,7 +323,6 @@ impl Mdns { } Err(error) => { tracing::error!(target: 
LOG_TARGET, ?error, "failed to read from socket"); - return Err(Error::from(error)); } }, }