diff --git a/transports/webrtc/Cargo.toml b/transports/webrtc/Cargo.toml index bb4ae80d0ad..959dd9715b0 100644 --- a/transports/webrtc/Cargo.toml +++ b/transports/webrtc/Cargo.toml @@ -16,7 +16,7 @@ bytes = "1" futures = "0.3" futures-timer = "3" hex = "0.4" -if-watch = "2.0" +if-watch = "3.0" libp2p-core = { version = "0.38.0", path = "../../core" } libp2p-noise = { version = "0.41.0", path = "../../transports/noise" } log = "0.4" @@ -34,7 +34,7 @@ tokio-util = { version = "0.7", features = ["compat"], optional = true } webrtc = { version = "0.6.0", optional = true } [features] -tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc"] +tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc", "if-watch/tokio"] pem = ["webrtc?/pem"] [build-dependencies] diff --git a/transports/webrtc/src/tokio/transport.rs b/transports/webrtc/src/tokio/transport.rs index 24839c6030d..1030098c781 100644 --- a/transports/webrtc/src/tokio/transport.rs +++ b/transports/webrtc/src/tokio/transport.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{future::BoxFuture, prelude::*, ready, stream::SelectAll, stream::Stream}; -use if_watch::{IfEvent, IfWatcher}; +use if_watch::{tokio::IfWatcher, IfEvent}; use libp2p_core::{ identity, multiaddr::{Multiaddr, Protocol}, @@ -30,6 +30,7 @@ use webrtc::peer_connection::configuration::RTCConfiguration; use std::net::IpAddr; use std::{ + io, net::SocketAddr, pin::Pin, task::{Context, Poll}, @@ -87,12 +88,10 @@ impl libp2p_core::Transport for Transport { let udp_mux = UDPMuxNewAddr::listen_on(socket_addr) .map_err(|io| TransportError::Other(Error::Io(io)))?; - self.listeners.push(ListenStream::new( - id, - self.config.clone(), - udp_mux, - IfWatcher::new().map_err(|io| TransportError::Other(Error::Io(io)))?, - )); + self.listeners.push( + ListenStream::new(id, self.config.clone(), udp_mux) + .map_err(|e| TransportError::Other(Error::Io(e)))?, + ); Ok(id) } @@ -193,25 +192,40 @@ struct ListenStream { /// become or stop being available. /// /// `None` if the socket is only listening on a single interface. - if_watcher: IfWatcher, + if_watcher: Option, + + /// Pending event to reported. + pending_event: Option<::Item>, } impl ListenStream { /// Constructs a `WebRTCListenStream` for incoming connections. - fn new( - listener_id: ListenerId, - config: Config, - udp_mux: UDPMuxNewAddr, - if_watcher: IfWatcher, - ) -> Self { - ListenStream { + fn new(listener_id: ListenerId, config: Config, udp_mux: UDPMuxNewAddr) -> io::Result { + let listen_addr = udp_mux.listen_addr(); + + let if_watcher; + let pending_event; + if listen_addr.ip().is_unspecified() { + if_watcher = Some(IfWatcher::new()?); + pending_event = None; + } else { + if_watcher = None; + let ma = socketaddr_to_multiaddr(&listen_addr, Some(config.fingerprint)); + pending_event = Some(TransportEvent::NewAddress { + listener_id, + listen_addr: ma, + }) + } + + Ok(ListenStream { listener_id, - listen_addr: udp_mux.listen_addr(), + listen_addr, config, udp_mux, report_closed: None, if_watcher, - } + pending_event, + }) } /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and @@ -232,7 +246,12 @@ impl ListenStream { } fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { - while let Poll::Ready(event) = self.if_watcher.poll_if_event(cx) { + let if_watcher = match self.if_watcher.as_mut() { + Some(w) => w, + None => return Poll::Pending, + }; + + while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { match event { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); @@ -284,6 +303,10 @@ impl Stream for ListenStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { + if let Some(event) = self.pending_event.take() { + return Poll::Ready(Some(event)); + } + if let Some(closed) = self.report_closed.as_mut() { // Listener was closed. // Report the transport event if there is one. On the next iteration, return