diff --git a/Cargo.toml b/Cargo.toml index 53922f38f96..6b0184f0e93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] plaintext = ["dep:libp2p-plaintext"] pnet = ["dep:libp2p-pnet"] relay = ["dep:libp2p-relay", "libp2p-metrics?/relay"] +quic = ["dep:libp2p-quic"] request-response = ["dep:libp2p-request-response"] rendezvous = ["dep:libp2p-rendezvous"] tcp = ["dep:libp2p-tcp"] @@ -98,6 +99,7 @@ libp2p-noise = { version = "0.40.0", path = "transports/noise", optional = true libp2p-ping = { version = "0.40.1", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.37.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.1", path = "transports/pnet", optional = true } +libp2p-quic = { version = "0.8.0", path = "transports/quic", optional = true } libp2p-relay = { version = "0.13.0", path = "protocols/relay", optional = true } libp2p-rendezvous = { version = "0.10.0", path = "protocols/rendezvous", optional = true } libp2p-request-response = { version = "0.22.1", path = "protocols/request-response", optional = true } diff --git a/src/lib.rs b/src/lib.rs index 2f8ac1abb02..19c3673db1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,10 @@ pub use libp2p_plaintext as plaintext; #[cfg(feature = "pnet")] #[doc(inline)] pub use libp2p_pnet as pnet; +#[cfg(feature = "quic")] +#[cfg_attr(docsrs, doc(cfg(feature = "quic")))] +#[doc(inline)] +pub use libp2p_quic as quic; #[cfg(feature = "relay")] #[doc(inline)] pub use libp2p_relay as relay; diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml new file mode 100644 index 00000000000..5b3f4ce2a40 --- /dev/null +++ b/transports/quic/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "libp2p-quic" +version = "0.8.0" +authors = ["Parity Technologies "] +edition = "2021" +description = "TLS based QUIC transport implementation for libp2p" +repository = "https://github.com/libp2p/rust-libp2p" +license = "MIT" 
+ +[dependencies] +if-watch = "2.0.0" +libp2p-core = { version = "0.37.0", path = "../../core" } +libp2p-tls = { version = "0.1.0-alpha", path = "../../transports/tls" } +quinn = { version = "0.9.0", features = ["tls-rustls", "futures-io", "runtime-async-std"] } +futures = "0.3.21" +thiserror = "1.0.26" +tracing = "0.1" +rand = "0.8.5" +rustls = "0.20.2" + +[dev-dependencies] +anyhow = "1.0.41" +async-std = { version = "1.12.0", features = ["attributes"] } +async-trait = "0.1.50" +libp2p = { version = "0.50.0", default-features = false, features = ["request-response"], path = "../.." } +tracing-subscriber = {version = "0.3.8", features = ["env-filter"] } diff --git a/transports/quic/src/in_addr.rs b/transports/quic/src/in_addr.rs new file mode 100644 index 00000000000..c9882ee84de --- /dev/null +++ b/transports/quic/src/in_addr.rs @@ -0,0 +1,66 @@ +use if_watch::{IfEvent, IfWatcher}; + +use futures::stream::Stream; + +use std::{ + io::Result, + net::IpAddr, + pin::Pin, + task::{Context, Poll}, +}; + +/// Watches for interface changes. +#[derive(Debug)] +pub enum InAddr { + /// The socket accepts connections on a single interface. + One { ip: Option }, + /// The socket accepts connections on all interfaces. + Any { if_watch: Box }, +} + +impl InAddr { + /// If ip is specified then only one `IfEvent::Up` with IpNet(ip)/32 will be generated. + /// If ip is unspecified then `IfEvent::Up/Down` events will be generated for all interfaces. + pub fn new(ip: IpAddr) -> Result { + let result = if ip.is_unspecified() { + let watcher = IfWatcher::new()?; + InAddr::Any { + if_watch: Box::new(watcher), + } + } else { + InAddr::One { ip: Some(ip) } + }; + Ok(result) + } +} + +impl Stream for InAddr { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let me = Pin::into_inner(self); + match me { + // If the listener is bound to a single interface, make sure the + // address is reported once. 
+ InAddr::One { ip } => { + if let Some(ip) = ip.take() { + return Poll::Ready(Some(Ok(IfEvent::Up(ip.into())))); + } + } + InAddr::Any { if_watch } => { + // Consume all events for up/down interface changes. + if let Poll::Ready(ev) = if_watch.poll_if_event(cx) { + match ev { + Ok(event) => { + return Poll::Ready(Some(Ok(event))); + } + Err(err) => { + return Poll::Ready(Some(Err(err))); + } + } + } + } + } + Poll::Pending + } +} diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs new file mode 100644 index 00000000000..2ce87281509 --- /dev/null +++ b/transports/quic/src/lib.rs @@ -0,0 +1,649 @@ +use libp2p_core::{ + identity::Keypair, + multiaddr::{Multiaddr, Protocol}, + muxing::StreamMuxerEvent, + transport::{ListenerId, TransportError, TransportEvent}, + PeerId, StreamMuxer, Transport, +}; + +use libp2p_tls as tls; + +use std::{ + future::Future, + io, + net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{future::BoxFuture, FutureExt}; +use futures::{stream::SelectAll, AsyncRead, AsyncWrite, Stream, StreamExt}; + +mod in_addr; + +use in_addr::InAddr; + +pub struct QuicSubstream { + send: quinn::SendStream, + recv: quinn::RecvStream, + closed: bool, +} + +impl QuicSubstream { + fn new(send: quinn::SendStream, recv: quinn::RecvStream) -> Self { + Self { + send, + recv, + closed: false, + } + } +} + +impl AsyncRead for QuicSubstream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + AsyncRead::poll_read(Pin::new(&mut self.get_mut().recv), cx, buf) + } +} + +impl AsyncWrite for QuicSubstream { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + AsyncWrite::poll_write(Pin::new(&mut self.get_mut().send), cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + AsyncWrite::poll_flush(Pin::new(&mut self.get_mut().send), cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut 
Context) -> Poll> { + let this = self.get_mut(); + if this.closed { + // For some reason poll_close needs to be 'fuse'able + return Poll::Ready(Ok(())); + } + let close_result = AsyncWrite::poll_close(Pin::new(&mut this.send), cx); + if close_result.is_ready() { + this.closed = true; + } + close_result + } +} + +pub struct QuicMuxer { + connection: quinn::Connection, + incoming: + BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>, + outgoing: + BoxFuture<'static, Result<(quinn::SendStream, quinn::RecvStream), quinn::ConnectionError>>, +} + +impl StreamMuxer for QuicMuxer { + type Substream = QuicSubstream; + type Error = quinn::ConnectionError; + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let (send, recv) = futures::ready!(this.incoming.poll_unpin(cx))?; + let connection = this.connection.clone(); + this.incoming = Box::pin(async move { connection.accept_bi().await }); + let substream = QuicSubstream::new(send, recv); + Poll::Ready(Ok(substream)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let (send, recv) = futures::ready!(this.outgoing.poll_unpin(cx))?; + let connection = this.connection.clone(); + this.outgoing = Box::pin(async move { connection.open_bi().await }); + let substream = QuicSubstream::new(send, recv); + Poll::Ready(Ok(substream)) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + self.connection.close(From::from(0u32), &[]); + Poll::Ready(Ok(())) + } +} + +pub struct QuicUpgrade { + connecting: quinn::Connecting, +} + +impl QuicUpgrade { + /// Builds an [`Upgrade`] that wraps around a [`quinn::Connecting`]. 
+ pub(crate) fn from_connecting(connecting: quinn::Connecting) -> Self { + QuicUpgrade { connecting } + } +} + +impl QuicUpgrade { + /// Returns the address of the node we're connected to. + /// Panics if the connection is still handshaking. + fn remote_peer_id(connection: &quinn::Connection) -> PeerId { + //debug_assert!(!connection.handshake_data().is_some()); + let identity = connection + .peer_identity() + .expect("connection got identity because it passed TLS handshake; qed"); + let certificates: Box> = + identity.downcast().expect("we rely on rustls feature; qed"); + let end_entity = certificates + .get(0) + .expect("there should be exactly one certificate; qed"); + let p2p_cert = tls::certificate::parse(end_entity) + .expect("the certificate was validated during TLS handshake; qed"); + p2p_cert.peer_id() + } +} + +impl Future for QuicUpgrade { + type Output = Result<(PeerId, QuicMuxer), io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let connecting = Pin::new(&mut self.get_mut().connecting); + + let connection = futures::ready!(connecting.poll(cx))?; + + let peer_id = QuicUpgrade::remote_peer_id(&connection); + let connection_c = connection.clone(); + let incoming = Box::pin(async move { connection_c.accept_bi().await }); + let connection_c = connection.clone(); + let outgoing = Box::pin(async move { connection_c.open_bi().await }); + let muxer = QuicMuxer { + connection, + incoming, + outgoing, + }; + Poll::Ready(Ok((peer_id, muxer))) + } +} + +/// Represents the configuration for the [`Endpoint`]. +#[derive(Debug, Clone)] +pub struct Config { + /// The client configuration to pass to `quinn`. + client_config: quinn::ClientConfig, + /// The server configuration to pass to `quinn`. + server_config: quinn::ServerConfig, +} + +impl Config { + /// Creates a new configuration object with default values. 
+ pub fn new(keypair: &Keypair) -> Self { + let mut transport = quinn::TransportConfig::default(); + transport.max_concurrent_uni_streams(0u32.into()); // Can only panic if value is out of range. + transport.datagram_receive_buffer_size(None); + transport.keep_alive_interval(Some(Duration::from_millis(10))); + let transport = Arc::new(transport); + + let client_tls_config = tls::make_client_config(keypair, None).unwrap(); + let server_tls_config = tls::make_server_config(keypair).unwrap(); + + let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_tls_config)); + server_config.transport = Arc::clone(&transport); + + let mut client_config = quinn::ClientConfig::new(Arc::new(client_tls_config)); + client_config.transport_config(transport); + Self { + client_config, + server_config, + } + } +} + +pub struct QuicTransport { + config: Config, + listeners: SelectAll, + /// Endpoints to use for dialing Ipv4 addresses if no matching listener exists. + ipv4_dialer: Option, + /// Endpoints to use for dialing Ipv6 addresses if no matching listener exists. 
+ ipv6_dialer: Option, +} + +impl QuicTransport { + pub fn new(config: Config) -> Self { + Self { + config, + listeners: Default::default(), + ipv4_dialer: None, + ipv6_dialer: None, + } + } +} + +impl Transport for QuicTransport { + type Output = (PeerId, QuicMuxer); + type Error = io::Error; + type ListenerUpgrade = QuicUpgrade; + type Dial = BoxFuture<'static, Result>; + + fn listen_on(&mut self, addr: Multiaddr) -> Result> { + let socket_addr = + multiaddr_to_socketaddr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?; + + let client_config = self.config.client_config.clone(); + let server_config = self.config.server_config.clone(); + + let mut endpoint = quinn::Endpoint::server(server_config, socket_addr).unwrap(); + endpoint.set_default_client_config(client_config); + + let in_addr = InAddr::new(socket_addr.ip()).map_err(TransportError::Other)?; + + let listener_id = ListenerId::new(); + let listener = Listener::new(listener_id, endpoint, in_addr); + self.listeners.push(listener); + // Drop reference to dialer endpoint so that the endpoint is dropped once the last + // connection that uses it is closed. + // New outbound connections will use a bidirectional (listener) endpoint. 
+ match socket_addr { + SocketAddr::V4(_) => self.ipv4_dialer.take(), + SocketAddr::V6(_) => self.ipv6_dialer.take(), + }; + Ok(listener_id) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) { + listener.close(Ok(())); + true + } else { + false + } + } + + fn address_translation(&self, _server: &Multiaddr, observed: &Multiaddr) -> Option { + Some(observed.clone()) + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let socket_addr = multiaddr_to_socketaddr(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let listeners = self + .listeners + .iter() + .filter(|l| { + let listen_addr = l.socket_addr(); + listen_addr.is_ipv4() == socket_addr.is_ipv4() + && listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback() + }) + .collect::>(); + let endpoint = if listeners.is_empty() { + let dialer = match socket_addr { + SocketAddr::V4(_) => &mut self.ipv4_dialer, + SocketAddr::V6(_) => &mut self.ipv6_dialer, + }; + match dialer { + Some(endpoint) => endpoint.clone(), + None => { + let server_addr = if socket_addr.is_ipv6() { + SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0) + } else { + SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0) + }; + let client_config = self.config.client_config.clone(); + let server_config = self.config.server_config.clone(); + + let mut endpoint = quinn::Endpoint::server(server_config, server_addr).unwrap(); + endpoint.set_default_client_config(client_config); + let _ = dialer.insert(endpoint.clone()); + endpoint + } + } + } else { + // Pick a random listener to use for dialing. 
+ let n = rand::random::() % listeners.len(); + let listener = listeners.get(n).expect("Can not be out of bound."); + listener.endpoint.clone() + }; + + Ok(Box::pin(async move { + let connecting = endpoint.connect(socket_addr, "server_name").unwrap(); + QuicUpgrade::from_connecting(connecting).await + })) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + // TODO: As the listener of a QUIC hole punch, we need to send a random UDP packet to the + // `addr`. See DCUtR specification below. + // + // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol + self.dial(addr) + } + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.listeners.poll_next_unpin(cx) { + Poll::Ready(Some(ev)) => Poll::Ready(ev), + _ => Poll::Pending, + } + } +} + +struct Listener { + listener_id: ListenerId, + endpoint: quinn::Endpoint, + + accept: BoxFuture<'static, Option>, + + /// The IP addresses of network interfaces on which the listening socket + /// is accepting connections. + /// + /// If the listen socket listens on all interfaces, these may change over + /// time as interfaces become available or unavailable. + in_addr: InAddr, + + /// Set to `Some` if this [`Listener`] should close. + /// Optionally contains a [`TransportEvent::ListenerClosed`] that should be + /// reported before the listener's stream is terminated. + report_closed: Option::Item>>, +} + +impl Listener { + fn new(listener_id: ListenerId, endpoint: quinn::Endpoint, in_addr: InAddr) -> Self { + let endpoint_c = endpoint.clone(); + let accept = Box::pin(async move { endpoint_c.accept().await }); + Self { + listener_id, + endpoint, + accept, + in_addr, + report_closed: None, + } + } + + /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and + /// terminate the stream. 
+ fn close(&mut self, reason: Result<(), io::Error>) { + match self.report_closed { + Some(_) => println!("Listener was already closed."), + None => { + self.endpoint.close(From::from(0u32), &[]); + // Report the listener event as closed. + let _ = self + .report_closed + .insert(Some(TransportEvent::ListenerClosed { + listener_id: self.listener_id, + reason, + })); + } + } + } + + fn socket_addr(&self) -> SocketAddr { + self.endpoint.local_addr().unwrap() + } + + /// Poll for a next If Event. + fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Option<::Item> { + use if_watch::IfEvent; + loop { + match self.in_addr.poll_next_unpin(cx) { + Poll::Ready(mut item) => { + if let Some(item) = item.take() { + // Consume all events for up/down interface changes. + match item { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if self.socket_addr().is_ipv4() == ip.is_ipv4() { + let socket_addr = + SocketAddr::new(ip, self.socket_addr().port()); + let ma = socketaddr_to_multiaddr(&socket_addr); + tracing::debug!("New listen address: {}", ma); + return Some(TransportEvent::NewAddress { + listener_id: self.listener_id, + listen_addr: ma, + }); + } + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if self.socket_addr().is_ipv4() == ip.is_ipv4() { + let socket_addr = + SocketAddr::new(ip, self.socket_addr().port()); + let ma = socketaddr_to_multiaddr(&socket_addr); + tracing::debug!("Expired listen address: {}", ma); + return Some(TransportEvent::AddressExpired { + listener_id: self.listener_id, + listen_addr: ma, + }); + } + } + Err(error) => { + tracing::debug! 
{ + "Failure polling interfaces: {:?}.", + error + }; + return Some(TransportEvent::ListenerError { + listener_id: self.listener_id, + error, + }); + } + } + } + } + Poll::Pending => return None, + } + } + } +} + +impl Stream for Listener { + type Item = TransportEvent<::ListenerUpgrade, io::Error>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if let Some(closed) = this.report_closed.as_mut() { + // Listener was closed. + // Report the transport event if there is one. On the next iteration, return + // `Poll::Ready(None)` to terminate the stream. + return Poll::Ready(closed.take()); + } + if let Some(event) = this.poll_if_addr(cx) { + return Poll::Ready(Some(event)); + } + let connecting = match futures::ready!(this.accept.poll_unpin(cx)) { + Some(c) => { + let endpoint = this.endpoint.clone(); + this.accept = Box::pin(async move { endpoint.accept().await }); + c + } + None => { + this.close(Err(io::Error::from(quinn::ConnectionError::LocallyClosed))); // TODO Error: TaskCrashed + return Poll::Pending; // TODO recursive return this.poll_next + } + }; + + let local_addr = socketaddr_to_multiaddr(&this.socket_addr()); + let send_back_addr = socketaddr_to_multiaddr(&connecting.remote_address()); + let event = TransportEvent::Incoming { + upgrade: QuicUpgrade::from_connecting(connecting), + local_addr, + send_back_addr, + listener_id: this.listener_id, + }; + Poll::Ready(Some(event)) + } +} + +pub(crate) fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Option { + let mut iter = addr.iter(); + let proto1 = iter.next()?; + let proto2 = iter.next()?; + let proto3 = iter.next()?; + + for proto in iter { + match proto { + Protocol::P2p(_) => {} // Ignore a `/p2p/...` prefix of possibly outer protocols, if present. 
+ _ => return None, + } + } + + match (proto1, proto2, proto3) { + (Protocol::Ip4(ip), Protocol::Udp(port), Protocol::Quic) => { + Some(SocketAddr::new(ip.into(), port)) + } + (Protocol::Ip6(ip), Protocol::Udp(port), Protocol::Quic) => { + Some(SocketAddr::new(ip.into(), port)) + } + _ => None, + } +} + +/// Turns an IP address and port into the corresponding QUIC multiaddr. +pub(crate) fn socketaddr_to_multiaddr(socket_addr: &SocketAddr) -> Multiaddr { + Multiaddr::empty() + .with(socket_addr.ip().into()) + .with(Protocol::Udp(socket_addr.port())) + .with(Protocol::Quic) +} + +#[cfg(test)] +mod test { + + use futures::{future::poll_fn, FutureExt}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + + use super::*; + + #[test] + fn multiaddr_to_udp_conversion() { + assert!( + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) + .is_none() + ); + + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/12345/quic" + .parse::() + .unwrap() + ), + Some(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/255.255.255.255/udp/8080/quic" + .parse::() + .unwrap() + ), + Some(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), + 8080, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip4/127.0.0.1/udp/55148/quic/p2p/12D3KooW9xk7Zp1gejwfwNpfm6L9zH5NL4Bx5rm94LRYJJHJuARZ" + .parse::() + .unwrap() + ), + Some(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 55148, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&"/ip6/::1/udp/12345/quic".parse::().unwrap()), + Some(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr( + &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/udp/8080/quic" + .parse::() + .unwrap() + ), + Some(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535, + )), + 8080, + )) + ); + } + + #[async_std::test] + 
async fn close_listener() { + let keypair = libp2p_core::identity::Keypair::generate_ed25519(); + let mut transport = QuicTransport::new(Config::new(&keypair)); + + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + + // Run test twice to check that there is no unexpected behaviour if `QuicTransport.listener` + // is temporarily empty. + for _ in 0..2 { + let listener = transport + .listen_on("/ip4/0.0.0.0/udp/0/quic".parse().unwrap()) + .unwrap(); + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::NewAddress { + listener_id, + listen_addr, + } => { + assert_eq!(listener_id, listener); + assert!( + matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified()) + ); + assert!( + matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0) + ); + assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::Quic))); + } + e => panic!("Unexpected event: {:?}", e), + } + assert!( + transport.remove_listener(listener), + "Expect listener to exist." + ); + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::ListenerClosed { + listener_id, + reason: Ok(()), + } => { + assert_eq!(listener_id, listener); + } + e => panic!("Unexpected event: {:?}", e), + } + // Poll once again so that the listener has the chance to return `Poll::Ready(None)` and + // be removed from the list of listeners. 
+ assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + assert!(transport.listeners.is_empty()); + } + } +} diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs new file mode 100644 index 00000000000..f66610693a9 --- /dev/null +++ b/transports/quic/tests/smoke.rs @@ -0,0 +1,558 @@ +use anyhow::Result; +use async_trait::async_trait; +use futures::future::FutureExt; +use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use futures::select; +use futures::stream::StreamExt; +use libp2p::core::multiaddr::Protocol; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::{upgrade, ConnectedPoint, Transport}; +use libp2p::request_response::{ + ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, +}; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::{DialError, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p_quic::{Config as QuicConfig, QuicTransport}; +use rand::RngCore; +use std::{io, iter}; + +fn generate_tls_keypair() -> libp2p::identity::Keypair { + libp2p::identity::Keypair::generate_ed25519() +} + +#[tracing::instrument] +async fn create_swarm(keylog: bool) -> Result>> { + let keypair = generate_tls_keypair(); + let peer_id = keypair.public().to_peer_id(); + let config = QuicConfig::new(&keypair); + let transport = QuicTransport::new(config); + + // TODO: + // transport + // .transport + // .max_idle_timeout(Some(quinn_proto::VarInt::from_u32(1_000u32).into())); + // if keylog { + // transport.enable_keylogger(); + // } + + let transport = Transport::map(transport, |(peer, muxer), _| { + (peer, StreamMuxerBox::new(muxer)) + }) + .boxed(); + + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); + let behaviour = RequestResponse::new(PingCodec(), protocols, cfg); + tracing::info!(?peer_id); + let swarm = 
SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|f| { + async_std::task::spawn(f); + })) + .build(); + Ok(swarm) +} + +fn setup_global_subscriber() { + let filter_layer = tracing_subscriber::EnvFilter::from_default_env(); + tracing_subscriber::fmt() + .with_env_filter(filter_layer) + .try_init() + .ok(); +} + +#[async_std::test] +async fn smoke() -> Result<()> { + setup_global_subscriber(); + let mut rng = rand::thread_rng(); + + let mut a = create_swarm(true).await?; + let mut b = create_swarm(false).await?; + + Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/quic".parse()?)?; + + let addr = match a.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + tracing::info!(?addr); + + let mut data = vec![0; 4096 * 10]; + rng.fill_bytes(&mut data); + + b.behaviour_mut() + .add_address(&Swarm::local_peer_id(&a), addr); + b.behaviour_mut() + .send_request(&Swarm::local_peer_id(&a), Ping(data.clone())); + + match b.next().await { + Some(SwarmEvent::Dialing(_)) => {} + e => panic!("{:?}", e), + } + + match a.next().await { + Some(SwarmEvent::IncomingConnection { .. }) => {} + e => panic!("{:?}", e), + }; + + match b.next().await { + Some(SwarmEvent::ConnectionEstablished { .. }) => {} + e => panic!("{:?}", e), + }; + + match a.next().await { + Some(SwarmEvent::ConnectionEstablished { .. }) => {} + e => panic!("{:?}", e), + }; + + assert!(b.next().now_or_never().is_none()); + + match a.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request: Ping(ping), + channel, + .. + }, + .. + })) => { + a.behaviour_mut() + .send_response(channel, Pong(ping)) + .unwrap(); + } + e => panic!("{:?}", e), + } + + match a.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. 
})) => {} + e => panic!("{:?}", e), + } + + match b.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + response: Pong(pong), + .. + }, + .. + })) => assert_eq!(data, pong), + e => panic!("{:?}", e), + } + + a.behaviour_mut().send_request( + &Swarm::local_peer_id(&b), + Ping(b"another substream".to_vec()), + ); + + assert!(a.next().now_or_never().is_none()); + + match b.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request: Ping(data), + channel, + .. + }, + .. + })) => { + b.behaviour_mut() + .send_response(channel, Pong(data)) + .unwrap(); + } + e => panic!("{:?}", e), + } + + match b.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} + e => panic!("{:?}", e), + } + + match a.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + response: Pong(data), + .. + }, + .. 
+ })) => assert_eq!(data, b"another substream".to_vec()), + e => panic!("{:?}", e), + } + + Ok(()) +} + +#[derive(Debug, Clone)] +struct PingProtocol(); + +#[derive(Clone)] +struct PingCodec(); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct Ping(Vec); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct Pong(Vec); + +impl ProtocolName for PingProtocol { + fn protocol_name(&self) -> &[u8] { + "/ping/1".as_bytes() + } +} + +#[async_trait] +impl RequestResponseCodec for PingCodec { + type Protocol = PingProtocol; + type Request = Ping; + type Response = Pong; + + async fn read_request(&mut self, _: &PingProtocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + upgrade::read_length_prefixed(io, 4096 * 10) + .map(|res| match res { + Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), + Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), + Ok(vec) => Ok(Ping(vec)), + }) + .await + } + + async fn read_response(&mut self, _: &PingProtocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + upgrade::read_length_prefixed(io, 4096 * 10) + .map(|res| match res { + Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), + Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), + Ok(vec) => Ok(Pong(vec)), + }) + .await + } + + async fn write_request( + &mut self, + _: &PingProtocol, + io: &mut T, + Ping(data): Ping, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + upgrade::write_length_prefixed(io, data).await?; + io.close().await?; + Ok(()) + } + + async fn write_response( + &mut self, + _: &PingProtocol, + io: &mut T, + Pong(data): Pong, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + upgrade::write_length_prefixed(io, data).await?; + io.close().await?; + Ok(()) + } +} + +#[async_std::test] +async fn dial_failure() -> Result<()> { + setup_global_subscriber(); + + let mut a = create_swarm(false).await?; + let mut b = create_swarm(true).await?; + + 
+    Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/quic".parse()?)?;
+
+    let addr = match a.next().await {
+        Some(SwarmEvent::NewListenAddr { address, .. }) => address,
+        e => panic!("{:?}", e),
+    };
+    let a_peer_id = &Swarm::local_peer_id(&a).clone();
+    drop(a); // stop a swarm so b can never reach it
+
+    b.behaviour_mut().add_address(a_peer_id, addr);
+    b.behaviour_mut()
+        .send_request(a_peer_id, Ping(b"hello world".to_vec()));
+
+    match b.next().await {
+        Some(SwarmEvent::Dialing(_)) => {}
+        e => panic!("{:?}", e),
+    }
+
+    match b.next().await {
+        Some(SwarmEvent::OutgoingConnectionError { .. }) => {}
+        e => panic!("{:?}", e),
+    };
+
+    match b.next().await {
+        Some(SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { .. })) => {}
+        e => panic!("{:?}", e),
+    };
+
+    Ok(())
+}
+
+#[async_std::test]
+async fn concurrent_connections_and_streams() {
+    setup_global_subscriber();
+
+    let number_listeners = 10;
+    let number_streams = 10;
+
+    let mut data = vec![0; 4096 * 10];
+    rand::thread_rng().fill_bytes(&mut data);
+    let mut listeners = vec![];
+
+    // Spawn the listener nodes.
+    for _ in 0..number_listeners {
+        let mut listener = create_swarm(true).await.unwrap();
+        Swarm::listen_on(&mut listener, "/ip4/127.0.0.1/udp/0/quic".parse().unwrap()).unwrap();
+
+        // Wait to listen on address.
+        let addr = match listener.next().await {
+            Some(SwarmEvent::NewListenAddr { address, .. }) => address,
+            e => panic!("{:?}", e),
+        };
+
+        listeners.push((*listener.local_peer_id(), addr));
+
+        async_std::task::spawn(async move {
+            loop {
+                match listener.next().await {
+                    Some(SwarmEvent::ConnectionEstablished { .. }) => {
+                        tracing::info!("listener ConnectionEstablished");
+                    }
+                    Some(SwarmEvent::IncomingConnection { .. }) => {
+                        tracing::info!("listener IncomingConnection");
+                    }
+                    Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
+                        message:
+                            RequestResponseMessage::Request {
+                                request: Ping(ping),
+                                channel,
+                                ..
+                            },
+                        ..
+                    })) => {
+                        tracing::info!("listener got Message");
+                        listener
+                            .behaviour_mut()
+                            .send_response(channel, Pong(ping))
+                            .unwrap();
+                    }
+                    Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {
+                        tracing::info!("listener ResponseSent");
+                    }
+                    Some(SwarmEvent::ConnectionClosed { .. }) => {}
+                    Some(e) => {
+                        panic!("unexpected event {:?}", e);
+                    }
+                    None => {
+                        panic!("listener stopped");
+                    }
+                }
+            }
+        });
+    }
+
+    let mut dialer = create_swarm(true).await.unwrap();
+
+    // For each listener node start `number_streams` requests.
+    for (listener_peer_id, listener_addr) in &listeners {
+        dialer
+            .behaviour_mut()
+            .add_address(&listener_peer_id, listener_addr.clone());
+
+        dialer.dial(listener_peer_id.clone()).unwrap();
+    }
+
+    // Wait for responses to each request.
+    let mut num_responses = 0;
+    loop {
+        match dialer.next().await {
+            Some(SwarmEvent::Dialing(_)) => {
+                tracing::info!("dialer Dialing");
+            }
+            Some(SwarmEvent::ConnectionEstablished { peer_id, .. }) => {
+                tracing::info!("dialer Connection established");
+                for _ in 0..number_streams {
+                    dialer
+                        .behaviour_mut()
+                        .send_request(&peer_id, Ping(data.clone()));
+                }
+            }
+            Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
+                message:
+                    RequestResponseMessage::Response {
+                        response: Pong(pong),
+                        ..
+                    },
+                ..
+            })) => {
+                tracing::info!("dialer got Message");
+                num_responses += 1;
+                assert_eq!(data, pong);
+                let should_be = number_listeners as usize * (number_streams) as usize;
+                tracing::info!(?num_responses, ?should_be);
+                if num_responses == should_be {
+                    break;
+                }
+            }
+            Some(SwarmEvent::ConnectionClosed { .. }) => {
+                tracing::info!("dialer ConnectionClosed");
+            }
+            e => {
+                panic!("unexpected event {:?}", e);
+            }
+        }
+    }
+}
+
+#[ignore]
+#[async_std::test]
+async fn endpoint_reuse() -> Result<()> {
+    setup_global_subscriber();
+
+    let mut swarm_a = create_swarm(false).await?;
+    let mut swarm_b = create_swarm(false).await?;
+    let b_peer_id = *swarm_b.local_peer_id();
+
+    swarm_a.listen_on("/ip4/127.0.0.1/udp/0/quic".parse()?)?;
+    let a_addr = match swarm_a.next().await {
+        Some(SwarmEvent::NewListenAddr { address, .. }) => address,
+        e => panic!("{:?}", e),
+    };
+
+    swarm_b.dial(a_addr.clone()).unwrap();
+    let b_send_back_addr = loop {
+        select! {
+            ev = swarm_a.select_next_some() => match ev {
+                SwarmEvent::ConnectionEstablished { endpoint, .. } => {
+                    break endpoint.get_remote_address().clone()
+                }
+                SwarmEvent::IncomingConnection { local_addr, ..} => {
+                    assert!(swarm_a.listeners().any(|a| a == &local_addr));
+                }
+                e => panic!("{:?}", e),
+            },
+            ev = swarm_b.select_next_some() => match ev {
+                SwarmEvent::ConnectionEstablished { .. } => {},
+                e => panic!("{:?}", e),
+            }
+        }
+    };
+
+    let dial_opts = DialOpts::peer_id(b_peer_id)
+        .addresses(vec![b_send_back_addr.clone()])
+        .extend_addresses_through_behaviour()
+        .condition(PeerCondition::Always)
+        .build();
+    swarm_a.dial(dial_opts).unwrap();
+
+    // Expect the dial to fail since b is not listening on an address.
+    loop {
+        select! {
+            ev = swarm_a.select_next_some() => match ev {
+                e @ SwarmEvent::ConnectionEstablished { .. } => {
+                    panic!("Unexpected dial success: {:?}", e)
+                },
+                SwarmEvent::OutgoingConnectionError {error, .. } => {
+                    assert!(matches!(error, DialError::Transport(_)));
+                    break
+                }
+                _ => {}
+            },
+            _ = swarm_b.select_next_some() => {},
+        }
+    }
+    swarm_b.listen_on("/ip4/127.0.0.1/udp/0/quic".parse()?)?;
+    let b_addr = match swarm_b.next().await {
+        Some(SwarmEvent::NewListenAddr { address, .. }) => address,
+        e => panic!("{:?}", e),
+    };
+
+    let dial_opts = DialOpts::peer_id(b_peer_id)
+        .addresses(vec![b_addr.clone(), b_send_back_addr])
+        .condition(PeerCondition::Always)
+        .build();
+    swarm_a.dial(dial_opts).unwrap();
+    let expected_b_addr = b_addr.with(Protocol::P2p(b_peer_id.into()));
+
+    let mut a_reported = false;
+    let mut b_reported = false;
+    while !a_reported || !b_reported {
+        select! {
+            ev = swarm_a.select_next_some() => match ev{
+                SwarmEvent::ConnectionEstablished { endpoint, ..} => {
+                    assert!(endpoint.is_dialer());
+                    assert_eq!(endpoint.get_remote_address(), &expected_b_addr);
+                    a_reported = true;
+                }
+                SwarmEvent::OutgoingConnectionError {error, .. } => {
+                    panic!("Unexpected error {:}", error)
+                }
+                _ => {}
+            },
+            ev = swarm_b.select_next_some() => match ev{
+                SwarmEvent::ConnectionEstablished { endpoint, ..} => {
+                    match endpoint {
+                        ConnectedPoint::Dialer{..} => panic!("Unexpected outbound connection"),
+                        ConnectedPoint::Listener {send_back_addr, local_addr} => {
+                            // Expect that the local listening endpoint was used for dialing.
+                            assert!(swarm_b.listeners().any(|a| a == &local_addr));
+                            assert_eq!(send_back_addr, a_addr);
+                            b_reported = true;
+                        }
+                    }
+                }
+                _ => {}
+            },
+        }
+    }
+
+    Ok(())
+}
+
+#[async_std::test]
+async fn ipv4_dial_ipv6() -> Result<()> {
+    setup_global_subscriber();
+
+    let mut swarm_a = create_swarm(false).await?;
+    let mut swarm_b = create_swarm(false).await?;
+
+    swarm_a.listen_on("/ip6/::1/udp/0/quic".parse()?)?;
+    let a_addr = match swarm_a.next().await {
+        Some(SwarmEvent::NewListenAddr { address, .. }) => address,
+        e => panic!("{:?}", e),
+    };
+
+    swarm_b.dial(a_addr.clone()).unwrap();
+
+    loop {
+        select! {
+            ev = swarm_a.select_next_some() => match ev {
+                SwarmEvent::ConnectionEstablished { .. } => {
+                    return Ok(())
+                }
+                SwarmEvent::IncomingConnection { local_addr, ..} => {
+                    assert!(swarm_a.listeners().any(|a| a == &local_addr));
+                }
+                e => panic!("{:?}", e),
+            },
+            ev = swarm_b.select_next_some() => match ev {
+                SwarmEvent::ConnectionEstablished { .. } => {},
+                e => panic!("{:?}", e),
+            }
+        }
+    }
+}