diff --git a/Cargo.toml b/Cargo.toml index 0ceed838ed6..47dfd2014df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "core", "examples/autonat", + "examples/autonatv2", "examples/browser-webrtc", "examples/chat", "examples/dcutr", @@ -46,9 +47,9 @@ members = [ "protocols/rendezvous", "protocols/request-response", "protocols/upnp", - "swarm", "swarm-derive", "swarm-test", + "swarm", "transports/dns", "transports/noise", "transports/plaintext", @@ -57,12 +58,12 @@ members = [ "transports/tcp", "transports/tls", "transports/uds", - "transports/webrtc", "transports/webrtc-websys", + "transports/webrtc", + "transports/websocket-websys", "transports/websocket", "transports/webtransport-websys", - "transports/websocket-websys", - "wasm-tests/webtransport-tests", "examples/autonatv2", + "wasm-tests/webtransport-tests", ] resolver = "2" diff --git a/core/src/transport/global_only.rs b/core/src/transport/global_only.rs index a40f99054d0..d975070ea0f 100644 --- a/core/src/transport/global_only.rs +++ b/core/src/transport/global_only.rs @@ -17,272 +17,250 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -// -mod ip { - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; - trait Ipv4Ext { - /// Returns [`true`] if this address is reserved by IANA for future use. [IETF RFC 1112] - /// defines the block of reserved addresses as `240.0.0.0/4`. This range normally includes the - /// broadcast address `255.255.255.255`, but this implementation explicitly excludes it, since - /// it is obviously not reserved for future use. - /// - /// [IETF RFC 1112]: https://tools.ietf.org/html/rfc1112 - /// - /// # Warning - /// - /// As IANA assigns new addresses, this method will be - /// updated. This may result in non-reserved addresses being - /// treated as reserved in code that relies on an outdated version - /// of this method. - #[must_use] - fn is_reserved(&self) -> bool; - /// Returns [`true`] if this address part of the `198.18.0.0/15` range, which is reserved for - /// network devices benchmarking. This range is defined in [IETF RFC 2544] as `192.18.0.0` - /// through `198.19.255.255` but [errata 423] corrects it to `198.18.0.0/15`. - /// - /// [IETF RFC 2544]: https://tools.ietf.org/html/rfc2544 - /// [errata 423]: https://www.rfc-editor.org/errata/eid423 - #[must_use] - fn is_benchmarking(&self) -> bool; - /// Returns [`true`] if this address is part of the Shared Address Space defined in - /// [IETF RFC 6598] (`100.64.0.0/10`). - /// - /// [IETF RFC 6598]: https://tools.ietf.org/html/rfc6598 - #[must_use] - fn is_shared(&self) -> bool; - /// Returns [`true`] if this is a private address. - /// - /// The private address ranges are defined in [IETF RFC 1918] and include: - /// - /// - `10.0.0.0/8` - /// - `172.16.0.0/12` - /// - `192.168.0.0/16` - /// - /// [IETF RFC 1918]: https://tools.ietf.org/html/rfc1918 - #[must_use] - fn is_private(&self) -> bool; +use crate::{ + multiaddr::{Multiaddr, Protocol}, + transport::{DialOpts, ListenerId, TransportError, TransportEvent}, +}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// Dropping all dial requests to non-global IP addresses. +#[derive(Debug, Clone, Default)] +pub struct Transport { + inner: T, +} + +/// This module contains an implementation of the `is_global` IPv4 address space. +/// +/// Credit for this implementation goes to the Rust standard library team. +/// +/// Unstable tracking issue: [#27709](https://github.com/rust-lang/rust/issues/27709) +mod ipv4_global { + use std::net::Ipv4Addr; + + /// Returns [`true`] if this address is reserved by IANA for future use. [IETF RFC 1112] + /// defines the block of reserved addresses as `240.0.0.0/4`. This range normally includes the + /// broadcast address `255.255.255.255`, but this implementation explicitly excludes it, since + /// it is obviously not reserved for future use. + /// + /// [IETF RFC 1112]: https://tools.ietf.org/html/rfc1112 + /// + /// # Warning + /// + /// As IANA assigns new addresses, this method will be + /// updated. This may result in non-reserved addresses being + /// treated as reserved in code that relies on an outdated version + /// of this method. + #[must_use] + #[inline] + const fn is_reserved(a: Ipv4Addr) -> bool { + a.octets()[0] & 240 == 240 && !a.is_broadcast() } - impl Ipv4Ext for Ipv4Addr { - #[inline] - fn is_reserved(&self) -> bool { - self.octets()[0] & 240 == 240 && !self.is_broadcast() - } - #[inline] - fn is_benchmarking(&self) -> bool { - self.octets()[0] == 198 && (self.octets()[1] & 0xfe) == 18 - } - #[inline] - fn is_shared(&self) -> bool { - self.octets()[0] == 100 && (self.octets()[1] & 0b1100_0000 == 0b0100_0000) - } - #[inline] - fn is_private(&self) -> bool { - match self.octets() { - [10, ..] => true, - [172, b, ..] if (16..=31).contains(&b) => true, - [192, 168, ..] => true, - _ => false, - } - } + /// Returns [`true`] if this address part of the `198.18.0.0/15` range, which is reserved for + /// network devices benchmarking. This range is defined in [IETF RFC 2544] as `192.18.0.0` + /// through `198.19.255.255` but [errata 423] corrects it to `198.18.0.0/15`. + /// + /// [IETF RFC 2544]: https://tools.ietf.org/html/rfc2544 + /// [errata 423]: https://www.rfc-editor.org/errata/eid423 + #[must_use] + #[inline] + const fn is_benchmarking(a: Ipv4Addr) -> bool { + a.octets()[0] == 198 && (a.octets()[1] & 0xfe) == 18 } - trait Ipv6Ext { - /// Returns `true` if the address is a unicast address with link-local scope, - /// as defined in [RFC 4291]. - /// - /// A unicast address has link-local scope if it has the prefix `fe80::/10`, as per [RFC 4291 section 2.4]. - /// Note that this encompasses more addresses than those defined in [RFC 4291 section 2.5.6], - /// which describes "Link-Local IPv6 Unicast Addresses" as having the following stricter format: - /// - /// ```text - /// | 10 bits | 54 bits | 64 bits | - /// +----------+-------------------------+----------------------------+ - /// |1111111010| 0 | interface ID | - /// +----------+-------------------------+----------------------------+ - /// ``` - /// So while currently the only addresses with link-local scope an application will encounter are all in `fe80::/64`, - /// this might change in the future with the publication of new standards. More addresses in `fe80::/10` could be allocated, - /// and those addresses will have link-local scope. - /// - /// Also note that while [RFC 4291 section 2.5.3] mentions about the [loopback address] (`::1`) that "it is treated as having Link-Local scope", - /// this does not mean that the loopback address actually has link-local scope and this method will return `false` on it. - /// - /// [RFC 4291]: https://tools.ietf.org/html/rfc4291 - /// [RFC 4291 section 2.4]: https://tools.ietf.org/html/rfc4291#section-2.4 - /// [RFC 4291 section 2.5.3]: https://tools.ietf.org/html/rfc4291#section-2.5.3 - /// [RFC 4291 section 2.5.6]: https://tools.ietf.org/html/rfc4291#section-2.5.6 - /// [loopback address]: Ipv6Addr::LOCALHOST - #[must_use] - fn is_unicast_link_local(&self) -> bool; - /// Returns [`true`] if this is a unique local address (`fc00::/7`). - /// - /// This property is defined in [IETF RFC 4193]. - /// - /// [IETF RFC 4193]: https://tools.ietf.org/html/rfc4193 - #[must_use] - fn is_unique_local(&self) -> bool; - /// Returns [`true`] if this is an address reserved for documentation - /// (`2001:db8::/32`). - /// - /// This property is defined in [IETF RFC 3849]. - /// - /// [IETF RFC 3849]: https://tools.ietf.org/html/rfc3849 - #[must_use] - fn is_documentation(&self) -> bool; + /// Returns [`true`] if this address is part of the Shared Address Space defined in + /// [IETF RFC 6598] (`100.64.0.0/10`). + /// + /// [IETF RFC 6598]: https://tools.ietf.org/html/rfc6598 + #[must_use] + #[inline] + const fn is_shared(a: Ipv4Addr) -> bool { + a.octets()[0] == 100 && (a.octets()[1] & 0b1100_0000 == 0b0100_0000) } - impl Ipv6Ext for Ipv6Addr { - #[inline] - fn is_unicast_link_local(&self) -> bool { - (self.segments()[0] & 0xffc0) == 0xfe80 + /// Returns [`true`] if this is a private address. + /// + /// The private address ranges are defined in [IETF RFC 1918] and include: + /// + /// - `10.0.0.0/8` + /// - `172.16.0.0/12` + /// - `192.168.0.0/16` + /// + /// [IETF RFC 1918]: https://tools.ietf.org/html/rfc1918 + #[must_use] + #[inline] + const fn is_private(a: Ipv4Addr) -> bool { + match a.octets() { + [10, ..] => true, + [172, b, ..] if b >= 16 && b <= 31 => true, + [192, 168, ..] => true, + _ => false, } + } - #[inline] - fn is_unique_local(&self) -> bool { - (self.segments()[0] & 0xfe00) == 0xfc00 - } + /// Returns [`true`] if the address appears to be globally reachable + /// as specified by the [IANA IPv4 Special-Purpose Address Registry]. + /// Whether or not an address is practically reachable will depend on your network configuration. + /// + /// Most IPv4 addresses are globally reachable; + /// unless they are specifically defined as *not* globally reachable. + /// + /// Non-exhaustive list of notable addresses that are not globally reachable: + /// + /// - The [unspecified address] ([`is_unspecified`](Ipv4Addr::is_unspecified)) + /// - Addresses reserved for private use ([`is_private`](Ipv4Addr::is_private)) + /// - Addresses in the shared address space ([`is_shared`](Ipv4Addr::is_shared)) + /// - Loopback addresses ([`is_loopback`](Ipv4Addr::is_loopback)) + /// - Link-local addresses ([`is_link_local`](Ipv4Addr::is_link_local)) + /// - Addresses reserved for documentation ([`is_documentation`](Ipv4Addr::is_documentation)) + /// - Addresses reserved for benchmarking ([`is_benchmarking`](Ipv4Addr::is_benchmarking)) + /// - Reserved addresses ([`is_reserved`](Ipv4Addr::is_reserved)) + /// - The [broadcast address] ([`is_broadcast`](Ipv4Addr::is_broadcast)) + /// + /// For the complete overview of which addresses are globally reachable, see the table at the [IANA IPv4 Special-Purpose Address Registry]. + /// + /// [IANA IPv4 Special-Purpose Address Registry]: https://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml + /// [unspecified address]: Ipv4Addr::UNSPECIFIED + /// [broadcast address]: Ipv4Addr::BROADCAST + #[must_use] + #[inline] + pub(crate) const fn is_global(a: Ipv4Addr) -> bool { + !(a.octets()[0] == 0 // "This network" + || is_private(a) + || is_shared(a) + || a.is_loopback() + || a.is_link_local() + // addresses reserved for future protocols (`192.0.0.0/24`) + ||(a.octets()[0] == 192 && a.octets()[1] == 0 && a.octets()[2] == 0) + || a.is_documentation() + || is_benchmarking(a) + || is_reserved(a) + || a.is_broadcast()) + } +} - #[inline] - fn is_documentation(&self) -> bool { - (self.segments()[0] == 0x2001) && (self.segments()[1] == 0xdb8) - } +/// This module contains an implementation of the `is_global` IPv6 address space. +/// +/// Credit for this implementation goes to the Rust standard library team. +/// +/// Unstable tracking issue: [#27709](https://github.com/rust-lang/rust/issues/27709) +mod ipv6_global { + use std::net::Ipv6Addr; + + /// Returns `true` if the address is a unicast address with link-local scope, + /// as defined in [RFC 4291]. + /// + /// A unicast address has link-local scope if it has the prefix `fe80::/10`, as per [RFC 4291 section 2.4]. + /// Note that this encompasses more addresses than those defined in [RFC 4291 section 2.5.6], + /// which describes "Link-Local IPv6 Unicast Addresses" as having the following stricter format: + /// + /// ```text + /// | 10 bits | 54 bits | 64 bits | + /// +----------+-------------------------+----------------------------+ + /// |1111111010| 0 | interface ID | + /// +----------+-------------------------+----------------------------+ + /// ``` + /// So while currently the only addresses with link-local scope an application will encounter are all in `fe80::/64`, + /// this might change in the future with the publication of new standards. More addresses in `fe80::/10` could be allocated, + /// and those addresses will have link-local scope. + /// + /// Also note that while [RFC 4291 section 2.5.3] mentions about the [loopback address] (`::1`) that "it is treated as having Link-Local scope", + /// this does not mean that the loopback address actually has link-local scope and this method will return `false` on it. + /// + /// [RFC 4291]: https://tools.ietf.org/html/rfc4291 + /// [RFC 4291 section 2.4]: https://tools.ietf.org/html/rfc4291#section-2.4 + /// [RFC 4291 section 2.5.3]: https://tools.ietf.org/html/rfc4291#section-2.5.3 + /// [RFC 4291 section 2.5.6]: https://tools.ietf.org/html/rfc4291#section-2.5.6 + /// [loopback address]: Ipv6Addr::LOCALHOST + #[must_use] + #[inline] + const fn is_unicast_link_local(a: Ipv6Addr) -> bool { + (a.segments()[0] & 0xffc0) == 0xfe80 } - pub(super) trait IpExt { - /// Returns [`true`] if the address appears to be globally routable. - /// - /// See the documentation for [`Ipv4Addr::is_global()`] and Ipv6Addr::is_global() for more details. - #[must_use] - fn is_global(&self) -> bool; + /// Returns [`true`] if this is a unique local address (`fc00::/7`). + /// + /// This property is defined in [IETF RFC 4193]. + /// + /// [IETF RFC 4193]: https://tools.ietf.org/html/rfc4193 + #[must_use] + #[inline] + const fn is_unique_local(a: Ipv6Addr) -> bool { + (a.segments()[0] & 0xfe00) == 0xfc00 } - impl IpExt for Ipv4Addr { - /// Returns [`true`] if the address appears to be globally reachable - /// as specified by the [IANA IPv4 Special-Purpose Address Registry]. - /// Whether or not an address is practically reachable will depend on your network configuration. - /// - /// Most IPv4 addresses are globally reachable; - /// unless they are specifically defined as *not* globally reachable. - /// - /// Non-exhaustive list of notable addresses that are not globally reachable: - /// - /// - The [unspecified address] ([`is_unspecified`](Ipv4Addr::is_unspecified)) - /// - Addresses reserved for private use ([`is_private`](Ipv4Addr::is_private)) - /// - Addresses in the shared address space ([`is_shared`](Ipv4Addr::is_shared)) - /// - Loopback addresses ([`is_loopback`](Ipv4Addr::is_loopback)) - /// - Link-local addresses ([`is_link_local`](Ipv4Addr::is_link_local)) - /// - Addresses reserved for documentation ([`is_documentation`](Ipv4Addr::is_documentation)) - /// - Addresses reserved for benchmarking ([`is_benchmarking`](Ipv4Addr::is_benchmarking)) - /// - Reserved addresses ([`is_reserved`](Ipv4Addr::is_reserved)) - /// - The [broadcast address] ([`is_broadcast`](Ipv4Addr::is_broadcast)) - /// - /// For the complete overview of which addresses are globally reachable, see the table at the [IANA IPv4 Special-Purpose Address Registry]. - /// - /// [IANA IPv4 Special-Purpose Address Registry]: https://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml - /// [unspecified address]: Ipv4Addr::UNSPECIFIED - /// [broadcast address]: Ipv4Addr::BROADCAST - #[inline] - fn is_global(&self) -> bool { - !(self.octets()[0] == 0 // "This network" - || self.is_private() - || Ipv4Ext::is_shared(self) - || self.is_loopback() - || self.is_link_local() - // addresses reserved for future protocols (`192.0.0.0/24`) - ||(self.octets()[0] == 192 && self.octets()[1] == 0 && self.octets()[2] == 0) - || self.is_documentation() - || Ipv4Ext::is_benchmarking(self) - || Ipv4Ext::is_reserved(self) - || self.is_broadcast()) - } + /// Returns [`true`] if this is an address reserved for documentation + /// (`2001:db8::/32`). + /// + /// This property is defined in [IETF RFC 3849]. + /// + /// [IETF RFC 3849]: https://tools.ietf.org/html/rfc3849 + #[must_use] + #[inline] + const fn is_documentation(a: Ipv6Addr) -> bool { + (a.segments()[0] == 0x2001) && (a.segments()[1] == 0xdb8) } - impl IpExt for Ipv6Addr { - /// Returns [`true`] if the address appears to be globally reachable - /// as specified by the [IANA IPv6 Special-Purpose Address Registry]. - /// Whether or not an address is practically reachable will depend on your network configuration. - /// - /// Most IPv6 addresses are globally reachable; - /// unless they are specifically defined as *not* globally reachable. - /// - /// Non-exhaustive list of notable addresses that are not globally reachable: - /// - The [unspecified address] ([`is_unspecified`](Ipv6Addr::is_unspecified)) - /// - The [loopback address] ([`is_loopback`](Ipv6Addr::is_loopback)) - /// - IPv4-mapped addresses - /// - Addresses reserved for benchmarking - /// - Addresses reserved for documentation ([`is_documentation`](Ipv6Addr::is_documentation)) - /// - Unique local addresses ([`is_unique_local`](Ipv6Addr::is_unique_local)) - /// - Unicast addresses with link-local scope ([`is_unicast_link_local`](Ipv6Addr::is_unicast_link_local)) - /// - /// For the complete overview of which addresses are globally reachable, see the table at the [IANA IPv6 Special-Purpose Address Registry]. - /// - /// Note that an address having global scope is not the same as being globally reachable, - /// and there is no direct relation between the two concepts: There exist addresses with global scope - /// that are not globally reachable (for example unique local addresses), - /// and addresses that are globally reachable without having global scope - /// (multicast addresses with non-global scope). - /// - /// [IANA IPv6 Special-Purpose Address Registry]: https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml - /// [unspecified address]: Ipv6Addr::UNSPECIFIED - /// [loopback address]: Ipv6Addr::LOCALHOST - #[inline] - fn is_global(&self) -> bool { - !(self.is_unspecified() - || self.is_loopback() + /// Returns [`true`] if the address appears to be globally reachable + /// as specified by the [IANA IPv6 Special-Purpose Address Registry]. + /// Whether or not an address is practically reachable will depend on your network configuration. + /// + /// Most IPv6 addresses are globally reachable; + /// unless they are specifically defined as *not* globally reachable. + /// + /// Non-exhaustive list of notable addresses that are not globally reachable: + /// - The [unspecified address] ([`is_unspecified`](Ipv6Addr::is_unspecified)) + /// - The [loopback address] ([`is_loopback`](Ipv6Addr::is_loopback)) + /// - IPv4-mapped addresses + /// - Addresses reserved for benchmarking + /// - Addresses reserved for documentation ([`is_documentation`](Ipv6Addr::is_documentation)) + /// - Unique local addresses ([`is_unique_local`](Ipv6Addr::is_unique_local)) + /// - Unicast addresses with link-local scope ([`is_unicast_link_local`](Ipv6Addr::is_unicast_link_local)) + /// + /// For the complete overview of which addresses are globally reachable, see the table at the [IANA IPv6 Special-Purpose Address Registry]. + /// + /// Note that an address having global scope is not the same as being globally reachable, + /// and there is no direct relation between the two concepts: There exist addresses with global scope + /// that are not globally reachable (for example unique local addresses), + /// and addresses that are globally reachable without having global scope + /// (multicast addresses with non-global scope). + /// + /// [IANA IPv6 Special-Purpose Address Registry]: https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml + /// [unspecified address]: Ipv6Addr::UNSPECIFIED + /// [loopback address]: Ipv6Addr::LOCALHOST + #[must_use] + #[inline] + pub(crate) const fn is_global(a: Ipv6Addr) -> bool { + !(a.is_unspecified() + || a.is_loopback() // IPv4-mapped Address (`::ffff:0:0/96`) - || matches!(self.segments(), [0, 0, 0, 0, 0, 0xffff, _, _]) + || matches!(a.segments(), [0, 0, 0, 0, 0, 0xffff, _, _]) // IPv4-IPv6 Translat. (`64:ff9b:1::/48`) - || matches!(self.segments(), [0x64, 0xff9b, 1, _, _, _, _, _]) + || matches!(a.segments(), [0x64, 0xff9b, 1, _, _, _, _, _]) // Discard-Only Address Block (`100::/64`) - || matches!(self.segments(), [0x100, 0, 0, 0, _, _, _, _]) + || matches!(a.segments(), [0x100, 0, 0, 0, _, _, _, _]) // IETF Protocol Assignments (`2001::/23`) - || (matches!(self.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200) + || (matches!(a.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200) && !( // Port Control Protocol Anycast (`2001:1::1`) - u128::from_be_bytes(self.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001 + u128::from_be_bytes(a.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001 // Traversal Using Relays around NAT Anycast (`2001:1::2`) - || u128::from_be_bytes(self.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002 + || u128::from_be_bytes(a.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002 // AMT (`2001:3::/32`) - || matches!(self.segments(), [0x2001, 3, _, _, _, _, _, _]) + || matches!(a.segments(), [0x2001, 3, _, _, _, _, _, _]) // AS112-v6 (`2001:4:112::/48`) - || matches!(self.segments(), [0x2001, 4, 0x112, _, _, _, _, _]) + || matches!(a.segments(), [0x2001, 4, 0x112, _, _, _, _, _]) // ORCHIDv2 (`2001:20::/28`) - || matches!(self.segments(), [0x2001, b, _, _, _, _, _, _] if (0x20..=0x2f).contains(&b)) + || matches!(a.segments(), [0x2001, b, _, _, _, _, _, _] if b >= 0x20 && b <= 0x2F) )) - || Ipv6Ext::is_documentation(self) - || Ipv6Ext::is_unique_local(self) - || Ipv6Ext::is_unicast_link_local(self)) - } + || is_documentation(a) + || is_unique_local(a) + || is_unicast_link_local(a)) } - - impl IpExt for IpAddr { - #[inline] - fn is_global(&self) -> bool { - match self { - Self::V4(v4) => IpExt::is_global(v4), - Self::V6(v6) => IpExt::is_global(v6), - } - } - } -} - -use crate::{ - multiaddr::{Multiaddr, Protocol}, - transport::{DialOpts, ListenerId, TransportError, TransportEvent}, -}; -use ip::IpExt; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; -use tracing::debug; - -/// Dropping all dial requests to non-global IP addresses. -#[derive(Debug, Clone, Default)] -pub struct Transport { - inner: T, } impl Transport { @@ -316,15 +294,15 @@ impl crate::Transport for Transport { ) -> Result> { match addr.iter().next() { Some(Protocol::Ip4(a)) => { - if !IpExt::is_global(&a) { - debug!("Not dialing non global IP address {:?}.", a); + if !ipv4_global::is_global(a) { + tracing::debug!(ip=%a, "Not dialing non global IP address"); return Err(TransportError::MultiaddrNotSupported(addr)); } self.inner.dial(addr, opts) } Some(Protocol::Ip6(a)) => { - if !IpExt::is_global(&a) { - debug!("Not dialing non global IP address {:?}.", a); + if !ipv6_global::is_global(a) { + tracing::debug!(ip=%a, "Not dialing non global IP address"); return Err(TransportError::MultiaddrNotSupported(addr)); } self.inner.dial(addr, opts) diff --git a/protocols/autonat/src/lib.rs b/protocols/autonat/src/lib.rs index 8a06bea6605..a6fc66b28d1 100644 --- a/protocols/autonat/src/lib.rs +++ b/protocols/autonat/src/lib.rs @@ -5,7 +5,5 @@ pub mod v1; pub mod v2; #[cfg(feature = "v1")] -#[deprecated(since = "0.13.0", note = "Please use `v1` module instead.")] +#[allow(deprecated)] pub use v1::*; - - diff --git a/protocols/autonat/src/v1.rs b/protocols/autonat/src/v1.rs index 9974d7af59a..245748125cf 100644 --- a/protocols/autonat/src/v1.rs +++ b/protocols/autonat/src/v1.rs @@ -21,9 +21,10 @@ //! Implementation of the [AutoNAT](https://github.com/libp2p/specs/blob/master/autonat/README.md) protocol. #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![deprecated(note = "Please use `v2` module instead.")] -mod behaviour; -mod protocol; +pub(crate) mod behaviour; +pub(crate) mod protocol; pub use self::{ behaviour::{ @@ -34,7 +35,7 @@ pub use self::{ }; pub use libp2p_request_response::{InboundFailure, OutboundFailure}; -mod proto { +pub(crate) mod proto { #![allow(unreachable_pub)] include!("v1/generated/mod.rs"); pub(crate) use self::structs::{mod_Message::*, Message}; diff --git a/protocols/autonat/src/v1/behaviour.rs b/protocols/autonat/src/v1/behaviour.rs index b60fcbb635c..4104e9688dc 100644 --- a/protocols/autonat/src/v1/behaviour.rs +++ b/protocols/autonat/src/v1/behaviour.rs @@ -21,8 +21,8 @@ mod as_client; mod as_server; -use crate::v1::protocol::{AutoNatCodec, DialRequest, DialResponse, ResponseError}; -use crate::v1::DEFAULT_PROTOCOL_NAME; +use crate::protocol::{AutoNatCodec, DialRequest, DialResponse, ResponseError}; +use crate::DEFAULT_PROTOCOL_NAME; use as_client::AsClient; pub use as_client::{OutboundProbeError, OutboundProbeEvent}; use as_server::AsServer; diff --git a/protocols/autonat/src/v1/behaviour/as_client.rs b/protocols/autonat/src/v1/behaviour/as_client.rs index 867a5bf171d..668f3b93719 100644 --- a/protocols/autonat/src/v1/behaviour/as_client.rs +++ b/protocols/autonat/src/v1/behaviour/as_client.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::v1::ResponseError; +use crate::ResponseError; use super::{ Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus, diff --git a/protocols/autonat/src/v1/behaviour/as_server.rs b/protocols/autonat/src/v1/behaviour/as_server.rs index 4e3cfc77891..878fd713dda 100644 --- a/protocols/autonat/src/v1/behaviour/as_server.rs +++ b/protocols/autonat/src/v1/behaviour/as_server.rs @@ -135,7 +135,6 @@ impl<'a> HandleInnerEvent for AsServer<'a> { NonZeroU8::new(1).expect("1 > 0"), ) .addresses(addrs) - .allocate_new_port() .build(), }, ]) diff --git a/protocols/autonat/src/v1/protocol.rs b/protocols/autonat/src/v1/protocol.rs index bee29cae44c..c1862058400 100644 --- a/protocols/autonat/src/v1/protocol.rs +++ b/protocols/autonat/src/v1/protocol.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::v1::proto; +use crate::proto; use async_trait::async_trait; use asynchronous_codec::{FramedRead, FramedWrite}; use futures::io::{AsyncRead, AsyncWrite}; diff --git a/protocols/autonat/src/v2.rs b/protocols/autonat/src/v2.rs index 0fae928921b..885c8fda4d0 100644 --- a/protocols/autonat/src/v2.rs +++ b/protocols/autonat/src/v2.rs @@ -6,9 +6,9 @@ mod global_only; pub(crate) mod protocol; pub mod server; -pub(crate) const REQUEST_PROTOCOL_NAME: StreamProtocol = +pub(crate) const DIAL_REQUEST_PROTOCOL: StreamProtocol = StreamProtocol::new("/libp2p/autonat/2/dial-request"); -pub(crate) const DIAL_BACK_PROTOCOL_NAME: StreamProtocol = +pub(crate) const DIAL_BACK_PROTOCOL: StreamProtocol = StreamProtocol::new("/libp2p/autonat/2/dial-back"); type Nonce = u64; diff --git a/protocols/autonat/src/v2/client/behaviour.rs b/protocols/autonat/src/v2/client/behaviour.rs index 657bee24e86..a7836add87b 100644 --- a/protocols/autonat/src/v2/client/behaviour.rs +++ b/protocols/autonat/src/v2/client/behaviour.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, VecDeque}, task::{Context, Poll}, time::Duration, }; @@ -10,40 +10,37 @@ use futures_timer::Delay; use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::{ - behaviour::{ConnectionEstablished, ExternalAddrConfirmed}, - ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, FromSwarm, - NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm, + behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionHandler, + ConnectionId, FromSwarm, NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm, }; use rand::prelude::*; use rand_core::OsRng; use std::fmt::{Debug, Display, Formatter}; -use crate::v2::client::handler::dial_request::InternalError; -use crate::v2::{global_only::IpExt, protocol::DialRequest}; +use crate::v2::{global_only::IpExt, protocol::DialRequest, Nonce}; -use super::handler::{ - dial_back, - dial_request::{self, InternalStatusUpdate}, - TestEnd, -}; +use super::handler::{dial_back, dial_request}; #[derive(Debug, Clone, Copy)] pub struct Config { - pub(crate) max_addrs_count: usize, - pub(crate) recheck_interval: Duration, + /// How many candidates we will test at most. + pub(crate) max_candidates: usize, + + /// The interval at which we will attempt to confirm candidates as external addresses. + pub(crate) probe_interval: Duration, } impl Config { - pub fn with_max_addrs_count(self, max_addrs_count: usize) -> Self { + pub fn with_max_candidates(self, max_candidates: usize) -> Self { Self { - max_addrs_count, + max_candidates, ..self } } - pub fn with_recheck_interval(self, recheck_interval: Duration) -> Self { + pub fn with_probe_interval(self, probe_interval: Duration) -> Self { Self { - recheck_interval, + probe_interval, ..self } } @@ -52,8 +49,8 @@ impl Config { impl Default for Config { fn default() -> Self { Self { - max_addrs_count: 10, - recheck_interval: Duration::from_secs(5), + max_candidates: 10, + probe_interval: Duration::from_secs(5), } } } @@ -62,7 +59,6 @@ pub struct Behaviour where R: RngCore + 'static, { - pending_nonces: HashMap, rng: R, config: Config, pending_events: VecDeque< @@ -72,7 +68,6 @@ where >, >, address_candidates: HashMap, - already_tested: HashSet, next_tick: Delay, peer_info: HashMap, } @@ -87,43 +82,23 @@ where fn handle_established_inbound_connection( &mut self, - connection_id: ConnectionId, - peer_id: PeerId, - _local_addr: &Multiaddr, - remote_addr: &Multiaddr, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, ) -> Result<::ConnectionHandler, ConnectionDenied> { - if addr_is_local(remote_addr) { - self.peer_info - .entry(connection_id) - .or_insert(ConnectionInfo { - peer_id, - supports_autonat: false, - is_local: true, - }) - .is_local = true; - } Ok(Either::Right(dial_back::Handler::new())) } fn handle_established_outbound_connection( &mut self, - connection_id: ConnectionId, - peer_id: PeerId, - addr: &Multiaddr, - _role_override: Endpoint, - _port_use: PortUse, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + _: PortUse, ) -> Result<::ConnectionHandler, ConnectionDenied> { - if addr_is_local(addr) { - self.peer_info - .entry(connection_id) - .or_insert(ConnectionInfo { - peer_id, - supports_autonat: false, - is_local: true, - }) - .is_local = true; - } - Ok(Either::Left(dial_request::Handler::new(peer_id))) + Ok(Either::Left(dial_request::Handler::new())) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -134,37 +109,34 @@ where .or_default() .score += 1; } - FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => { - if let Some(info) = self.address_candidates.get_mut(addr) { - info.is_tested = true; - } - } FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, connection_id, + endpoint, .. }) => { - self.peer_info - .entry(connection_id) - .or_insert(ConnectionInfo { + self.peer_info.insert( + connection_id, + ConnectionInfo { peer_id, supports_autonat: false, - is_local: false, - }); + is_local: addr_is_local(endpoint.get_remote_address()), + }, + ); } FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => { - self.handle_no_connection(peer_id, connection_id); - } - FromSwarm::DialFailure(DialFailure { - peer_id: Some(peer_id), - connection_id, - .. - }) => { - self.handle_no_connection(peer_id, connection_id); + let info = self + .peer_info + .remove(&connection_id) + .expect("inconsistent state"); + + if info.supports_autonat { + tracing::debug!(%peer_id, "Disconnected from AutoNAT server"); + } } _ => {} } @@ -176,93 +148,90 @@ where connection_id: ConnectionId, event: ::ToBehaviour, ) { - match event { + let (nonce, outcome) = match event { Either::Right(nonce) => { - if let Some(status) = self.pending_nonces.get_mut(&nonce) { - *status = NonceStatus::Received; - tracing::trace!("Received pending nonce from {peer_id:?}"); - } else { - tracing::warn!("Received unexpected nonce from {peer_id:?}, this means that another node tried to be reachable on an address this node is reachable on."); - } + let Some((_, info)) = self + .address_candidates + .iter_mut() + .find(|(_, info)| info.is_pending_with_nonce(nonce)) + else { + tracing::warn!(%peer_id, %nonce, "Received unexpected nonce"); + return; + }; + + info.status = TestStatus::Received(nonce); + tracing::debug!(%peer_id, %nonce, "Successful dial-back"); + + return; } Either::Left(dial_request::ToBehaviour::PeerHasServerSupport) => { self.peer_info - .values_mut() - .filter(|info| info.peer_id == peer_id) - .for_each(|info| { - info.supports_autonat = true; - }); - self.peer_info - .entry(connection_id) - .or_insert(ConnectionInfo { - peer_id, - supports_autonat: true, - is_local: false, - }) + .get_mut(&connection_id) + .expect("inconsistent state") .supports_autonat = true; + return; } - Either::Left(dial_request::ToBehaviour::TestCompleted(InternalStatusUpdate { - tested_addr, - bytes_sent: data_amount, - server, - result, - server_no_support, - })) => { - if server_no_support { - self.peer_info - .values_mut() - .filter(|info| info.peer_id == peer_id) - .for_each(|info| { - info.supports_autonat = false; - }); - } - match result { - Ok(TestEnd { - dial_request: DialRequest { nonce, .. }, - ref reachable_addr, - }) => { - if !matches!(self.pending_nonces.get(&nonce), Some(NonceStatus::Received)) { - tracing::debug!( - "server reported reachbility, but didn't actually reached this node." - ); - } else { - self.pending_events - .push_back(ToSwarm::ExternalAddrConfirmed(reachable_addr.clone())); - } - } - Err(ref err) => match &err.internal { - dial_request::InternalError::FailureDuringDialBack { addr: Some(addr) } - | dial_request::InternalError::UnableToConnectOnSelectedAddress { - addr: Some(addr), - } => { - if let Some(peer_info) = self.address_candidates.get_mut(addr) { - peer_info.is_tested = true; - } - tracing::debug!(addr = %addr, "Was unable to connect to the server on the selected address.") - } - dial_request::InternalError::InternalServer - | dial_request::InternalError::DataRequestTooLarge { .. } - | dial_request::InternalError::DataRequestTooSmall { .. } - | dial_request::InternalError::InvalidResponse - | dial_request::InternalError::ServerRejectedDialRequest - | dial_request::InternalError::InvalidReferencedAddress { .. } - | dial_request::InternalError::ServerChoseNotToDialAnyAddress => { - self.handle_no_connection(peer_id, connection_id); - } - _ => { - tracing::debug!("Test failed: {:?}", err); - } - }, + Either::Left(dial_request::ToBehaviour::TestOutcome { nonce, outcome }) => { + (nonce, outcome) + } + }; + + let ((tested_addr, bytes_sent), result) = match outcome { + Ok(address) => { + let received_dial_back = self + .address_candidates + .iter_mut() + .any(|(_, info)| info.is_received_with_nonce(nonce)); + + if !received_dial_back { + tracing::warn!( + %peer_id, + %nonce, + "Server reported reachbility but we never received a dial-back" + ); + return; } - let event = crate::v2::client::Event { - tested_addr, - bytes_sent: data_amount, - server: server.unwrap_or(peer_id), - result: result.map(|_| ()), - }; - self.pending_events.push_back(ToSwarm::GenerateEvent(event)); + + (address, Ok(())) } - } + Err(dial_request::Error::UnsupportedProtocol) => { + self.peer_info + .get_mut(&connection_id) + .expect("inconsistent state") + .supports_autonat = false; + + self.reset_status_to(nonce, TestStatus::Untested); // Reset so it will be tried again. + + return; + } + Err(dial_request::Error::Io(e)) => { + tracing::debug!( + %peer_id, + %nonce, + "Failed to complete AutoNAT probe: {e}" + ); + + self.reset_status_to(nonce, TestStatus::Untested); // Reset so it will be tried again. + + return; + } + Err(dial_request::Error::AddressNotReachable { + address, + bytes_sent, + error, + }) => { + self.reset_status_to(nonce, TestStatus::Failed); + + ((address, bytes_sent), Err(error)) + } + }; + + self.pending_events.push_back(ToSwarm::GenerateEvent(Event { + tested_addr, + bytes_sent, + server: peer_id, + result: result.map_err(|e| Error { inner: e }), + })); } fn poll( @@ -270,16 +239,20 @@ where cx: &mut Context<'_>, ) -> Poll::FromBehaviour>> { - if let Some(event) = self.pending_events.pop_front() { - return Poll::Ready(event); - } - if self.next_tick.poll_unpin(cx).is_ready() { - self.inject_address_candiate_test(); + loop { if let Some(event) = self.pending_events.pop_front() { return Poll::Ready(event); } + + if self.next_tick.poll_unpin(cx).is_ready() { + self.next_tick.reset(self.config.probe_interval); + + self.issue_dial_requests_for_untested_candidates(); + continue; + } + + return Poll::Pending; } - Poll::Pending } } @@ -289,106 +262,94 @@ where { pub fn new(rng: R, config: Config) -> Self { Self { - pending_nonces: HashMap::new(), rng, - next_tick: Delay::new(config.recheck_interval), + next_tick: Delay::new(config.probe_interval), config, pending_events: VecDeque::new(), address_candidates: HashMap::new(), - already_tested: HashSet::new(), peer_info: HashMap::new(), } } - /// Inject an immediate test for all pending address candidates. - fn inject_address_candiate_test(&mut self) { - if self.peer_info.values().all(|info| !info.supports_autonat) { - return; - } - if self.address_candidates.is_empty() { - return; - } - if self.address_candidates.values().all(|info| info.is_tested) { - return; + /// Issues dial requests to random AutoNAT servers for the most frequently reported, untested candidates. + /// + /// In the current implementation, we only send a single address to each AutoNAT server. + /// This spreads our candidates out across all servers we are connected to which should give us pretty fast feedback on all of them. + fn issue_dial_requests_for_untested_candidates(&mut self) { + for addr in self.untested_candidates() { + let Some((conn_id, peer_id)) = self.random_autonat_server() else { + tracing::debug!("Not connected to any AutoNAT servers"); + return; + }; + + let nonce = self.rng.gen(); + self.address_candidates + .get_mut(&addr) + .expect("only emit candidates") + .status = TestStatus::Pending(nonce); + + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(conn_id), + event: Either::Left(DialRequest { + nonce, + addrs: vec![addr], + }), + }); } + } + + /// Returns all untested candidates, sorted by the frequency they were reported at. + /// + /// More frequently reported candidates are considered to more likely be external addresses and thus tested first. + fn untested_candidates(&self) -> impl Iterator { let mut entries = self .address_candidates .iter() - .filter(|(_, info)| !info.is_tested) - .filter(|(addr, _)| !self.already_tested.contains(addr)) + .filter(|(_, info)| info.status == TestStatus::Untested) .map(|(addr, count)| (addr.clone(), *count)) .collect::>(); + + entries.sort_unstable_by_key(|(_, info)| info.score); + if entries.is_empty() { - return; + tracing::debug!("No untested address candidates"); } - entries.sort_unstable_by_key(|(_, count)| *count); - let addrs = entries - .iter() - .rev() + + entries + .into_iter() + .rev() // `sort_unstable` is ascending + .take(self.config.max_candidates) .map(|(addr, _)| addr) - .take(self.config.max_addrs_count) - .cloned() - .collect(); - if let Some(ConnectionInfo { peer_id, .. }) = self - .peer_info - .values() - .filter(|e| e.supports_autonat) - .choose(&mut self.rng) - { - self.submit_req_for_peer(*peer_id, addrs); - } - self.next_tick.reset(self.config.recheck_interval); } - fn submit_req_for_peer(&mut self, peer: PeerId, addrs: Vec) { - let nonce = self.rng.gen(); - let req = DialRequest { nonce, addrs }; - self.pending_nonces.insert(nonce, NonceStatus::Pending); - if let Some(conn_id) = self + /// Chooses an active connection to one of our peers that reported support for the [`DIAL_REQUEST_PROTOCOL`](crate::v2::DIAL_REQUEST_PROTOCOL) protocol. + fn random_autonat_server(&mut self) -> Option<(ConnectionId, PeerId)> { + let (conn_id, info) = self .peer_info .iter() .filter(|(_, info)| info.supports_autonat) - .find(|(_, info)| info.peer_id == peer) - .map(|(id, _)| *id) - { - self.pending_events.push_back(ToSwarm::NotifyHandler { - peer_id: peer, - handler: NotifyHandler::One(conn_id), - event: Either::Left(req), - }); - } + .choose(&mut self.rng)?; + + Some((*conn_id, info.peer_id)) } - fn handle_no_connection(&mut self, peer_id: PeerId, connection_id: ConnectionId) { - let removeable_conn_ids = self - .peer_info - .iter() - .filter(|(conn_id, info)| info.peer_id == peer_id && **conn_id == connection_id) - .map(|(id, _)| *id) - .collect::>(); - for conn_id in removeable_conn_ids { - self.peer_info.remove(&conn_id); - } - let known_servers_n = self - .peer_info - .values() - .filter(|info| info.supports_autonat) - .count(); - let changed_n = self - .peer_info - .values_mut() - .filter(|info| info.supports_autonat) - .filter(|info| info.peer_id == peer_id) - .map(|info| info.supports_autonat = false) - .count(); - if known_servers_n != changed_n { - tracing::trace!(server = %peer_id, "Removing potential Autonat server due to dial failure"); - } + fn reset_status_to(&mut self, nonce: Nonce, new_status: TestStatus) { + let Some((_, info)) = self + .address_candidates + .iter_mut() + .find(|(_, i)| i.is_pending_with_nonce(nonce) || i.is_received_with_nonce(nonce)) + else { + return; + }; + + info.status = new_status; } + // FIXME: We don't want test-only APIs in our public API. pub fn validate_addr(&mut self, addr: &Multiaddr) { if let Some(info) = self.address_candidates.get_mut(addr) { - info.is_tested = true; + info.status = TestStatus::Received(self.rng.next_u64()); } } } @@ -400,32 +361,25 @@ impl Default for Behaviour { } pub struct Error { - pub(crate) internal: InternalError, -} - -impl From for Error { - fn from(internal: InternalError) -> Self { - Self { internal } - } + pub(crate) inner: dial_request::DialBackError, } impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.internal, f) + Display::fmt(&self.inner, f) } } impl Debug for Error { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - Debug::fmt(&self.internal, f) + Debug::fmt(&self.inner, f) } } #[derive(Debug)] pub struct Event { /// The address that was selected for testing. - /// Is `None` in the case that the server respond with something unexpected. - pub tested_addr: Option, + pub tested_addr: Multiaddr, /// The amount of data that was sent to the server. /// Is 0 if it wasn't necessary to send any data. /// Otherwise it's a number between 30.000 and 100.000. @@ -445,11 +399,6 @@ fn addr_is_local(addr: &Multiaddr) -> bool { }) } -enum NonceStatus { - Pending, - Received, -} - struct ConnectionInfo { peer_id: PeerId, supports_autonat: bool, @@ -459,25 +408,30 @@ struct ConnectionInfo { #[derive(Copy, Clone, Default)] struct AddressInfo { score: usize, - is_tested: bool, + status: TestStatus, } -impl PartialOrd for AddressInfo { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.score.cmp(&other.score)) +impl AddressInfo { + fn is_pending_with_nonce(&self, nonce: Nonce) -> bool { + match self.status { + TestStatus::Pending(c) => c == nonce, + _ => false, + } } -} -impl PartialEq for AddressInfo { - fn eq(&self, other: &Self) -> bool { - self.score == other.score + fn is_received_with_nonce(&self, nonce: Nonce) -> bool { + match self.status { + TestStatus::Received(c) => c == nonce, + _ => false, + } } } -impl Ord for AddressInfo { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.score.cmp(&other.score) - } +#[derive(Clone, Copy, Default, PartialEq)] +enum TestStatus { + #[default] + Untested, + Pending(Nonce), + Failed, + Received(Nonce), } - -impl Eq for AddressInfo {} diff --git a/protocols/autonat/src/v2/client/handler.rs b/protocols/autonat/src/v2/client/handler.rs index bb6c9636e2c..e526c2fb44c 100644 --- a/protocols/autonat/src/v2/client/handler.rs +++ b/protocols/autonat/src/v2/client/handler.rs @@ -1,4 +1,2 @@ pub(crate) mod dial_back; pub(crate) mod dial_request; - -pub(crate) use dial_request::TestEnd; diff --git a/protocols/autonat/src/v2/client/handler/dial_back.rs b/protocols/autonat/src/v2/client/handler/dial_back.rs index 492cc7729a2..18bfee792f1 100644 --- a/protocols/autonat/src/v2/client/handler/dial_back.rs +++ b/protocols/autonat/src/v2/client/handler/dial_back.rs @@ -12,7 +12,7 @@ use libp2p_swarm::{ }; use void::Void; -use crate::v2::{protocol, Nonce, DIAL_BACK_PROTOCOL_NAME}; +use crate::v2::{protocol, Nonce, DIAL_BACK_PROTOCOL}; pub struct Handler { inbound: FuturesSet>, @@ -35,7 +35,7 @@ impl ConnectionHandler for Handler { type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL_NAME), ()) + SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL), ()) } fn poll( diff --git a/protocols/autonat/src/v2/client/handler/dial_request.rs b/protocols/autonat/src/v2/client/handler/dial_request.rs index a34c76003a4..4b071894215 100644 --- a/protocols/autonat/src/v2/client/handler/dial_request.rs +++ b/protocols/autonat/src/v2/client/handler/dial_request.rs @@ -1,11 +1,10 @@ -use futures::{channel::oneshot, AsyncRead, AsyncWrite}; -use futures_bounded::FuturesSet; +use futures::{channel::oneshot, AsyncWrite}; +use futures_bounded::FuturesMap; use libp2p_core::{ upgrade::{DeniedUpgrade, ReadyUpgrade}, Multiaddr, }; -use libp2p_identity::PeerId; use libp2p_swarm::{ handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedOutbound, OutboundUpgradeSend, @@ -23,58 +22,49 @@ use std::{ }; use crate::v2::{ - client::behaviour::Error, generated::structs::{mod_DialResponse::ResponseStatus, DialStatus}, protocol::{ - Coder, DialDataRequest, DialDataResponse, DialRequest, DialResponse, Request, Response, + Coder, DialDataRequest, DialDataResponse, DialRequest, Response, DATA_FIELD_LEN_UPPER_BOUND, DATA_LEN_LOWER_BOUND, DATA_LEN_UPPER_BOUND, }, - REQUEST_PROTOCOL_NAME, + Nonce, DIAL_REQUEST_PROTOCOL, }; -#[derive(Debug, thiserror::Error)] -pub(crate) enum InternalError { - #[error("io error")] - Io(#[from] io::Error), - #[error("invalid referenced address index: {index} (max number of addr: {max})")] - InvalidReferencedAddress { index: usize, max: usize }, - #[error("data request too large: {len} (max: {max})")] - DataRequestTooLarge { len: usize, max: usize }, - #[error("data request too small: {len} (min: {min})")] - DataRequestTooSmall { len: usize, min: usize }, - #[error("server rejected dial request")] - ServerRejectedDialRequest, - #[error("server chose not to dial any provided address")] - ServerChoseNotToDialAnyAddress, - #[error("server ran into an internal error")] - InternalServer, - #[error("server did not respond correctly to dial request")] - InvalidResponse, - #[error("server was unable to connect to address: {addr:?}")] - UnableToConnectOnSelectedAddress { addr: Option }, - #[error("server experienced failure during dial back on address: {addr:?}")] - FailureDuringDialBack { addr: Option }, +#[derive(Debug)] +pub enum ToBehaviour { + TestOutcome { + nonce: Nonce, + outcome: Result<(Multiaddr, usize), Error>, + }, + PeerHasServerSupport, } -#[derive(Debug)] -pub struct InternalStatusUpdate { - pub(crate) tested_addr: Option, - pub(crate) bytes_sent: usize, - pub(crate) server: Option, - pub result: Result, - pub(crate) server_no_support: bool, +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Address is not reachable: {error}")] + AddressNotReachable { + address: Multiaddr, + bytes_sent: usize, + error: DialBackError, + }, + #[error("Peer does not support AutoNAT dial-request protocol")] + UnsupportedProtocol, + #[error("IO error: {0}")] + Io(io::Error), } -#[derive(Debug)] -pub struct TestEnd { - pub(crate) dial_request: DialRequest, - pub(crate) reachable_addr: Multiaddr, +impl From for Error { + fn from(value: io::Error) -> Self { + Self::Io(value) + } } -#[derive(Debug)] -pub enum ToBehaviour { - TestCompleted(InternalStatusUpdate), - PeerHasServerSupport, +#[derive(thiserror::Error, Debug)] +pub enum DialBackError { + #[error("server failed to establish a connection")] + NoConnection, + #[error("dial back stream failed")] + StreamFailed, } pub struct Handler { @@ -85,7 +75,7 @@ pub struct Handler { ::ToBehaviour, >, >, - outbound: futures_bounded::FuturesSet, + outbound: FuturesMap>, queued_streams: VecDeque< oneshot::Sender< Result< @@ -94,16 +84,14 @@ pub struct Handler { >, >, >, - server: PeerId, } impl Handler { - pub(crate) fn new(server: PeerId) -> Self { + pub(crate) fn new() -> Self { Self { queued_events: VecDeque::new(), - outbound: FuturesSet::new(Duration::from_secs(10), 10), + outbound: FuturesMap::new(Duration::from_secs(10), 10), queued_streams: VecDeque::default(), - server, } } @@ -112,11 +100,11 @@ impl Handler { self.queued_streams.push_back(tx); self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(REQUEST_PROTOCOL_NAME), ()), + protocol: SubstreamProtocol::new(ReadyUpgrade::new(DIAL_REQUEST_PROTOCOL), ()), }); if self .outbound - .try_push(start_stream_handle(self.server, req, rx)) + .try_push(req.nonce, start_stream_handle(req, rx)) .is_err() { tracing::debug!("Dial request dropped, too many requests in flight"); @@ -145,23 +133,24 @@ impl ConnectionHandler for Handler { if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } - if let Poll::Ready(m) = self.outbound.poll_unpin(cx) { - let status_update = match m { - Ok(ok) => ok, - Err(_) => InternalStatusUpdate { - tested_addr: None, - bytes_sent: 0, - server: None, - result: Err(Error { - internal: InternalError::Io(io::Error::from(io::ErrorKind::TimedOut)), - }), - server_no_support: false, - }, - }; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - ToBehaviour::TestCompleted(status_update), - )); + + match self.outbound.poll_unpin(cx) { + Poll::Ready((nonce, Ok(outcome))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::TestOutcome { nonce, outcome }, + )) + } + Poll::Ready((nonce, Err(_))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::TestOutcome { + nonce, + outcome: Err(Error::Io(io::ErrorKind::TimedOut.into())), + }, + )); + } + Poll::Pending => {} } + Poll::Pending } @@ -205,7 +194,7 @@ impl ConnectionHandler for Handler { } }, ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Added(mut added)) => { - if added.any(|p| p.as_ref() == REQUEST_PROTOCOL_NAME) { + if added.any(|p| p.as_ref() == DIAL_REQUEST_PROTOCOL) { self.queued_events .push_back(ConnectionHandlerEvent::NotifyBehaviour( ToBehaviour::PeerHasServerSupport, @@ -218,180 +207,126 @@ impl ConnectionHandler for Handler { } async fn start_stream_handle( - server: PeerId, - dial_request: DialRequest, - stream_recv: oneshot::Receiver< - Result< - Stream, - StreamUpgradeError< as OutboundUpgradeSend>::Error>, - >, - >, -) -> InternalStatusUpdate { - let mut server_no_support = false; - let substream_result = match stream_recv.await { - Ok(Ok(substream)) => Ok(substream), - Ok(Err(StreamUpgradeError::Io(io))) => Err(InternalError::from(io).into()), - Ok(Err(StreamUpgradeError::Timeout)) => { - Err(InternalError::Io(io::Error::from(io::ErrorKind::TimedOut)).into()) - } - Ok(Err(StreamUpgradeError::Apply(upgrade_error))) => void::unreachable(upgrade_error), - Ok(Err(StreamUpgradeError::NegotiationFailed)) => { - server_no_support = true; - Err( - InternalError::Io(io::Error::new(io::ErrorKind::Other, "negotiation failed")) - .into(), - ) - } - Err(_) => Err(InternalError::InternalServer.into()), - }; - let substream = match substream_result { - Ok(substream) => substream, - Err(err) => { - let status_update = InternalStatusUpdate { - tested_addr: None, - bytes_sent: 0, - server: Some(server), - result: Err(err), - server_no_support, - }; - return status_update; - } - }; - let mut data_amount = 0; - let mut checked_addr_idx = None; - let addrs = dial_request.addrs.clone(); - assert_ne!(addrs, vec![]); - let result = handle_stream( - dial_request, - substream, - &mut data_amount, - &mut checked_addr_idx, - ) - .await - .map_err(crate::v2::client::behaviour::Error::from); - InternalStatusUpdate { - tested_addr: checked_addr_idx.and_then(|idx| addrs.get(idx).cloned()), - bytes_sent: data_amount, - server: Some(server), - result, - server_no_support, - } -} + req: DialRequest, + stream_recv: oneshot::Receiver>>, +) -> Result<(Multiaddr, usize), Error> { + let stream = stream_recv + .await + .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))? + .map_err(|e| match e { + StreamUpgradeError::NegotiationFailed => Error::UnsupportedProtocol, + StreamUpgradeError::Timeout => Error::Io(io::ErrorKind::TimedOut.into()), + StreamUpgradeError::Apply(v) => void::unreachable(v), + StreamUpgradeError::Io(e) => Error::Io(e), + })?; -async fn handle_stream( - dial_request: DialRequest, - stream: impl AsyncRead + AsyncWrite + Unpin, - data_amount: &mut usize, - checked_addr_idx: &mut Option, -) -> Result { let mut coder = Coder::new(stream); - coder.send(Request::Dial(dial_request.clone())).await?; - match coder.next().await? { + coder.send(req.clone()).await?; + + let (res, bytes_sent) = match coder.next().await? { Response::Data(DialDataRequest { addr_idx, num_bytes, }) => { - if addr_idx >= dial_request.addrs.len() { - return Err(InternalError::InvalidReferencedAddress { - index: addr_idx, - max: dial_request.addrs.len(), - }); - } - if num_bytes > DATA_LEN_UPPER_BOUND { - return Err(InternalError::DataRequestTooLarge { - len: num_bytes, - max: DATA_LEN_UPPER_BOUND, - }); + if addr_idx >= req.addrs.len() { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "address index out of bounds", + ))); } - if num_bytes < DATA_LEN_LOWER_BOUND { - return Err(InternalError::DataRequestTooSmall { - len: num_bytes, - min: DATA_LEN_LOWER_BOUND, - }); - } - *checked_addr_idx = Some(addr_idx); - send_aap_data(&mut coder, num_bytes, data_amount).await?; - if let Response::Dial(dial_response) = coder.next().await? { - *checked_addr_idx = Some(dial_response.addr_idx); - coder.close().await?; - test_end_from_dial_response(dial_request, dial_response) - } else { - Err(InternalError::InternalServer) + if num_bytes > DATA_LEN_UPPER_BOUND || num_bytes < DATA_LEN_LOWER_BOUND { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "requested bytes out of bounds", + ))); } + + send_aap_data(&mut coder, num_bytes).await?; + + let Response::Dial(dial_response) = coder.next().await? else { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "expected message", + ))); + }; + + (dial_response, num_bytes) } - Response::Dial(dial_response) => { - *checked_addr_idx = Some(dial_response.addr_idx); - coder.close().await?; - test_end_from_dial_response(dial_request, dial_response) + Response::Dial(dial_response) => (dial_response, 0), + }; + coder.close().await?; + + match res.status { + ResponseStatus::E_REQUEST_REJECTED => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::Other, + "server rejected request", + ))) + } + ResponseStatus::E_DIAL_REFUSED => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::Other, + "server refused dial", + ))) + } + ResponseStatus::E_INTERNAL_ERROR => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::Other, + "server encountered internal error", + ))) } + ResponseStatus::OK => {} } -} -fn test_end_from_dial_response( - req: DialRequest, - resp: DialResponse, -) -> Result { - if resp.addr_idx >= req.addrs.len() { - return Err(InternalError::InvalidReferencedAddress { - index: resp.addr_idx, - max: req.addrs.len(), - }); - } - match (resp.status, resp.dial_status) { - (ResponseStatus::E_REQUEST_REJECTED, _) => Err(InternalError::ServerRejectedDialRequest), - (ResponseStatus::E_DIAL_REFUSED, _) => Err(InternalError::ServerChoseNotToDialAnyAddress), - (ResponseStatus::E_INTERNAL_ERROR, _) => Err(InternalError::InternalServer), - (ResponseStatus::OK, DialStatus::UNUSED) => Err(InternalError::InvalidResponse), - (ResponseStatus::OK, DialStatus::E_DIAL_ERROR) => { - Err(InternalError::UnableToConnectOnSelectedAddress { - addr: req.addrs.get(resp.addr_idx).cloned(), - }) + let tested_address = req + .addrs + .get(res.addr_idx) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "address index out of bounds"))? + .clone(); + + match res.dial_status { + DialStatus::UNUSED => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "unexpected message", + ))) } - (ResponseStatus::OK, DialStatus::E_DIAL_BACK_ERROR) => { - Err(InternalError::FailureDuringDialBack { - addr: req.addrs.get(resp.addr_idx).cloned(), + DialStatus::E_DIAL_ERROR => { + return Err(Error::AddressNotReachable { + address: tested_address, + bytes_sent, + error: DialBackError::NoConnection, }) } - (ResponseStatus::OK, DialStatus::OK) => req - .addrs - .get(resp.addr_idx) - .ok_or(InternalError::InvalidReferencedAddress { - index: resp.addr_idx, - max: req.addrs.len(), + DialStatus::E_DIAL_BACK_ERROR => { + return Err(Error::AddressNotReachable { + address: tested_address, + bytes_sent, + error: DialBackError::StreamFailed, }) - .cloned() - .map(|reachable_addr| TestEnd { - dial_request: req, - reachable_addr, - }), + } + DialStatus::OK => {} } + + Ok((tested_address, bytes_sent)) } -async fn send_aap_data( - stream: &mut Coder, - num_bytes: usize, - data_amount: &mut usize, -) -> io::Result<()> +async fn send_aap_data(stream: &mut Coder, num_bytes: usize) -> io::Result<()> where I: AsyncWrite + Unpin, { let count_full = num_bytes / DATA_FIELD_LEN_UPPER_BOUND; let partial_len = num_bytes % DATA_FIELD_LEN_UPPER_BOUND; - for (data_count, req) in repeat(DATA_FIELD_LEN_UPPER_BOUND) + for req in repeat(DATA_FIELD_LEN_UPPER_BOUND) .take(count_full) .chain(once(partial_len)) .filter(|e| *e > 0) .map(|data_count| { - ( - data_count, - Request::Data( - DialDataResponse::new(data_count).expect("data count is unexpectedly too big"), - ), - ) + DialDataResponse::new(data_count).expect("data count is unexpectedly too big") }) { - *data_amount += data_count; stream.send(req).await?; } + Ok(()) } diff --git a/protocols/autonat/src/v2/protocol.rs b/protocols/autonat/src/v2/protocol.rs index 69f80bf8d10..b4a1cbab4f6 100644 --- a/protocols/autonat/src/v2/protocol.rs +++ b/protocols/autonat/src/v2/protocol.rs @@ -5,7 +5,7 @@ use std::{borrow::Cow, io}; use asynchronous_codec::{Framed, FramedRead, FramedWrite}; -use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, SinkExt, StreamExt}; +use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; use libp2p_core::Multiaddr; use quick_protobuf_codec::Codec; @@ -90,6 +90,32 @@ pub(crate) enum Request { Data(DialDataResponse), } +impl From for proto::Message { + fn from(val: DialRequest) -> Self { + let addrs = val.addrs.iter().map(|e| e.to_vec()).collect(); + let nonce = Some(val.nonce); + + proto::Message { + msg: proto::mod_Message::OneOfmsg::dialRequest(proto::DialRequest { addrs, nonce }), + } + } +} + +impl From for proto::Message { + fn from(val: DialDataResponse) -> Self { + debug_assert!( + val.data_count <= DATA_FIELD_LEN_UPPER_BOUND, + "data_count too large" + ); + static DATA: &[u8] = &[0u8; DATA_FIELD_LEN_UPPER_BOUND]; + proto::Message { + msg: proto::mod_Message::OneOfmsg::dialDataResponse(proto::DialDataResponse { + data: Some(Cow::Borrowed(&DATA[..val.data_count])), + }), + } + } +} + #[derive(Debug, Clone, PartialEq)] pub struct DialRequest { pub(crate) addrs: Vec, @@ -144,35 +170,6 @@ impl TryFrom for Request { } } -impl Into for Request { - fn into(self) -> proto::Message { - match self { - Request::Dial(DialRequest { addrs, nonce }) => { - let addrs = addrs.iter().map(|e| e.to_vec()).collect(); - let nonce = Some(nonce); - proto::Message { - msg: proto::mod_Message::OneOfmsg::dialRequest(proto::DialRequest { - addrs, - nonce, - }), - } - } - Request::Data(DialDataResponse { data_count }) => { - debug_assert!( - data_count <= DATA_FIELD_LEN_UPPER_BOUND, - "data_count too large" - ); - static DATA: &[u8] = &[0u8; DATA_FIELD_LEN_UPPER_BOUND]; - proto::Message { - msg: proto::mod_Message::OneOfmsg::dialDataResponse(proto::DialDataResponse { - data: Some(Cow::Borrowed(&DATA[..data_count])), - }), - } - } - } - } -} - #[derive(Debug, Clone)] pub(crate) enum Response { Dial(DialResponse), @@ -266,46 +263,32 @@ impl DialDataRequest { } } -pub(crate) async fn dial_back(mut stream: impl AsyncWrite + Unpin, nonce: Nonce) -> io::Result<()> { - let dial_back = DialBack { nonce }; - dial_back.write_into(&mut stream).await?; - stream.close().await -} +const DIAL_BACK_MAX_SIZE: usize = 10; -pub(crate) async fn recv_dial_back( - mut stream: impl AsyncRead + AsyncWrite + Unpin, -) -> io::Result { - let DialBack { nonce } = DialBack::read_from(&mut stream).await?; - stream.close().await?; - Ok(nonce) -} +pub(crate) async fn dial_back(stream: impl AsyncWrite + Unpin, nonce: Nonce) -> io::Result<()> { + let msg = proto::DialBack { nonce: Some(nonce) }; + let mut framed = FramedWrite::new(stream, Codec::::new(DIAL_BACK_MAX_SIZE)); -const DIAL_BACK_MAX_SIZE: usize = 10; + framed + .send(msg) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + framed.close().await?; -pub(crate) struct DialBack { - pub(crate) nonce: Nonce, + Ok(()) } -impl DialBack { - pub(crate) async fn read_from(io: impl AsyncRead + Unpin) -> io::Result { - let proto::DialBack { nonce } = - FramedRead::new(io, Codec::::new(DIAL_BACK_MAX_SIZE)) - .next() - .await - .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; - let nonce = ok_or_invalid_data!(nonce)?; - Ok(Self { nonce }) - } +pub(crate) async fn recv_dial_back( + stream: impl AsyncRead + AsyncWrite + Unpin, +) -> io::Result { + let framed = &mut FramedRead::new(stream, Codec::::new(DIAL_BACK_MAX_SIZE)); + let proto::DialBack { nonce } = framed + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; + let nonce = ok_or_invalid_data!(nonce)?; - async fn write_into(self, io: impl AsyncWrite + Unpin) -> io::Result<()> { - let msg = proto::DialBack { - nonce: Some(self.nonce), - }; - FramedWrite::new(io, Codec::::new(DIAL_BACK_MAX_SIZE)) - .send(msg) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - } + Ok(nonce) } #[cfg(test)] diff --git a/protocols/autonat/src/v2/server/handler/dial_back.rs b/protocols/autonat/src/v2/server/handler/dial_back.rs index f671434eabe..d9a6e61f4be 100644 --- a/protocols/autonat/src/v2/server/handler/dial_back.rs +++ b/protocols/autonat/src/v2/server/handler/dial_back.rs @@ -14,7 +14,7 @@ use libp2p_swarm::{ SubstreamProtocol, }; -use crate::v2::{protocol::dial_back, DIAL_BACK_PROTOCOL_NAME}; +use crate::v2::{protocol::dial_back, DIAL_BACK_PROTOCOL}; use super::dial_request::{DialBackCommand, DialBackStatus as DialBackRes}; @@ -70,7 +70,7 @@ impl ConnectionHandler for Handler { if let Some(cmd) = self.pending_nonce.take() { self.requested_substream_nonce = Some(cmd); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL_NAME), ()), + protocol: SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL), ()), }); } Poll::Pending diff --git a/protocols/autonat/src/v2/server/handler/dial_request.rs b/protocols/autonat/src/v2/server/handler/dial_request.rs index beb17d38b7f..04e61d54082 100644 --- a/protocols/autonat/src/v2/server/handler/dial_request.rs +++ b/protocols/autonat/src/v2/server/handler/dial_request.rs @@ -25,7 +25,7 @@ use crate::v2::{ generated::structs::{mod_DialResponse::ResponseStatus, DialStatus}, protocol::{Coder, DialDataRequest, DialRequest, DialResponse, Request, Response}, server::behaviour::StatusUpdate, - Nonce, REQUEST_PROTOCOL_NAME, + Nonce, DIAL_REQUEST_PROTOCOL, }; #[derive(Debug, PartialEq)] @@ -92,7 +92,7 @@ where type OutboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(ReadyUpgrade::new(REQUEST_PROTOCOL_NAME), ()) + SubstreamProtocol::new(ReadyUpgrade::new(DIAL_REQUEST_PROTOCOL), ()) } fn poll( diff --git a/protocols/autonat/tests/autonatv2.rs b/protocols/autonat/tests/autonatv2.rs index 618ad73f6ec..951610d84b2 100644 --- a/protocols/autonat/tests/autonatv2.rs +++ b/protocols/autonat/tests/autonatv2.rs @@ -445,7 +445,7 @@ async fn new_client() -> Swarm { let mut node = Swarm::new_ephemeral(|identity| CombinedClient { autonat: libp2p_autonat::v2::client::Behaviour::new( OsRng, - Config::default().with_recheck_interval(Duration::from_millis(100)), + Config::default().with_probe_interval(Duration::from_millis(100)), ), identify: libp2p_identify::Behaviour::new(libp2p_identify::Config::new( "/libp2p-test/1.0.0".into(),