Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce libp2p-stream #5027

Merged
merged 40 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c5e2fcb
Add basic boilerplate for stream example
thomaseizinger Dec 24, 2023
2f19988
Write the demo application
thomaseizinger Dec 24, 2023
0a7402b
Introduce `PeerControl`
thomaseizinger Dec 24, 2023
9ff2146
Initial implementation
thomaseizinger Dec 24, 2023
f50a25f
Split code up into modules
thomaseizinger Dec 24, 2023
dbbd3b8
Split `register` into `new_control` and `accept`
thomaseizinger Dec 24, 2023
b0380fa
De-register protocol on drop of `IncomingStreams`
thomaseizinger Dec 24, 2023
61c5823
Add TODO
thomaseizinger Dec 25, 2023
09f0ec3
Import `Pin`
thomaseizinger Dec 25, 2023
1bd3206
Introduce `libp2p-stream`
thomaseizinger Dec 27, 2023
b0a5dc2
Improve docs
thomaseizinger Dec 27, 2023
ac4c208
Rename error
thomaseizinger Dec 27, 2023
7d75564
Remove obsolete TODO
thomaseizinger Dec 27, 2023
62f2cd7
Make tests pass
thomaseizinger Dec 27, 2023
3c3ce7a
Add failing test
thomaseizinger Dec 27, 2023
91721a0
Implement some keep-alive logic
thomaseizinger Dec 27, 2023
3757b0b
Use flume rendezvous channel for `IncomingStreams`
thomaseizinger Dec 27, 2023
d856fee
`IncomingStreams` should not keep connection alive
thomaseizinger Dec 27, 2023
abb9039
Add TODOs
thomaseizinger Dec 27, 2023
9cc05ff
Remove `PeerControl` API
thomaseizinger Dec 30, 2023
0e029c3
Update readme
thomaseizinger Dec 30, 2023
ab9284e
Change example to echo protocol
thomaseizinger Dec 30, 2023
c75b39d
Update examples/stream/src/main.rs
thomaseizinger Dec 31, 2023
d7b1825
Make `Control` independent of protocol
thomaseizinger Jan 13, 2024
415cef7
Move `accept` to `Control`
thomaseizinger Jan 13, 2024
6f71534
Add test for dial error
thomaseizinger Jan 13, 2024
deab410
Improve docs
thomaseizinger Jan 13, 2024
d9080c1
Don't unwarp poisoned lock
thomaseizinger Jan 13, 2024
bca58fe
Introduce `control.rs` module
thomaseizinger Jan 13, 2024
ce1f3b7
Introduce `shared` module
thomaseizinger Jan 13, 2024
08befa7
Add backpressure docs
thomaseizinger Jan 13, 2024
9bcbedc
Add `Default` impl for `Behaviour`
thomaseizinger Jan 13, 2024
3e696da
Add metadata to manifest
thomaseizinger Jan 13, 2024
92ef685
Fix doc links
thomaseizinger Jan 13, 2024
c28a03c
Merge branch 'master' into feat/stream-behaviour
thomaseizinger Jan 13, 2024
6179f57
Add version entry
thomaseizinger Jan 13, 2024
e266657
Undo changes in swarm
thomaseizinger Jan 13, 2024
a838cf9
Add changelog file
thomaseizinger Jan 13, 2024
05d1981
Correct title to resource management
thomaseizinger Jan 15, 2024
95961aa
Merge branch 'master' into feat/stream-behaviour
mergify[bot] Jan 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"examples/ping",
"examples/relay-server",
"examples/rendezvous",
"examples/stream",
"examples/upnp",
"hole-punching-tests",
"identity",
Expand Down Expand Up @@ -45,10 +46,11 @@ members = [
"protocols/relay",
"protocols/rendezvous",
"protocols/request-response",
"protocols/stream",
"protocols/upnp",
"swarm",
"swarm-derive",
"swarm-test",
"swarm",
"transports/dns",
"transports/noise",
"transports/plaintext",
Expand All @@ -57,11 +59,11 @@ members = [
"transports/tcp",
"transports/tls",
"transports/uds",
"transports/webrtc",
"transports/webrtc-websys",
"transports/webrtc",
"transports/websocket-websys",
"transports/websocket",
"transports/webtransport-websys",
"transports/websocket-websys",
"wasm-tests/webtransport-tests",
]
resolver = "2"
Expand Down Expand Up @@ -99,6 +101,7 @@ libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" }
libp2p-server = { version = "0.12.5", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.1", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" }
Expand Down
22 changes: 22 additions & 0 deletions examples/stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "stream-example"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"

[package.metadata.release]
release = false

[dependencies]
anyhow = "1"
futures = "0.3.29"
libp2p = { path = "../../libp2p", features = [ "tokio", "quic"] }
libp2p-stream = { path = "../../protocols/stream", version = "0.1.0-alpha" }
rand = "0.8"
tokio = { version = "1.35", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints]
workspace = true
35 changes: 35 additions & 0 deletions examples/stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## Description

This example shows the usage of the `stream::Behaviour`.
As a counter-part to the `request_response::Behaviour`, the `stream::Behaviour` allows users to write stream-oriented protocols whilst having minimal interaction with the `Swarm`.

In this showcase, we implement an echo protocol: All incoming data is echoed back to the dialer, until the stream is closed.

## Usage

To run the example, follow these steps:

1. Start an instance of the example in one terminal:

```sh
cargo run --bin stream-example
```

Observe printed listen address.

2. Start another instance in a new terminal, providing the listen address of the first one.

```sh
cargo run --bin stream-example -- <address>
```

3. Both terminals should now continuosly print messages.

## Conclusion

The `stream::Behaviour` is an "escape-hatch" from the way typical rust-libp2p protocols are written.
It is suitable for several scenarios including:

- prototyping of new protocols
- experimentation with rust-libp2p
- integration in `async/await`-heavy applications
154 changes: 154 additions & 0 deletions examples/stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::{io, time::Duration};

use anyhow::{Context, Result};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol};
use libp2p_stream as stream;
use rand::RngCore;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo");

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?,
)
.init();

let maybe_address = std::env::args()
.nth(1)
.map(|arg| arg.parse::<Multiaddr>())
.transpose()
.context("Failed to parse argument as `Multiaddr`")?;

let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_quic()
.with_behaviour(|_| stream::Behaviour::new())?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10)))
.build();

swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?;

let mut incoming_streams = swarm
.behaviour()
.new_control()
.accept(ECHO_PROTOCOL)
.unwrap();

// Deal with incoming streams.
// Spawning a dedicated task is just one way of doing this.
// libp2p doesn't care how you handle incoming streams but you _must_ handle them somehow.
// To mitigate DoS attacks, libp2p will internally drop incoming streams if your application cannot keep up processing them.
tokio::spawn(async move {
// This loop handles incoming streams _sequentially_ but that doesn't have to be the case.
// You can also spawn a dedicated task per stream if you want to.
// Be aware that this breaks backpressure though as spawning new tasks is equivalent to an unbounded buffer.
// Each task needs memory meaning an aggressive remote peer may force you OOM this way.

while let Some((peer, stream)) = incoming_streams.next().await {
match echo(stream).await {
Ok(n) => {
tracing::info!(%peer, "Echoed {n} bytes!");
}
Err(e) => {
tracing::warn!(%peer, "Echo failed: {e}");
continue;
}
};
}
});

// In this demo application, the dialing peer initiates the protocol.
if let Some(address) = maybe_address {
let Some(Protocol::P2p(peer_id)) = address.iter().last() else {
anyhow::bail!("Provided address does not end in `/p2p`");
};

swarm.dial(address)?;

tokio::spawn(connection_handler(peer_id, swarm.behaviour().new_control()));
}

// Poll the swarm to make progress.
loop {
let event = swarm.next().await.expect("never terminates");

match event {
libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
let listen_address = address.with_p2p(*swarm.local_peer_id()).unwrap();
tracing::info!(%listen_address);
}
event => tracing::trace!(?event),
}
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

/// A very simple, `async fn`-based connection handler for our custom echo protocol.
async fn connection_handler(peer: PeerId, mut control: stream::Control) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos.

let stream = match control.open_stream(peer, ECHO_PROTOCOL).await {
Ok(stream) => stream,
Err(error @ stream::OpenStreamError::UnsupportedProtocol(_)) => {
tracing::info!(%peer, %error);
return;
}
Err(error) => {
// Other errors may be temporary.
// In production, something like an exponential backoff / circuit-breaker may be more appropriate.
tracing::debug!(%peer, %error);
continue;
}
};

if let Err(e) = send(stream).await {
tracing::warn!(%peer, "Echo protocol failed: {e}");
continue;
}

tracing::info!(%peer, "Echo complete!")
}
}

async fn echo(mut stream: Stream) -> io::Result<usize> {
let mut total = 0;

let mut buf = [0u8; 100];

loop {
let read = stream.read(&mut buf).await?;
if read == 0 {
return Ok(total);
}

total += read;
stream.write_all(&buf[..read]).await?;
}
}

async fn send(mut stream: Stream) -> io::Result<()> {
let num_bytes = rand::random::<usize>() % 1000;

let mut bytes = vec![0; num_bytes];
rand::thread_rng().fill_bytes(&mut bytes);

stream.write_all(&bytes).await?;

let mut buf = vec![0; num_bytes];
stream.read_exact(&mut buf).await?;

if bytes != buf {
return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo"));
}

stream.close().await?;

Ok(())
}
3 changes: 3 additions & 0 deletions protocols/stream/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.1.0-alpha

Initial release.
27 changes: 27 additions & 0 deletions protocols/stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "libp2p-stream"
version = "0.1.0-alpha"
edition = "2021"
rust-version.workspace = true
description = "Generic stream protocols for libp2p"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
futures = "0.3.29"
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
libp2p-swarm = { workspace = true }
tracing = "0.1.37"
void = "1"
rand = "0.8"

[dev-dependencies]
libp2p-swarm-test = { workspace = true }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints]
workspace = true
Loading
Loading