Skip to content

Commit

Permalink
p2p: Rework transport to be async
Browse files Browse the repository at this point in the history
While it was well-intended the reality is that it's going to be
impractical and hinder potential adoption trying to not give in to the
sweet siren sounds of the async ecosystem. The author since has changed
their stance and adopted a conviction that well tuned and guard-railed
async can be workable and lead to maintainable solutions. This
change-set is in preparation of bringing in peer and supervisor
abstractions as described in the ADR.

Signed-off-by: xla <self@xla.is>
  • Loading branch information
xla committed Apr 9, 2022
1 parent 337cadc commit 07e3ab1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
54 changes: 33 additions & 21 deletions docs/architecture/adr-009-transport-agnostic-peer-abstraction.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Changelog
* 2021-02-05: drafted
* 2022-04-08: revised

## Context

Expand Down Expand Up @@ -32,9 +33,9 @@ resources on the system.
pub trait Transport {
type Connection: Connection;
type Endpoint: Endpoint<Connection = <Self as Transport>::Connection>;
type Incoming: Iterator<Item = Result<<Self as Transport>::Connection>> + Send;
type Incoming: Stream<Item = Result<<Self as Transport>::Connection>> + Send + Sync;

fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>;
async fn bind(self, bind_info: BindInfo) -> Result<(Self::Endpoint, Self::Incoming)>;
}
```

Expand All @@ -43,10 +44,10 @@ incoming `Connection`s. Which is a standardised way to connect to new peers and
react to newly connected ones respectively.

``` rust
pub trait Endpoint: Send {
pub trait Endpoint: Send + Sync {
type Connection;

fn connect(&self, info: ConnectInfo) -> Result<Self::Connection>;
async fn connect(&self, info: ConnectInfo) -> Result<Self::Connection>;
fn listen_addrs(&self) -> Vec<SocketAddr>;
}
```
Expand All @@ -58,19 +59,19 @@ While being open to enable feature parity with current production installations
based on tendermint-go's `MConn`.

``` rust
pub trait StreamSend {
fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
pub trait StreamSend: Send + Sync {
async fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
}

pub trait Connection: Send {
pub trait Connection: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
type StreamRead: Iterator<Item = Result<Vec<u8>>> + Send;
type StreamRead: Stream<Item = Result<Vec<u8>>> + Send;
type StreamSend: StreamSend;

fn advertised_addrs(&self) -> Vec<SocketAddr>;
fn close(&self) -> Result<()>;
async fn close(&self) -> Result<()>;
fn local_addr(&self) -> SocketAddr;
fn open_bidirectional(
async fn open_bidirectional(
&self,
stream_id: StreamId,
) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>;
Expand All @@ -96,11 +97,11 @@ impl<Conn> Peer<Connected<Conn>>
where
Conn: Connection,
{
pub fn run(self, stream_ids: Vec<StreamId>) -> Result<Peer<Running<Conn>>> {
pub async fn run(self, stream_ids: Vec<StreamId>) -> Result<Peer<Running<Conn>>> {
// ...
}

fn stop(self) -> Result<Peer<Stopped>> {
async fn stop(self) -> Result<Peer<Stopped>> {
// ...
}
}
Expand All @@ -109,11 +110,11 @@ impl<Conn> Peer<Running<Conn>>
where
Conn: Connection,
{
pub fn send(&self, message: message::Send) -> Result<()> {
pub async fn send(&self, message: message::Send) -> Result<()> {
// ...
}

pub fn stop(self) -> Result<Peer<Stopped>> {
pub async fn stop(self) -> Result<Peer<Stopped>> {
// ...
}
}
Expand All @@ -123,7 +124,6 @@ While sending messages is done through a method on a running peer, getting hold
of incoming messages can be achieved by draining the `Receiver` part of the
running state.


### Supervisor

The `Supervisor` is the main entry point to the p2p package giving higher-level
Expand All @@ -146,27 +146,39 @@ pub enum Event {
UpgradeFailed(node::Id, Report),
}

struct CommandHandle;

impl CommandHandle {
fn instruct(command: Command) -> Result<()> {
// ..
}
}

impl Supervisor {
pub fn run<T>(transport: T) -> Result<Self>
pub fn new<T>(transport: T) -> Self
where
T: transport::Transport + Send + 'static,
{
// ...
// ..
}

pub fn recv(&self) -> Result<Event> {
// ...
pub handle(&self) -> CommandHandle {
// ..
}

pub subscribe(&self) -> Receiver<Event> {
// ..
}

pub fn command(&self, cmd: Command) -> Result<()> {
pub async fn run<T>(self) -> Result<()> {
// ...
}
}
```

## Status

Proposed
Accepted

## Consequences

Expand Down
2 changes: 2 additions & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ amino = [ "prost-derive" ]

[dependencies]
aead = { version = "0.4", default-features = false }
async-trait = { version = "0.1", default-features = false }
chacha20poly1305 = { version = "0.8", default-features = false, features = [ "reduced-round" ] }
ed25519-consensus = { version = "1.2", default-features = false }
eyre = { version = "0.6", default-features = false }
flex-error = { version = "0.4", default-features = false }
flume = { version = "0.10", default-features = false }
futures-core = { version = "0.3", default-features = false, features = [ "alloc" ] }
hkdf = { version = "0.10", default-features = false }
merlin = { version = "2.0", default-features = false }
prost = { version = "0.10", default-features = false }
Expand Down
24 changes: 15 additions & 9 deletions p2p/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use std::net::{SocketAddr, ToSocketAddrs};

use async_trait::async_trait;
use eyre::Result;
use futures_core::Stream;

use tendermint::node;
use tendermint::public_key::PublicKey;
Expand Down Expand Up @@ -54,7 +56,8 @@ pub enum Direction<Conn> {
}

/// Trait that describes the send end of a stream.
pub trait StreamSend {
#[async_trait]
pub trait StreamSend: Send + Sync {
/// Sends the message to the peer over the open stream. `msg` should be a valid and properly
/// encoded byte array according to the supported messages of the stream.
///
Expand All @@ -63,17 +66,18 @@ pub trait StreamSend {
/// * If the underlying I/O operations fail.
/// * If the stream is closed.
/// * If the peer is gone
fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
async fn send<B: AsRef<[u8]>>(msg: B) -> Result<()>;
}

/// Trait which describes the core concept of a connection between two peers established by
/// `[Transport]`.
#[async_trait]
pub trait Connection: Send {
/// Errors emitted by the connection.
type Error;
/// Read end of a bidirectional stream. Carries a finite stream of framed messages. Decoding is
/// left to the caller and should correspond to the type of stream.
type StreamRead: Iterator<Item = Result<Vec<u8>>> + Send;
type StreamRead: Stream<Item = Result<Vec<u8>>> + Send + Sync;
/// Send end of a stream.
type StreamSend: StreamSend;

Expand All @@ -84,7 +88,7 @@ pub trait Connection: Send {
/// # Errors
///
/// * If release of attached resources failed.
fn close(&self) -> Result<()>;
async fn close(&self) -> Result<()>;
/// Returns the local address for the connection.
fn local_addr(&self) -> SocketAddr;
/// Opens a new bi-bidirectional stream for the given [`StreamId`].
Expand All @@ -94,7 +98,7 @@ pub trait Connection: Send {
/// * If the stream type is not supported.
/// * If the peer is gone.
/// * If resources necessary for the stream creation aren't available/accessible.
fn open_bidirectional(
async fn open_bidirectional(
&self,
stream_id: StreamId,
) -> Result<(Self::StreamRead, Self::StreamSend), Self::Error>;
Expand All @@ -105,7 +109,8 @@ pub trait Connection: Send {
}

/// Local handle on a resource which allows connecting to remote peers.
pub trait Endpoint<A>: Send
#[async_trait]
pub trait Endpoint<A>: Send + Sync
where
A: ToSocketAddrs,
{
Expand All @@ -118,12 +123,13 @@ where
///
/// * If the remote is not reachable.
/// * If resources necessary for the connection creation aren't available/accessible.
fn connect(&self, info: ConnectInfo<A>) -> Result<Self::Connection>;
async fn connect(&self, info: ConnectInfo<A>) -> Result<Self::Connection>;
/// Local address(es) the endpoint listens on.
fn listen_addrs(&self) -> Vec<SocketAddr>;
}

/// Trait that describes types which support connection management of the p2p stack.
#[async_trait]
pub trait Transport<A>
where
A: ToSocketAddrs,
Expand All @@ -133,13 +139,13 @@ where
/// Local handle on a resource which allows connecting to remote peers.
type Endpoint: Endpoint<A, Connection = <Self as Transport<A>>::Connection> + Drop;
/// Infinite stream of inbound connections.
type Incoming: Iterator<Item = Result<<Self as Transport<A>>::Connection>> + Send;
type Incoming: Stream<Item = Result<<Self as Transport<A>>::Connection>> + Send + Sync;

/// Consumes the transport to bind the resources in exchange for the `Endpoint` and `Incoming`
/// stream.
///
/// # Errors
///
/// * If resource allocation fails for lack of privileges or being not available.
fn bind(self, bind_info: BindInfo<A>) -> Result<(Self::Endpoint, Self::Incoming)>;
async fn bind(self, bind_info: BindInfo<A>) -> Result<(Self::Endpoint, Self::Incoming)>;
}

0 comments on commit 07e3ab1

Please sign in to comment.