mdns/fix: Failed to register opened substream (#301)
This PR ensures that when MDNS encounters an error it does not terminate
other litep2p components.

Previously, if MDNS failed to send a query or to handle an incoming
packet, it would exit. The exit surfaced as the following log line,
observed on a Kusama validator:

```
tokio-runtime-worker litep2p::mdns: failed to send mdns query error=IoError(NetworkUnreachable)
```

This caused the Substrate discovery mechanism to exit as well, which
propagated to the litep2p Kademlia handler and shut it down too. This
left the node unable to discover the network or handle incoming
substreams.
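
The fix keeps the event loop alive by logging failures instead of returning them. A minimal standalone sketch of that pattern (illustrative names only, not the actual litep2p code):

```rust
use std::time::Duration;

// Hypothetical stand-in for the mDNS query; the real code sends a DNS
// packet to the IPv4 multicast address via `socket.send_to(..)`.
async fn send_query() -> std::io::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut query_interval = tokio::time::interval(Duration::from_secs(30));

    loop {
        query_interval.tick().await;

        // Log and continue rather than `return Err(error)`: a transient
        // failure such as `NetworkUnreachable` must not tear down the loop
        // and, with it, every component awaiting this task.
        if let Err(error) = send_query().await {
            eprintln!("failed to send mdns query: {error}");
        }
    }
}
```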


### Testing Done

The issue was reproduced locally with a tokio interval patch that forces
the MDNS component to exit once the node has connectivity on Kusama; a
purely illustrative sketch of that failure mode follows the logs below:

```
2024-12-11 12:50:34.425 ERROR tokio-runtime-worker litep2p::mdns: interval tick MDNS
2024-12-11 12:50:34.425 ERROR tokio-runtime-worker litep2p::mdns: interval tick expired, closing MDNS

2024-12-11 12:50:35.111 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWEwh9AwKFUJKPFqmJXWByH7JKYRcfAUfPvp9f3xzj3ibJ") endpoint=Dialer { address: "/ip4/3.96.91.180/tcp/30333", connection_id: ConnectionId(200) } error=ConnectionClosed
...
2024-12-11 12:50:38.753 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWJb1W7jmqDCaU3Hsh6NRfDo12gnj8hnKfGwA77vRE4jBv") endpoint=Dialer { address: "/ip4/51.38.63.126/tcp/30333", connection_id: ConnectionId(294) } error=ConnectionClosed
2024-12-11 12:50:40.389 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWGXXuap75AN24aA5XP9S1X3BKqdDbYyHwBTJakMyv1P5V") endpoint=Dialer { address: "/ip4/104.243.41.217/tcp/30330", connection_id: ConnectionId(29) } error=ConnectionClosed
...

2024-12-11 12:53:15.690 ERROR tokio-runtime-worker litep2p::tcp: connection exited with error connection_id=ConnectionId(29) error=EssentialTaskClosed
2024-12-11 12:53:40.071 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWGphqiEqsfR5ZnV7R2Lgubxi7eAo6MTx3tVmso8oCkvJn") endpoint=Dialer { address: "/ip4/51.163.1.153/tcp/30003", connection_id: ConnectionId(51) } error=ConnectionClosed
2024-12-11 12:53:40.233 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWM5mnupyiDGtdN6qm3riQDjBbAZfFqAJfMbcbPQbkEn8u") endpoint=Dialer { address: "/ip4/168.119.149.170/tcp/30333", connection_id: ConnectionId(28) } error=ConnectionClosed
2024-12-11 12:53:41.060 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWGphqiEqsfR5ZnV7R2Lgubxi7eAo6MTx3tVmso8oCkvJn") endpoint=Dialer { address: "/ip4/51.163.1.153/tcp/30003", connection_id: ConnectionId(51) } error=ConnectionClosed
2024-12-11 12:53:42.766 ERROR tokio-runtime-worker litep2p::tcp::connection: failed to register opened substream to protocol protocol=Allocated("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/kad") peer=PeerId("12D3KooWM5mnupyiDGtdN6qm3riQDjBbAZfFqAJfMbcbPQbkEn8u") endpoint=Dialer { address: "/ip4/168.119.149.170/tcp/30333", connection_id: ConnectionId(28) } error=ConnectionClosed
```
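
The reproduction patch itself is not part of this commit. Purely as a hedged illustration, the failure mode boils down to an essential task returning an error that its caller treats as fatal (all names below are hypothetical):

```rust
use std::time::Duration;

// Purely illustrative: mimics the pre-fix mDNS loop, which returned an
// error out of its event loop instead of logging it and carrying on.
async fn mdns_like_task() -> std::io::Result<()> {
    tokio::time::sleep(Duration::from_millis(100)).await;
    eprintln!("interval tick expired, closing MDNS");
    Err(std::io::Error::other("simulated mdns failure"))
}

#[tokio::main]
async fn main() {
    // A caller that treats the task as essential shuts down when it does;
    // this is the cascade that shows up above as `EssentialTaskClosed`.
    if let Err(error) = mdns_like_task().await {
        eprintln!("essential task closed: {error}");
    }
}
```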


Closes: #300

Thanks @dmitry-markin for also confirming this 🙏 

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
lexnv authored Dec 12, 2024
1 parent e9a009f commit ef495b8
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions src/protocol/mdns.rs

```diff
@@ -21,7 +21,7 @@
 
 //! [Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) implementation.
-use crate::{error::Error, transport::manager::TransportManagerHandle, DEFAULT_CHANNEL_SIZE};
+use crate::{transport::manager::TransportManagerHandle, DEFAULT_CHANNEL_SIZE};
 
 use futures::Stream;
 use multiaddr::Multiaddr;
@@ -95,7 +95,7 @@ pub(crate) struct Mdns {
     socket: UdpSocket,
 
     /// Query interval.
-    query_interval: Duration,
+    query_interval: tokio::time::Interval,
 
     /// TX channel for sending events to user.
     event_tx: Sender<MdnsEvent>,
@@ -138,12 +138,15 @@ impl Mdns {
         socket.join_multicast_v4(&IPV4_MULTICAST_ADDRESS, &Ipv4Addr::UNSPECIFIED)?;
         socket.set_nonblocking(true)?;
 
+        let mut query_interval = tokio::time::interval(config.query_interval);
+        query_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
         Ok(Self {
             _transport_handle,
             event_tx: config.tx,
             next_query_id: 1337u16,
             discovered: HashSet::new(),
-            query_interval: config.query_interval,
+            query_interval,
             receive_buffer: vec![0u8; 4096],
             username: rand::thread_rng()
                 .sample_iter(&Alphanumeric)
@@ -276,24 +279,19 @@ impl Mdns {
     }
 
     /// Event loop for [`Mdns`].
-    pub(crate) async fn start(mut self) -> crate::Result<()> {
+    pub(crate) async fn start(mut self) {
         tracing::debug!(target: LOG_TARGET, "starting mdns event loop");
 
-        // before starting the loop, make an initial query to the network
-        //
-        // bail early if the socket is not working
-        self.on_outbound_request().await?;
-
         loop {
             tokio::select! {
-                _ = tokio::time::sleep(self.query_interval) => {
-                    tracing::trace!(target: LOG_TARGET, "timeout expired");
+                _ = self.query_interval.tick() => {
+                    tracing::trace!(target: LOG_TARGET, "query interval ticked");
 
                     if let Err(error) = self.on_outbound_request().await {
                         tracing::error!(target: LOG_TARGET, ?error, "failed to send mdns query");
-                        return Err(error);
                     }
-                }
+                },
 
                 result = self.socket.recv_from(&mut self.receive_buffer) => match result {
                     Ok((nread, address)) => match Packet::parse(&self.receive_buffer[..nread]) {
                         Ok(packet) => match packet.has_flags(PacketFlag::RESPONSE) {
@@ -308,9 +306,11 @@ impl Mdns {
                             }
                         }
                         false => if let Some(response) = self.on_inbound_request(packet) {
-                            self.socket
+                            if let Err(error) = self.socket
                                 .send_to(&response, (IPV4_MULTICAST_ADDRESS, IPV4_MULTICAST_PORT))
-                                .await?;
+                                .await {
+                                tracing::error!(target: LOG_TARGET, ?error, "failed to send mdns response");
+                            }
                         }
                     }
                     Err(error) => tracing::debug!(
@@ -323,7 +323,6 @@
                     }
                     Err(error) => {
                         tracing::error!(target: LOG_TARGET, ?error, "failed to read from socket");
-                        return Err(Error::from(error));
                     }
                 },
             }
```
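
Beyond dropping the error returns, the fix swaps the per-iteration `tokio::time::sleep(self.query_interval)` for a persistent `tokio::time::Interval` stored on the struct. A small standalone sketch of that API, assuming only the tokio crate:

```rust
use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

#[tokio::main]
async fn main() {
    // An `Interval` keeps its deadline across loop iterations, unlike a
    // fresh `sleep` future recreated on every pass through `select!`.
    let mut query_interval = interval(Duration::from_secs(1));
    // `Delay` reschedules after a missed tick instead of firing a burst
    // of catch-up ticks (`Burst` is tokio's default).
    query_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    for i in 0..3 {
        // The first tick completes immediately, which also stands in for
        // the up-front initial query that the fix removed from `start()`.
        query_interval.tick().await;
        println!("tick {i}");
    }
}
```

With `Delay`, a stalled runtime does not produce a burst of immediate queries afterwards, which suits a periodic network probe like mDNS.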
