diff --git a/Cargo.lock b/Cargo.lock index 57bca1723d4..ed333712a62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3354,7 +3354,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" -version = "0.44.0" +version = "0.44.1" dependencies = [ "async-std", "futures", @@ -6729,14 +6729,15 @@ dependencies = [ [[package]] name = "yamux" -version = "0.10.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d9ba232399af1783a58d8eb26f6b5006fbefe2dc9ef36bd283324792d03ea5" +checksum = "0329ef377816896f014435162bb3711ea7a07729c23d0960e6f8048b21b8fe91" dependencies = [ "futures", "log", "nohash-hasher", "parking_lot", + "pin-project", "rand 0.8.5", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 0a23bc54815..42d4375cb1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ libp2p-wasm-ext = { version = "0.40.0", path = "transports/wasm-ext" } libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" } libp2p-websocket = { version = "0.42.0", path = "transports/websocket" } libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" } -libp2p-yamux = { version = "0.44.0", path = "muxers/yamux" } +libp2p-yamux = { version = "0.44.1", path = "muxers/yamux" } multistream-select = { version = "0.13.0", path = "misc/multistream-select" } quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" } quickcheck = { package = "quickcheck-ext", path = "misc/quickcheck-ext" } diff --git a/interop-tests/Cargo.toml b/interop-tests/Cargo.toml index a2e281dbb87..2d0d8ea25b7 100644 --- a/interop-tests/Cargo.toml +++ b/interop-tests/Cargo.toml @@ -19,7 +19,7 @@ rand = "0.8.5" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] axum = "0.6" -libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns"] } +libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns", "identify"] } libp2p-quic = { workspace = true, features = ["tokio"] } libp2p-webrtc = { workspace = true, features = ["tokio"] } libp2p-mplex = { path = "../muxers/mplex" } @@ -34,7 +34,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [target.'cfg(target_arch = "wasm32")'.dependencies] -libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen"] } +libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen", "identify"] } wasm-bindgen = { version = "0.2" } wasm-bindgen-futures = { version = "0.4" } wasm-logger = { version = "0.2.0" } diff --git a/interop-tests/src/lib.rs b/interop-tests/src/lib.rs index beb7c91c63d..57ce636367b 100644 --- a/interop-tests/src/lib.rs +++ b/interop-tests/src/lib.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::{bail, Context, Result}; use futures::{FutureExt, StreamExt}; use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; -use libp2p::{identity, ping, Multiaddr, PeerId}; +use libp2p::{identify, identity, ping, Multiaddr, PeerId}; #[cfg(target_arch = "wasm32")] use wasm_bindgen::prelude::*; @@ -29,12 +29,17 @@ pub async fn run_test( let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?; // Build the transport from the passed ENV var. - let (boxed_transport, local_addr) = build_transport(local_key, ip, transport)?; + let (boxed_transport, local_addr) = build_transport(local_key.clone(), ip, transport)?; let mut swarm = swarm_builder( boxed_transport, Behaviour { ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), keep_alive: keep_alive::Behaviour, + // Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved. + identify: identify::Behaviour::new(identify::Config::new( + "/interop-tests".to_owned(), + local_key.public(), + )), }, local_peer_id, ) @@ -237,6 +242,7 @@ impl FromStr for SecProtocol { struct Behaviour { ping: ping::Behaviour, keep_alive: keep_alive::Behaviour, + identify: identify::Behaviour, } /// Helper function to get a ENV variable into an test parameter like `Transport`. diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index f0055c3fe4f..caea4e5359e 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.44.1 - unreleased + +- Update to `yamux` `v0.12` which brings performance improvements and introduces an ACK backlog of 256 inbound streams. + When interacting with other libp2p nodes that are also running this or a newer version, the creation of inbound streams will be backpressured once the ACK backlog is hit. + See [PR 3013]. + +[PR 3013]: https://github.com/libp2p/rust-libp2p/pull/3013 + ## 0.44.0 - Raise MSRV to 1.65. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 50a9e97d1d0..11b94ac9738 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-yamux" edition = "2021" rust-version = { workspace = true } description = "Yamux multiplexing protocol for libp2p" -version = "0.44.0" +version = "0.44.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.28" libp2p-core = { workspace = true } thiserror = "1.0" -yamux = "0.10.0" +yamux = "0.12" log = "0.4" [dev-dependencies] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index b24c976ebf2..12e5dd8c1ff 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,14 +22,14 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use futures::{future, prelude::*, ready, stream::BoxStream}; +use futures::{future, prelude::*, ready}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::collections::VecDeque; use std::io::{IoSlice, IoSliceMut}; use std::task::Waker; use std::{ - fmt, io, iter, mem, + io, iter, pin::Pin, task::{Context, Poll}, }; @@ -37,14 +37,12 @@ use thiserror::Error; use yamux::ConnectionError; /// A Yamux connection. +#[derive(Debug)] pub struct Muxer { - /// The [`futures::stream::Stream`] of incoming substreams. - incoming: BoxStream<'static, Result>, - /// Handle to control the connection. - control: yamux::Control, + connection: yamux::Connection, /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// - /// The only way how yamux can make progress is by driving the stream. However, the + /// The only way how yamux can make progress is by calling [`yamux::Connection::poll_next_inbound`]. However, the /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. @@ -54,17 +52,13 @@ pub struct Muxer { inbound_stream_buffer: VecDeque, /// Waker to be called when new inbound streams are available. inbound_stream_waker: Option, - - _phantom: std::marker::PhantomData, } -const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; - -impl fmt::Debug for Muxer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Yamux") - } -} +/// How many streams to buffer before we start resetting them. +/// +/// This is equal to the ACK BACKLOG in `rust-yamux`. +/// Thus, for peers running on a recent version of `rust-libp2p`, we should never need to reset streams because they'll voluntarily stop opening them once they hit the ACK backlog. +const MAX_BUFFERED_INBOUND_STREAMS: usize = 256; impl Muxer where @@ -72,22 +66,17 @@ where { /// Create a new Yamux connection. fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { - let conn = yamux::Connection::new(io, cfg, mode); - let ctrl = conn.control(); - - Self { - incoming: yamux::into_stream(conn).err_into().boxed(), - control: ctrl, + Muxer { + connection: yamux::Connection::new(io, cfg, mode), inbound_stream_buffer: VecDeque::default(), inbound_stream_waker: None, - _phantom: Default::default(), } } } impl StreamMuxer for Muxer where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static, + C: AsyncRead + AsyncWrite + Unpin + 'static, { type Substream = Stream; type Error = Error; @@ -112,10 +101,15 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - Pin::new(&mut self.control) - .poll_open_stream(cx) - .map_ok(Stream) - .map_err(Error) + let stream = ready!(self.connection.poll_new_outbound(cx).map_err(Error)?); + + Poll::Ready(Ok(Stream(stream))) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.connection.poll_close(cx).map_err(Error)?); + + Poll::Ready(Ok(())) } fn poll( @@ -141,23 +135,6 @@ where cx.waker().wake_by_ref(); Poll::Pending } - - fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> { - if let Poll::Ready(()) = Pin::new(&mut self.control).poll_close(c).map_err(Error)? { - return Poll::Ready(Ok(())); - } - - while let Poll::Ready(maybe_inbound_stream) = - self.incoming.poll_next_unpin(c).map_err(Error)? - { - match maybe_inbound_stream { - Some(inbound_stream) => mem::drop(inbound_stream), - None => return Poll::Ready(Ok(())), - } - } - - Poll::Pending - } } /// A stream produced by the yamux multiplexer. @@ -210,18 +187,16 @@ impl AsyncWrite for Stream { impl Muxer where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static, + C: AsyncRead + AsyncWrite + Unpin + 'static, { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { - self.incoming.poll_next_unpin(cx).map(|maybe_stream| { - let stream = maybe_stream - .transpose() - .map_err(Error)? - .map(Stream) - .ok_or(Error(ConnectionError::Closed))?; - - Ok(stream) - }) + let stream = ready!(self.connection.poll_next_inbound(cx)) + .transpose() + .map_err(Error)? + .map(Stream) + .ok_or(Error(ConnectionError::Closed))?; + + Poll::Ready(Ok(stream)) } } diff --git a/protocols/kad/tests/client_mode.rs b/protocols/kad/tests/client_mode.rs index b2530569518..30fd4d972a8 100644 --- a/protocols/kad/tests/client_mode.rs +++ b/protocols/kad/tests/client_mode.rs @@ -59,11 +59,11 @@ async fn two_servers_add_each_other_to_routing_table() { match libp2p_swarm_test::drive(&mut server2, &mut server1).await { ( - [Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)], + [Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)], [Identify(_), Identify(_)], ) | ( - [Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })], + [Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })], [Identify(_), Identify(_)], ) => { assert_eq!(peer2, server1_peer_id);