Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps(yamux): update yamux to v0.12 #3013

Merged
merged 50 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
084152c
Bump yamux
thomaseizinger Oct 12, 2022
04d01a5
Bump
thomaseizinger Nov 2, 2022
182ada8
Merge branch 'master' into bump-yamux
thomaseizinger Nov 2, 2022
1ee9ad9
Merge branch 'master' into bump-yamux
thomaseizinger Nov 14, 2022
40ad21b
More use of `ready` macro
thomaseizinger Nov 14, 2022
49f45ae
Make error type transparent
thomaseizinger Nov 14, 2022
474d339
Apply same member order as trait
thomaseizinger Nov 14, 2022
76e4439
Fix doc link
thomaseizinger Nov 14, 2022
012849f
Merge branch 'master' into bump-yamux
thomaseizinger Dec 7, 2022
485786c
Point to `yamux 0.11.0`
thomaseizinger Dec 7, 2022
a962ef3
Set version to `0.43.0-alpha`
thomaseizinger Dec 7, 2022
cc665e0
Add changelog entry
thomaseizinger Dec 7, 2022
15a6f71
Fix changelog and remove patch
thomaseizinger Dec 7, 2022
75b440b
Set yamux to new version
thomaseizinger Dec 7, 2022
4c9133f
Merge branch 'master' into bump-yamux
thomaseizinger Dec 12, 2022
952a4e9
Fix doc reference
thomaseizinger Dec 12, 2022
2823dcd
Merge branch 'master' into bump-yamux
thomaseizinger Feb 21, 2023
2568aab
Merge branch 'master' into bump-yamux
thomaseizinger Mar 3, 2023
bf25bd8
Wrap `yamux::Stream`
thomaseizinger Mar 3, 2023
124de4b
Update changelog
thomaseizinger Mar 3, 2023
ed95385
Merge branch 'master' into bump-yamux
thomaseizinger Mar 22, 2023
2e5099f
Remove unused `From` impl
thomaseizinger Mar 22, 2023
8675038
Derive `Debug`
thomaseizinger Mar 22, 2023
059270e
Merge branch 'master' into bump-yamux
thomaseizinger Apr 3, 2023
d541e7d
Merge branch 'master' into bump-yamux
thomaseizinger Apr 28, 2023
a9d2e9e
Remove `From` impl as it leaks into the public API
thomaseizinger Apr 28, 2023
8e9ef00
Merge branch 'master' into bump-yamux
thomaseizinger May 1, 2023
f7d69e7
Merge branch 'master' into bump-yamux
thomaseizinger May 2, 2023
e70fbdd
Merge branch 'master' into bump-yamux
thomaseizinger May 4, 2023
6e14db2
Update to yamux patch version
thomaseizinger May 19, 2023
b26893c
Merge branch 'master' into bump-yamux
thomaseizinger May 19, 2023
5c712d1
Fix compile errors
thomaseizinger May 19, 2023
3bf8e1b
Update to yamux v0.11.1
mxinden May 26, 2023
53396bd
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into b…
mxinden May 26, 2023
96176d8
Update muxers/yamux/src/lib.rs
mxinden May 26, 2023
1bbe680
Merge branch 'master' into bump-yamux
thomaseizinger Jun 30, 2023
b0b7688
Try with latest patch
thomaseizinger Jun 30, 2023
a4a7a34
Align buffer size with ACK backlog
thomaseizinger Jun 30, 2023
349c262
Better docs
thomaseizinger Jun 30, 2023
e9c4f93
Add identify to interop-tests
thomaseizinger Jun 30, 2023
c5fe909
Add identify to wasm build
thomaseizinger Jun 30, 2023
7f90bb3
Merge branch 'master' into bump-yamux
thomaseizinger Jun 30, 2023
1cb257a
Add comment on why identify is included
thomaseizinger Jul 6, 2023
3e93fda
Update yamux to 0.12
thomaseizinger Jul 20, 2023
162566e
Merge branch 'master' into bump-yamux
thomaseizinger Jul 20, 2023
43318d9
Remove already ignored events
thomaseizinger Jul 31, 2023
8dfddae
Bump version
thomaseizinger Jul 31, 2023
23564bc
Merge branch 'master' into bump-yamux
thomaseizinger Jul 31, 2023
eeb9095
Update muxers/yamux/CHANGELOG.md
thomaseizinger Aug 2, 2023
a7b9e0e
Merge branch 'master' into bump-yamux
mergify[bot] Aug 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ libp2p-wasm-ext = { version = "0.40.0", path = "transports/wasm-ext" }
libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" }
libp2p-websocket = { version = "0.42.0", path = "transports/websocket" }
libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" }
libp2p-yamux = { version = "0.44.0", path = "muxers/yamux" }
libp2p-yamux = { version = "0.44.1", path = "muxers/yamux" }
multistream-select = { version = "0.13.0", path = "misc/multistream-select" }
quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" }
quickcheck = { package = "quickcheck-ext", path = "misc/quickcheck-ext" }
Expand Down
4 changes: 2 additions & 2 deletions interop-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ rand = "0.8.5"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
axum = "0.6"
libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns"] }
libp2p = { path = "../libp2p", features = ["ping", "noise", "tls", "rsa", "macros", "websocket", "tokio", "yamux", "tcp", "dns", "identify"] }
libp2p-quic = { workspace = true, features = ["tokio"] }
libp2p-webrtc = { workspace = true, features = ["tokio"] }
libp2p-mplex = { path = "../muxers/mplex" }
Expand All @@ -34,7 +34,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen"] }
libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen", "identify"] }
wasm-bindgen = { version = "0.2" }
wasm-bindgen-futures = { version = "0.4" }
wasm-logger = { version = "0.2.0" }
Expand Down
10 changes: 8 additions & 2 deletions interop-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use anyhow::{bail, Context, Result};
use futures::{FutureExt, StreamExt};
use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent};
use libp2p::{identity, ping, Multiaddr, PeerId};
use libp2p::{identify, identity, ping, Multiaddr, PeerId};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;

Expand All @@ -29,12 +29,17 @@ pub async fn run_test(
let redis_client = RedisClient::new(redis_addr).context("Could not connect to redis")?;

// Build the transport from the passed ENV var.
let (boxed_transport, local_addr) = build_transport(local_key, ip, transport)?;
let (boxed_transport, local_addr) = build_transport(local_key.clone(), ip, transport)?;
let mut swarm = swarm_builder(
boxed_transport,
Behaviour {
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour,
// Need to include identify until https://github.com/status-im/nim-libp2p/issues/924 is resolved.
identify: identify::Behaviour::new(identify::Config::new(
"/interop-tests".to_owned(),
local_key.public(),
)),
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
},
local_peer_id,
)
Expand Down Expand Up @@ -237,6 +242,7 @@ impl FromStr for SecProtocol {
struct Behaviour {
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
identify: identify::Behaviour,
}

/// Helper function to get a ENV variable into an test parameter like `Transport`.
Expand Down
8 changes: 8 additions & 0 deletions muxers/yamux/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 0.44.1 - unreleased

- Update to `yamux` `v0.12` which brings performance improvements and introduces an ACK backlog of 256 inbound streams.
When interacting with other libp2p nodes that are also running this or a newer version, the creation of inbound streams will be backpressured once the ACK backlog is hit.
See [PR 3013].

[PR 3013]: https://github.com/libp2p/rust-libp2p/pull/3013

## 0.44.0

- Raise MSRV to 1.65.
Expand Down
4 changes: 2 additions & 2 deletions muxers/yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-yamux"
edition = "2021"
rust-version = { workspace = true }
description = "Yamux multiplexing protocol for libp2p"
version = "0.44.0"
version = "0.44.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"]
futures = "0.3.28"
libp2p-core = { workspace = true }
thiserror = "1.0"
yamux = "0.10.0"
yamux = "0.12"
log = "0.4"

[dev-dependencies]
Expand Down
85 changes: 30 additions & 55 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,27 @@

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use futures::{future, prelude::*, ready, stream::BoxStream};
use futures::{future, prelude::*, ready};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::collections::VecDeque;
use std::io::{IoSlice, IoSliceMut};
use std::task::Waker;
use std::{
fmt, io, iter, mem,
io, iter,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
use yamux::ConnectionError;

/// A Yamux connection.
#[derive(Debug)]
pub struct Muxer<C> {
/// The [`futures::stream::Stream`] of incoming substreams.
incoming: BoxStream<'static, Result<yamux::Stream, yamux::ConnectionError>>,
/// Handle to control the connection.
control: yamux::Control,
connection: yamux::Connection<C>,
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
///
/// The only way how yamux can make progress is by driving the stream. However, the
/// The only way how yamux can make progress is by calling [`yamux::Connection::poll_next_inbound`]. However, the
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
Expand All @@ -54,40 +52,31 @@ pub struct Muxer<C> {
inbound_stream_buffer: VecDeque<Stream>,
/// Waker to be called when new inbound streams are available.
inbound_stream_waker: Option<Waker>,

_phantom: std::marker::PhantomData<C>,
}

const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;

impl<S> fmt::Debug for Muxer<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Yamux")
}
}
/// How many streams to buffer before we start resetting them.
///
/// This is equal to the ACK BACKLOG in `rust-yamux`.
/// Thus, for peers running on a recent version of `rust-libp2p`, we should never need to reset streams because they'll voluntarily stop opening them once they hit the ACK backlog.
const MAX_BUFFERED_INBOUND_STREAMS: usize = 256;

impl<C> Muxer<C>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
/// Create a new Yamux connection.
fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();

Self {
incoming: yamux::into_stream(conn).err_into().boxed(),
control: ctrl,
Muxer {
connection: yamux::Connection::new(io, cfg, mode),
inbound_stream_buffer: VecDeque::default(),
inbound_stream_waker: None,
_phantom: Default::default(),
}
}
}

impl<C> StreamMuxer for Muxer<C>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Substream = Stream;
type Error = Error;
Expand All @@ -112,10 +101,15 @@ where
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
Pin::new(&mut self.control)
.poll_open_stream(cx)
.map_ok(Stream)
.map_err(Error)
let stream = ready!(self.connection.poll_new_outbound(cx).map_err(Error)?);

Poll::Ready(Ok(Stream(stream)))
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.connection.poll_close(cx).map_err(Error)?);

Poll::Ready(Ok(()))
}

fn poll(
Expand All @@ -141,23 +135,6 @@ where
cx.waker().wake_by_ref();
Poll::Pending
}

fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<Result<(), Error>> {
if let Poll::Ready(()) = Pin::new(&mut self.control).poll_close(c).map_err(Error)? {
return Poll::Ready(Ok(()));
}

while let Poll::Ready(maybe_inbound_stream) =
self.incoming.poll_next_unpin(c).map_err(Error)?
{
match maybe_inbound_stream {
Some(inbound_stream) => mem::drop(inbound_stream),
None => return Poll::Ready(Ok(())),
}
}

Poll::Pending
}
}

/// A stream produced by the yamux multiplexer.
Expand Down Expand Up @@ -210,18 +187,16 @@ impl AsyncWrite for Stream {

impl<C> Muxer<C>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
C: AsyncRead + AsyncWrite + Unpin + 'static,
{
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> {
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
let stream = maybe_stream
.transpose()
.map_err(Error)?
.map(Stream)
.ok_or(Error(ConnectionError::Closed))?;

Ok(stream)
})
let stream = ready!(self.connection.poll_next_inbound(cx))
.transpose()
.map_err(Error)?
.map(Stream)
.ok_or(Error(ConnectionError::Closed))?;

Poll::Ready(Ok(stream))
}
}

Expand Down
4 changes: 2 additions & 2 deletions protocols/kad/tests/client_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ async fn two_servers_add_each_other_to_routing_table() {

match libp2p_swarm_test::drive(&mut server2, &mut server1).await {
(
[Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)],
[Identify(_), Kad(RoutingUpdated { peer: peer2, .. }), Identify(_)],
[Identify(_), Identify(_)],
)
| (
[Identify(_), Kad(UnroutablePeer { .. }), Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })],
[Identify(_), Identify(_), Kad(RoutingUpdated { peer: peer2, .. })],
[Identify(_), Identify(_)],
) => {
assert_eq!(peer2, server1_peer_id);
Expand Down