Skip to content

Commit

Permalink
feat: introduce libp2p-stream
Browse files Browse the repository at this point in the history
For a while now, `rust-libp2p` provided the `request-response` abstraction which makes it easy for users to build request-response based protocols without having to implement a `NetworkBehaviour` themselves. This PR introduces an alpha version of `libp2p-stream`: a `NetworkBehaviour` that directly gives access to negotiated streams.

In addition to complementing `request-response`, `libp2p-stream` also diverges in its design from the remaining modules by offering a clonable `Control` that provides `async` functions.

Resolves: libp2p#4457.

Pull-Request: libp2p#5027.
  • Loading branch information
thomaseizinger authored Jan 16, 2024
0 parents commit 8aeb660
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 0 deletions.
22 changes: 22 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "stream-example"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"

[package.metadata.release]
release = false

[dependencies]
anyhow = "1"
futures = "0.3.29"
libp2p = { path = "../../libp2p", features = [ "tokio", "quic"] }
libp2p-stream = { path = "../../protocols/stream", version = "0.1.0-alpha" }
rand = "0.8"
tokio = { version = "1.35", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints]
workspace = true
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## Description

This example shows the usage of the `stream::Behaviour`.
As a counter-part to the `request_response::Behaviour`, the `stream::Behaviour` allows users to write stream-oriented protocols whilst having minimal interaction with the `Swarm`.

In this showcase, we implement an echo protocol: All incoming data is echoed back to the dialer, until the stream is closed.

## Usage

To run the example, follow these steps:

1. Start an instance of the example in one terminal:

```sh
cargo run --bin stream-example
```

Observe printed listen address.

2. Start another instance in a new terminal, providing the listen address of the first one.

```sh
cargo run --bin stream-example -- <address>
```

3. Both terminals should now continuosly print messages.

## Conclusion

The `stream::Behaviour` is an "escape-hatch" from the way typical rust-libp2p protocols are written.
It is suitable for several scenarios including:

- prototyping of new protocols
- experimentation with rust-libp2p
- integration in `async/await`-heavy applications
154 changes: 154 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::{io, time::Duration};

use anyhow::{Context, Result};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol};
use libp2p_stream as stream;
use rand::RngCore;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo");

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?,
)
.init();

let maybe_address = std::env::args()
.nth(1)
.map(|arg| arg.parse::<Multiaddr>())
.transpose()
.context("Failed to parse argument as `Multiaddr`")?;

let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_quic()
.with_behaviour(|_| stream::Behaviour::new())?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10)))
.build();

swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?;

let mut incoming_streams = swarm
.behaviour()
.new_control()
.accept(ECHO_PROTOCOL)
.unwrap();

// Deal with incoming streams.
// Spawning a dedicated task is just one way of doing this.
// libp2p doesn't care how you handle incoming streams but you _must_ handle them somehow.
// To mitigate DoS attacks, libp2p will internally drop incoming streams if your application cannot keep up processing them.
tokio::spawn(async move {
// This loop handles incoming streams _sequentially_ but that doesn't have to be the case.
// You can also spawn a dedicated task per stream if you want to.
// Be aware that this breaks backpressure though as spawning new tasks is equivalent to an unbounded buffer.
// Each task needs memory meaning an aggressive remote peer may force you OOM this way.

while let Some((peer, stream)) = incoming_streams.next().await {
match echo(stream).await {
Ok(n) => {
tracing::info!(%peer, "Echoed {n} bytes!");
}
Err(e) => {
tracing::warn!(%peer, "Echo failed: {e}");
continue;
}
};
}
});

// In this demo application, the dialing peer initiates the protocol.
if let Some(address) = maybe_address {
let Some(Protocol::P2p(peer_id)) = address.iter().last() else {
anyhow::bail!("Provided address does not end in `/p2p`");
};

swarm.dial(address)?;

tokio::spawn(connection_handler(peer_id, swarm.behaviour().new_control()));
}

// Poll the swarm to make progress.
loop {
let event = swarm.next().await.expect("never terminates");

match event {
libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
let listen_address = address.with_p2p(*swarm.local_peer_id()).unwrap();
tracing::info!(%listen_address);
}
event => tracing::trace!(?event),
}
}
}

/// A very simple, `async fn`-based connection handler for our custom echo protocol.
async fn connection_handler(peer: PeerId, mut control: stream::Control) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos.

let stream = match control.open_stream(peer, ECHO_PROTOCOL).await {
Ok(stream) => stream,
Err(error @ stream::OpenStreamError::UnsupportedProtocol(_)) => {
tracing::info!(%peer, %error);
return;
}
Err(error) => {
// Other errors may be temporary.
// In production, something like an exponential backoff / circuit-breaker may be more appropriate.
tracing::debug!(%peer, %error);
continue;
}
};

if let Err(e) = send(stream).await {
tracing::warn!(%peer, "Echo protocol failed: {e}");
continue;
}

tracing::info!(%peer, "Echo complete!")
}
}

async fn echo(mut stream: Stream) -> io::Result<usize> {
let mut total = 0;

let mut buf = [0u8; 100];

loop {
let read = stream.read(&mut buf).await?;
if read == 0 {
return Ok(total);
}

total += read;
stream.write_all(&buf[..read]).await?;
}
}

async fn send(mut stream: Stream) -> io::Result<()> {
let num_bytes = rand::random::<usize>() % 1000;

let mut bytes = vec![0; num_bytes];
rand::thread_rng().fill_bytes(&mut bytes);

stream.write_all(&bytes).await?;

let mut buf = vec![0; num_bytes];
stream.read_exact(&mut buf).await?;

if bytes != buf {
return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo"));
}

stream.close().await?;

Ok(())
}

0 comments on commit 8aeb660

Please sign in to comment.