Skip to content

Commit

Permalink
fix(webrtc): Don't emit addresses from other interfaces (#3142)
Browse files Browse the repository at this point in the history
Previously, we would always run `IfWatcher`, even if we were only listening on a specific interface. This patch fixes this behaviour and aligns it with how `libp2p-quic` operates.
  • Loading branch information
thomaseizinger authored Nov 18, 2022
1 parent 08510dd commit 05c0794
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
4 changes: 2 additions & 2 deletions transports/webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bytes = "1"
futures = "0.3"
futures-timer = "3"
hex = "0.4"
if-watch = "2.0"
if-watch = "3.0"
libp2p-core = { version = "0.38.0", path = "../../core" }
libp2p-noise = { version = "0.41.0", path = "../../transports/noise" }
log = "0.4"
Expand All @@ -34,7 +34,7 @@ tokio-util = { version = "0.7", features = ["compat"], optional = true }
webrtc = { version = "0.6.0", optional = true }

[features]
tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc"]
tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc", "if-watch/tokio"]
pem = ["webrtc?/pem"]

[build-dependencies]
Expand Down
59 changes: 41 additions & 18 deletions transports/webrtc/src/tokio/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use futures::{future::BoxFuture, prelude::*, ready, stream::SelectAll, stream::Stream};
use if_watch::{IfEvent, IfWatcher};
use if_watch::{tokio::IfWatcher, IfEvent};
use libp2p_core::{
identity,
multiaddr::{Multiaddr, Protocol},
Expand All @@ -30,6 +30,7 @@ use webrtc::peer_connection::configuration::RTCConfiguration;

use std::net::IpAddr;
use std::{
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -87,12 +88,10 @@ impl libp2p_core::Transport for Transport {
let udp_mux = UDPMuxNewAddr::listen_on(socket_addr)
.map_err(|io| TransportError::Other(Error::Io(io)))?;

self.listeners.push(ListenStream::new(
id,
self.config.clone(),
udp_mux,
IfWatcher::new().map_err(|io| TransportError::Other(Error::Io(io)))?,
));
self.listeners.push(
ListenStream::new(id, self.config.clone(), udp_mux)
.map_err(|e| TransportError::Other(Error::Io(e)))?,
);

Ok(id)
}
Expand Down Expand Up @@ -193,25 +192,40 @@ struct ListenStream {
/// become or stop being available.
///
/// `None` if the socket is only listening on a single interface.
if_watcher: IfWatcher,
if_watcher: Option<IfWatcher>,

/// Pending event to reported.
pending_event: Option<<Self as Stream>::Item>,
}

impl ListenStream {
/// Constructs a `WebRTCListenStream` for incoming connections.
fn new(
listener_id: ListenerId,
config: Config,
udp_mux: UDPMuxNewAddr,
if_watcher: IfWatcher,
) -> Self {
ListenStream {
fn new(listener_id: ListenerId, config: Config, udp_mux: UDPMuxNewAddr) -> io::Result<Self> {
let listen_addr = udp_mux.listen_addr();

let if_watcher;
let pending_event;
if listen_addr.ip().is_unspecified() {
if_watcher = Some(IfWatcher::new()?);
pending_event = None;
} else {
if_watcher = None;
let ma = socketaddr_to_multiaddr(&listen_addr, Some(config.fingerprint));
pending_event = Some(TransportEvent::NewAddress {
listener_id,
listen_addr: ma,
})
}

Ok(ListenStream {
listener_id,
listen_addr: udp_mux.listen_addr(),
listen_addr,
config,
udp_mux,
report_closed: None,
if_watcher,
}
pending_event,
})
}

/// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and
Expand All @@ -232,7 +246,12 @@ impl ListenStream {
}

fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
while let Poll::Ready(event) = self.if_watcher.poll_if_event(cx) {
let if_watcher = match self.if_watcher.as_mut() {
Some(w) => w,
None => return Poll::Pending,
};

while let Poll::Ready(event) = if_watcher.poll_if_event(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
Expand Down Expand Up @@ -284,6 +303,10 @@ impl Stream for ListenStream {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
if let Some(event) = self.pending_event.take() {
return Poll::Ready(Some(event));
}

if let Some(closed) = self.report_closed.as_mut() {
// Listener was closed.
// Report the transport event if there is one. On the next iteration, return
Expand Down

0 comments on commit 05c0794

Please sign in to comment.