Skip to content

Commit

Permalink
deps(yamux): update yamux to v0.12
Browse files Browse the repository at this point in the history
Pull-Request: libp2p#3013.
  • Loading branch information
thomaseizinger authored and hanabi1224 committed Aug 3, 2023
1 parent 2a8e371 commit 94dc11b
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 67 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ libp2p-wasm-ext = { version = "0.40.0", path = "transports/wasm-ext" }
libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" }
libp2p-websocket = { version = "0.42.0", path = "transports/websocket" }
libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" }
libp2p-yamux = { version = "0.44.0", path = "muxers/yamux" }
libp2p-yamux = { version = "0.44.1", path = "muxers/yamux" }
multistream-select = { version = "0.13.0", path = "misc/multistream-select" }
quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" }
quickcheck = { package = "quickcheck-ext", path = "misc/quickcheck-ext" }
Expand Down
4 changes: 2 additions & 2 deletions interop-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ rand = "0.8.5"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
axum = "0.6"
libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns"] }
libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns", "identify"] }
libp2p-quic = { workspace = true, features = ["tokio"] }
libp2p-webrtc = { workspace = true, features = ["tokio"] }
libp2p-mplex = { path = "../muxers/mplex" }
Expand All @@ -34,7 +34,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen"] }
libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen", "identify"] }
wasm-bindgen = { version = "0.2" }
wasm-bindgen-futures = { version = "0.4" }
wasm-logger = { version = "0.2.0" }
Expand Down
10 changes: 8 additions & 2 deletions interop-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use anyhow::{bail, Context, Result};
use futures::{FutureExt, StreamExt};
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::{identity, ping, Multiaddr, PeerId};
use libp2p::{identify, identity, ping, Multiaddr, PeerId};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;

Expand All @@ -29,12 +29,17 @@ pub async fn run_test(
let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?;

// Build the transport from the passed ENV var.
let (boxed_transport, local_addr) = build_transport(local_key, ip, transport)?;
let (boxed_transport, local_addr) = build_transport(local_key.clone(), ip, transport)?;
let mut swarm = swarm_builder(
boxed_transport,
Behaviour {
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour,
// Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved.
identify: identify::Behaviour::new(identify::Config::new(
"/interop-tests".to_owned(),
local_key.public(),
)),
},
local_peer_id,
)
Expand Down Expand Up @@ -237,6 +242,7 @@ impl FromStr for SecProtocol {
struct Behaviour {
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
identify: identify::Behaviour,
}

/// Helper function to get a ENV variable into an test parameter like `Transport`.
Expand Down
8 changes: 8 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 0.44.1 - unreleased

- Update to `yamux` `v0.12` which brings performance improvements and introduces an ACK backlog of 256 inbound streams.
When interacting with other libp2p nodes that are also running this or a newer version, the creation of inbound streams will be backpressured once the ACK backlog is hit.
See [PR 3013].

[PR 3013]: https://github.com/libp2p/rust-libp2p/pull/3013

## 0.44.0

- Raise MSRV to 1.65.
Expand Down
4 changes: 2 additions & 2 deletions muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-yamux"
edition = "2021"
rust-version = { workspace = true }
description = "Yamux multiplexing protocol for libp2p"
version = "0.44.0"
version = "0.44.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"]
futures = "0.3.28"
libp2p-core = { workspace = true }
thiserror = "1.0"
yamux = "0.10.0"
yamux = "0.12"
log = "0.4"

[dev-dependencies]
Expand Down
85 changes: 30 additions & 55 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,27 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use futures::{future, prelude::*, ready, stream::BoxStream};
use futures::{future, prelude::*, ready};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque;
use std::io::{IoSlice, IoSliceMut};
use std::task::Waker;
use std::{
fmt, io, iter, mem,
io, iter,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
use yamux::ConnectionError;

/// A Yamux connection.
#[derive(Debug)]
pub struct Muxer<C> {
/// The [`futures::stream::Stream`] of incoming substreams.
incoming: BoxStream<'static, Result<yamux::Stream, yamux::ConnectionError>>,
/// Handle to control the connection.
control: yamux::Control,
connection: yamux::Connection<C>,
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
///
/// The only way how yamux can make progress is by driving the stream. However, the
/// The only way how yamux can make progress is by calling [`yamux::Connection::poll_next_inbound`]. However, the
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
Expand All @@ -54,40 +52,31 @@ pub struct Muxer<C> {
inbound_stream_buffer: VecDeque<Stream>,
/// Waker to be called when new inbound streams are available.
inbound_stream_waker: Option<Waker>,

_phantom: std::marker::PhantomData<C>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;

impl<S> fmt::Debug for Muxer<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Yamux")
}
}
/// How many streams to buffer before we start resetting them.
///
/// This is equal to the ACK BACKLOG in `rust-yamux`.
/// Thus, for peers running on a recent version of `rust-libp2p`, we should never need to reset streams because they'll voluntarily stop opening them once they hit the ACK backlog.
const MAX_BUFFERED_INBOUND_STREAMS: usize = 256;

impl<C> Muxer<C>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
/// Create a new Yamux connection.
fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();

Self {
incoming: yamux::into_stream(conn).err_into().boxed(),
control: ctrl,
Muxer {
connection: yamux::Connection::new(io, cfg, mode),
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
_phantom: Default::default(),
}
}
}

impl<C> StreamMuxer for Muxer<C>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Substream = Stream;
type Error = Error;
Expand All @@ -112,10 +101,15 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
Pin::new(&mut self.control)
.poll_open_stream(cx)
.map_ok(Stream)
.map_err(Error)
let stream = ready!(self.connection.poll_new_outbound(cx).map_err(Error)?);

Poll::Ready(Ok(Stream(stream)))
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.connection.poll_close(cx).map_err(Error)?);

Poll::Ready(Ok(()))
}

fn poll(
Expand All @@ -141,23 +135,6 @@ where
cx.waker().wake_by_ref();
Poll::Pending
}

fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<Result<(), Error>> {
if let Poll::Ready(()) = Pin::new(&mut self.control).poll_close(c).map_err(Error)? {
return Poll::Ready(Ok(()));
}

while let Poll::Ready(maybe_inbound_stream) =
self.incoming.poll_next_unpin(c).map_err(Error)?
{
match maybe_inbound_stream {
Some(inbound_stream) => mem::drop(inbound_stream),
None => return Poll::Ready(Ok(())),
}
}

Poll::Pending
}
}

/// A stream produced by the yamux multiplexer.
Expand Down Expand Up @@ -210,18 +187,16 @@ impl AsyncWrite for Stream {

impl<C> Muxer<C>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream
.transpose()
.map_err(Error)?
.map(Stream)
.ok_or(Error(ConnectionError::Closed))?;

Ok(stream)
})
let stream = ready!(self.connection.poll_next_inbound(cx))
.transpose()
.map_err(Error)?
.map(Stream)
.ok_or(Error(ConnectionError::Closed))?;

Poll::Ready(Ok(stream))
}
}

Expand Down
4 changes: 2 additions & 2 deletions protocols/kad/tests/client_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ async fn two_servers_add_each_other_to_routing_table() {

match libp2p_swarm_test::drive(&mut server2, &mut server1).await {
(
[Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)],
[Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)],
[Identify(_), Identify(_)],
)
| (
[Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })],
[Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })],
[Identify(_), Identify(_)],
) => {
assert_eq!(peer2, server1_peer_id);
Expand Down

0 comments on commit 94dc11b

Please sign in to comment.