Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: Rework transport to be async #1116

Merged
merged 2 commits into from
Apr 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions docs/architecture/adr-009-transport-agnostic-peer-abstraction.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Changelog
* 2021-02-05: drafted
* 2022-04-08: revised

## Context

Expand Down Expand Up @@ -32,9 +33,9 @@ resources on the system.
pub trait Transport {
type Connection: Connection;
type Endpoint: Endpoint<Connection = <Self as Transport>::Connection>;
type Incoming: Iterator<Item = Result<<Self as Transport>::Connection>> + Send;
type Incoming: Stream<Item = Result<<Self as Transport>::Connection>> + Send + Sync;

fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>;
async fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>;
}
```

Expand All @@ -43,10 +44,10 @@ incoming `Connection`s. Which is a standardised way to connect to new peers and
react to newly connected ones respectively.

``` rust
pub trait Endpoint: Send {
pub trait Endpoint: Send + Sync {
type Connection;

fn connect(&self, info: ConnectInfo) -> Result<Self::Connection>;
async fn connect(&self, info: ConnectInfo) -> Result<Self::Connection>;
fn listen_addrs(&self) -> Vec<SocketAddr>;
}
```
Expand All @@ -58,19 +59,19 @@ While being open to enable feature parity with current production installations
based on tendermint-go's `MConn`.

``` rust
pub trait StreamSend {
fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
pub trait StreamSend: Send + Sync {
async fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
}

pub trait Connection: Send {
pub trait Connection: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
type StreamRead: Iterator<Item = Result<Vec<u8>>> + Send;
type StreamRead: Stream<Item = Result<Vec<u8>>> + Send;
type StreamSend: StreamSend;

fn advertised_addrs(&self) -> Vec<SocketAddr>;
fn close(&self) -> Result<()>;
async fn close(&self) -> Result<()>;
fn local_addr(&self) -> SocketAddr;
fn open_bidirectional(
async fn open_bidirectional(
&self,
stream_id: StreamId,
) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>;
Expand All @@ -96,11 +97,11 @@ impl<Conn> Peer<Connected<Conn>>
where
Conn: Connection,
{
pub fn run(self, stream_ids: Vec<StreamId>) -> Result<Peer<Running<Conn>>> {
pub async fn run(self, stream_ids: Vec<StreamId>) -> Result<Peer<Running<Conn>>> {
// ...
}

fn stop(self) -> Result<Peer<Stopped>> {
async fn stop(self) -> Result<Peer<Stopped>> {
// ...
}
}
Expand All @@ -109,11 +110,11 @@ impl<Conn> Peer<Running<Conn>>
where
Conn: Connection,
{
pub fn send(&self, message: message::Send) -> Result<()> {
pub async fn send(&self, message: message::Send) -> Result<()> {
// ...
}

pub fn stop(self) -> Result<Peer<Stopped>> {
pub async fn stop(self) -> Result<Peer<Stopped>> {
// ...
}
}
Expand All @@ -123,7 +124,6 @@ While sending messages is done through a method on a running peer, getting hold
of incoming messages can be achieved by draining the `Receiver` part of the
running state.


### Supervisor

The `Supervisor` is the main entry point to the p2p package giving higher-level
Expand All @@ -146,27 +146,39 @@ pub enum Event {
UpgradeFailed(node::Id, Report),
}

struct CommandHandle;

impl CommandHandle {
fn instruct(command: Command) -> Result<()> {
// ..
}
}

impl Supervisor {
pub fn run<T>(transport: T) -> Result<Self>
pub fn new<T>(transport: T) -> Self
where
T: transport::Transport + Send + 'static,
{
// ...
// ..
}

pub fn recv(&self) -> Result<Event> {
// ...
pub handle(&self) -> CommandHandle {
// ..
}

pub subscribe(&self) -> Receiver<Event> {
// ..
}

pub fn command(&self, cmd: Command) -> Result<()> {
pub async fn run<T>(self) -> Result<()> {
// ...
}
}
```

## Status

Proposed
Accepted

## Consequences

Expand Down
54 changes: 28 additions & 26 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,52 +1,54 @@
[package]
name = "tendermint-p2p"
version = "0.24.0-pre.1"
edition = "2018"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/informalsystems/tendermint-rs"
homepage = "https://tendermint.com"
readme = "README.md"
keywords = ["p2p", "tendermint", "cosmos"]
categories = ["cryptography::cryptocurrencies", "network-programming"]
keywords = [ "p2p", "tendermint", "cosmos" ]
categories = [ "cryptography::cryptocurrencies", "network-programming" ]
authors = [
"Informal Systems <hello@informal.systems>",
"Alexander Simmerl <a.simmerl@gmail.com>",
"Tony Arcieri <tony@iqlusion.io>",
"Ismail Khoffi <Ismail.Khoffi@gmail.com>",
"xla <self@xla.is>",
]

description = """
The Tendermint P2P stack in Rust.
"""

[lib]
test = false
test = false

[features]
default = ["flex-error/std", "flex-error/eyre_tracer"]
amino = ["prost-derive"]
default = [ "flex-error/std", "flex-error/eyre_tracer" ]
amino = [ "prost-derive" ]

[dependencies]
chacha20poly1305 = { version = "0.8", default-features = false, features = ["reduced-round"] }
ed25519-consensus = { version = "1.2", default-features = false }
eyre = { version = "0.6", default-features = false }
flume = { version = "0.10.7", default-features = false }
hkdf = { version = "0.10.0", default-features = false }
merlin = { version = "2", default-features = false }
prost = { version = "0.10", default-features = false }
rand_core = { version = "0.5", default-features = false, features = ["std"] }
sha2 = { version = "0.9", default-features = false }
subtle = { version = "2", default-features = false }
x25519-dalek = { version = "1.1", default-features = false, features = ["u64_backend"] }
zeroize = { version = "1", default-features = false }
signature = { version = "1.3.0", default-features = false }
aead = { version = "0.4.1", default-features = false }
flex-error = { version = "0.4.4", default-features = false }
aead = { version = "0.4", default-features = false }
async-trait = { version = "0.1", default-features = false }
chacha20poly1305 = { version = "0.8", default-features = false, features = [ "reduced-round" ] }
ed25519-consensus = { version = "1.2", default-features = false }
eyre = { version = "0.6", default-features = false }
flex-error = { version = "0.4", default-features = false }
flume = { version = "0.10", default-features = false }
futures-core = { version = "0.3", default-features = false, features = [ "alloc" ] }
hkdf = { version = "0.10", default-features = false }
merlin = { version = "2.0", default-features = false }
prost = { version = "0.10", default-features = false }
rand_core = { version = "0.5", default-features = false, features = [ "std" ] }
sha2 = { version = "0.9", default-features = false }
signature = { version = "1.3", default-features = false }
subtle = { version = "2.0", default-features = false }
x25519-dalek = { version = "1.1", default-features = false, features = [ "u64_backend" ] }
zeroize = { version = "1.0", default-features = false }

# path dependencies
tendermint = { path = "../tendermint", version = "0.24.0-pre.1", default-features = false }
tendermint-proto = { path = "../proto", version = "0.24.0-pre.1", default-features = false }
tendermint-std-ext = { path = "../std-ext", version = "0.24.0-pre.1", default-features = false }
tendermint = { version = "0.24.0-pre.1", default-features = false, path = "../tendermint" }
tendermint-proto = { version = "0.24.0-pre.1", default-features = false, path = "../proto" }
tendermint-std-ext = { version = "0.24.0-pre.1", default-features = false, path = "../std-ext" }

# optional dependencies
prost-derive = { version = "0.10", optional = true }
prost-derive = { version = "0.10", optional = true }
24 changes: 15 additions & 9 deletions p2p/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

use std::net::{SocketAddr, ToSocketAddrs};

use async_trait::async_trait;
use eyre::Result;
use futures_core::Stream;

use tendermint::node;
use tendermint::public_key::PublicKey;
Expand Down Expand Up @@ -54,7 +56,8 @@ pub enum Direction<Conn> {
}

/// Trait that describes the send end of a stream.
pub trait StreamSend {
#[async_trait]
pub trait StreamSend: Send + Sync {
/// Sends the message to the peer over the open stream. `msg` should be a valid and properly
/// encoded byte array according to the supported messages of the stream.
///
Expand All @@ -63,17 +66,18 @@ pub trait StreamSend {
/// * If the underlying I/O operations fail.
/// * If the stream is closed.
/// * If the peer is gone
fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
async fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
}

/// Trait which describes the core concept of a connection between two peers established by
/// `[Transport]`.
#[async_trait]
pub trait Connection: Send {
/// Errors emitted by the connection.
type Error;
/// Read end of a bidirectional stream. Carries a finite stream of framed messages. Decoding is
/// left to the caller and should correspond to the type of stream.
type StreamRead: Iterator<Item = Result<Vec<u8>>> + Send;
type StreamRead: Stream<Item = Result<Vec<u8>>> + Send + Sync;
/// Send end of a stream.
type StreamSend: StreamSend;

Expand All @@ -84,7 +88,7 @@ pub trait Connection: Send {
/// # Errors
///
/// * If release of attached resources failed.
fn close(&self) -> Result<()>;
async fn close(&self) -> Result<()>;
/// Returns the local address for the connection.
fn local_addr(&self) -> SocketAddr;
/// Opens a new bi-bidirectional stream for the given [`StreamId`].
Expand All @@ -94,7 +98,7 @@ pub trait Connection: Send {
/// * If the stream type is not supported.
/// * If the peer is gone.
/// * If resources necessary for the stream creation aren't available/accessible.
fn open_bidirectional(
async fn open_bidirectional(
&self,
stream_id: StreamId,
) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>;
Expand All @@ -105,7 +109,8 @@ pub trait Connection: Send {
}

/// Local handle on a resource which allows connecting to remote peers.
pub trait Endpoint<A>: Send
#[async_trait]
pub trait Endpoint<A>: Send + Sync
where
A: ToSocketAddrs,
{
Expand All @@ -118,12 +123,13 @@ where
///
/// * If the remote is not reachable.
/// * If resources necessary for the connection creation aren't available/accessible.
fn connect(&self, info: ConnectInfo<A>) -> Result<Self::Connection>;
async fn connect(&self, info: ConnectInfo<A>) -> Result<Self::Connection>;
/// Local address(es) the endpoint listens on.
fn listen_addrs(&self) -> Vec<SocketAddr>;
}

/// Trait that describes types which support connection management of the p2p stack.
#[async_trait]
pub trait Transport<A>
where
A: ToSocketAddrs,
Expand All @@ -133,13 +139,13 @@ where
/// Local handle on a resource which allows connecting to remote peers.
type Endpoint: Endpoint<A, Connection = <Self as Transport<A>>::Connection> + Drop;
/// Infinite stream of inbound connections.
type Incoming: Iterator<Item = Result<<Self as Transport<A>>::Connection>> + Send;
type Incoming: Stream<Item = Result<<Self as Transport<A>>::Connection>> + Send + Sync;

/// Consumes the transport to bind the resources in exchange for the `Endpoint` and `Incoming`
/// stream.
///
/// # Errors
///
/// * If resource allocation fails for lack of privileges or being not available.
fn bind(self, bind_info: BindInfo<A>) -> Result<(Self::Endpoint, Self::Incoming)>;
async fn bind(self, bind_info: BindInfo<A>) -> Result<(Self::Endpoint, Self::Incoming)>;
}