Skip to content

Commit

Permalink
Added basic yamux connection upgrade
Browse files Browse the repository at this point in the history
Yamux provides multiplexing over a ordered reliable connection (e.g. TCP).
More info: https://github.com/hashicorp/yamux/blob/master/spec.md

- This is working from a fork of yamux (develop branch), where the
  upgrade to futures 0.3.x is close to complete
- Made a TcpStream wrapper struct which implements futures `AsyncWrite` and `AsyncRead` to reduce the tie in to tokio
- _Side note:_ investigated upgrading to futures 0.3.x however there ended up being a few external libraries which are locked to alpha futures. This is WIP for tonic (hyperium/tonic#163) and tower (no PR - we may be able to pretty easily remove this dependency)
  • Loading branch information
sdbondi committed Dec 4, 2019
1 parent c0a6762 commit e418555
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 37 deletions.
2 changes: 2 additions & 0 deletions comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ time = "0.1.42"
tokio = "0.2.0-alpha.6"
tokio-executor = { version ="^0.2.0-alpha.6", features = ["threadpool"] }
ttl_cache = "0.5.1"
# TODO: For now, perhaps release this as a crate `tari-yamux` until yamux 0.3 is released (requires upgrade to futures 0.3.x)
yamux = {git = "https://github.com/tari-project/yamux.git", branch="futures-alpha"}
zmq = "0.9.2"

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod connection_manager;
mod consts;
pub mod control_service;
pub mod inbound_message_service;
mod multiplexing;
mod noise;
pub mod outbound_message_service;
pub mod peer_manager;
Expand Down
23 changes: 23 additions & 0 deletions comms/src/multiplexing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2019, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

pub mod yamux;
231 changes: 231 additions & 0 deletions comms/src/multiplexing/yamux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2019, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::connection::Direction;
use futures::{
io::{AsyncRead, AsyncWrite},
stream::BoxStream,
StreamExt,
};
use std::{fmt::Debug, io};
use yamux::Mode;

pub type IncomingSubstream<'a> = BoxStream<'a, Result<yamux::Stream, yamux::ConnectionError>>;

#[derive(Debug)]
pub struct Yamux<TSocket> {
inner: yamux::Connection<TSocket>,
}

const MAX_BUFFER_SIZE: u32 = 8 * 1024 * 1024; // 8MB
const RECEIVE_WINDOW: u32 = 4 * 1024 * 1024; // 4MB

impl<TSocket> Yamux<TSocket>
where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
pub fn new(socket: TSocket, mode: Mode) -> Self {
let mut config = yamux::Config::default();
// Use OnRead mode instead of OnReceive mode to provide back pressure to the sending side.
// Caveat: the OnRead mode has the risk of deadlock, where both sides send data larger than
// receive window and don't read before finishing writes.
// This should never happen as the window size should be large enough for all protocol messages.
config.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
// Because OnRead mode increases the RTT of window update, bigger buffer size and receive
// window size perform better.
config.set_max_buffer_size(MAX_BUFFER_SIZE as usize);
config.set_receive_window(RECEIVE_WINDOW);

Self {
inner: yamux::Connection::new(socket, config, mode),
}
}

/// Upgrade the underlying socket to use yamux
pub async fn upgrade_connection(socket: TSocket, direction: Direction) -> io::Result<Self> {
let mode = match direction {
Direction::Inbound => Mode::Server,
Direction::Outbound => Mode::Client,
};

Ok(Self::new(socket, mode))
}

/// Get the yamux control struct
pub fn get_yamux_control(&self) -> yamux::Control {
self.inner.control()
}

/// Returns a `Stream` emitting substreams initiated by the remote
pub fn incoming(self) -> IncomingSubstream<'static> {
yamux::into_stream(self.inner).boxed()
}
}

#[cfg(test)]
mod test {
use crate::{
multiplexing::yamux::{Mode, Yamux},
test_utils::tcp::build_connected_tcp_socket_pair,
};
use futures::{
future,
io::{AsyncReadExt, AsyncWriteExt},
StreamExt,
};
use std::io;
use tokio::runtime::Runtime;

#[test]
fn open_substream() -> io::Result<()> {
let rt = Runtime::new().unwrap();
let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair());
let msg = b"The Way of Kings";

let dialer = Yamux::new(dialer, Mode::Client);
let mut dialer_control = dialer.get_yamux_control();
// The incoming stream must be polled for the control to work
rt.spawn(async move {
dialer.incoming().next().await;
});

rt.spawn(async move {
let mut substream = dialer_control.open_stream().await.unwrap();

substream.write_all(msg).await.unwrap();
substream.flush().await.unwrap();
substream.close().await.unwrap();
});

let mut listener = Yamux::new(listener, Mode::Server).incoming();
let mut substream = rt
.block_on(listener.next())
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no substream"))?
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

let mut buf = Vec::new();
let _ = rt.block_on(future::select(substream.read_to_end(&mut buf), listener.next()));
assert_eq!(buf, msg);

Ok(())
}

#[test]
fn close() -> io::Result<()> {
let rt = Runtime::new().unwrap();
let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair());
let msg = b"Words of Radiance";

let dialer = Yamux::new(dialer, Mode::Client);
let mut dialer_control = dialer.get_yamux_control();
// The incoming stream must be polled for the control to work
rt.spawn(async move {
dialer.incoming().next().await;
});

rt.spawn(async move {
let mut substream = dialer_control.open_stream().await.unwrap();

substream.write_all(msg).await.unwrap();
substream.flush().await.unwrap();

let mut buf = Vec::new();
substream.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"");
});

let mut incoming = Yamux::new(listener, Mode::Server).incoming();
let mut substream = rt.block_on(incoming.next()).unwrap().unwrap();
rt.spawn(async move {
incoming.next().await;
});

rt.block_on(async move {
let mut buf = vec![0; msg.len()];
substream.read_exact(&mut buf).await?;
assert_eq!(buf, msg);

// Close the substream and then try to write to it
substream.close().await?;

let result = substream.write_all(b"ignored message").await;
match result {
Ok(()) => panic!("Write should have failed"),
Err(e) => assert_eq!(e.kind(), io::ErrorKind::WriteZero),
}

io::Result::Ok(())
})?;

Ok(())
}

#[test]
fn send_big_message() -> io::Result<()> {
let rt = Runtime::new().unwrap();
#[allow(non_upper_case_globals)]
static MiB: usize = 1 << 20;
static MSG_LEN: usize = 16 * MiB;

let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair());

let dialer = Yamux::new(dialer, Mode::Client);
let mut dialer_control = dialer.get_yamux_control();
// The incoming stream must be polled for the control to work
rt.spawn(async move {
dialer.incoming().next().await;
});

rt.spawn(async move {
let mut substream = dialer_control.open_stream().await.unwrap();

let msg = vec![0x55u8; MSG_LEN];
substream.write_all(msg.as_slice()).await.unwrap();

let mut buf = vec![0u8; MSG_LEN];
substream.read_exact(&mut buf).await.unwrap();
substream.close().await.unwrap();

assert_eq!(buf.len(), MSG_LEN);
assert_eq!(buf, vec![0xAAu8; MSG_LEN]);
});

let mut incoming = Yamux::new(listener, Mode::Server).incoming();
let mut substream = rt.block_on(incoming.next()).unwrap().unwrap();
rt.spawn(async move {
incoming.next().await;
});

rt.block_on(async move {
let mut buf = vec![0u8; MSG_LEN];
substream.read_exact(&mut buf).await?;
assert_eq!(buf, vec![0x55u8; MSG_LEN]);

let msg = vec![0xAAu8; MSG_LEN];
substream.write_all(msg.as_slice()).await?;
substream.close().await?;

io::Result::Ok(())
})?;

Ok(())
}
}
11 changes: 4 additions & 7 deletions comms/src/noise/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use crate::{
},
types::{CommsPublicKey, CommsSecretKey},
};
use futures::{AsyncRead, AsyncWrite};
use snow::{self, params::NoiseParams, Keypair};
use tari_utilities::ByteArray;
use tokio::io::{AsyncRead, AsyncWrite};

pub(super) const NOISE_IX_PARAMETER: &str = "Noise_IX_25519_ChaChaPoly_BLAKE2b";

Expand Down Expand Up @@ -95,13 +95,10 @@ impl NoiseConfig {
mod test {
use super::*;
use crate::{consts::COMMS_RNG, test_utils::tcp::build_connected_tcp_socket_pair};
use futures::future;
use futures::{future, AsyncReadExt, AsyncWriteExt};
use snow::params::{BaseChoice, CipherChoice, DHChoice, HandshakePattern, HashChoice};
use tari_crypto::keys::PublicKey;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
runtime::Runtime,
};
use tokio::runtime::Runtime;

fn check_noise_params(config: &NoiseConfig) {
assert_eq!(config.parameters.hash, HashChoice::Blake2b);
Expand Down Expand Up @@ -156,7 +153,7 @@ mod test {
let sample = b"Children of time";
socket_in.write_all(sample).await.unwrap();
socket_in.flush().await.unwrap();
socket_in.shutdown().await.unwrap();
socket_in.close().await.unwrap();

let mut read_buf = Vec::with_capacity(16);
socket_out.read_to_end(&mut read_buf).await.unwrap();
Expand Down
37 changes: 19 additions & 18 deletions comms/src/noise/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
// use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use futures::{io::Error, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

const LOG_TARGET: &str = "comms::noise::socket";

Expand Down Expand Up @@ -485,16 +486,16 @@ where TSocket: AsyncWrite + Unpin
impl<TSocket> AsyncWrite for NoiseSocket<TSocket>
where TSocket: AsyncWrite + Unpin
{
fn poll_write(self: Pin<&mut Self>, context: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
self.get_mut().poll_write(context, buf)
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
self.get_mut().poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, context: &mut Context) -> Poll<io::Result<()>> {
self.get_mut().poll_flush(context)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.get_mut().poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.socket).poll_shutdown(context)
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.socket).poll_close(cx)
}
}

Expand Down Expand Up @@ -619,17 +620,17 @@ impl From<TransportState> for NoiseState {
#[cfg(test)]
mod test {
use super::*;
use crate::{noise::config::NOISE_IX_PARAMETER, test_utils::tcp::build_connected_tcp_socket_pair};
use crate::{
noise::config::NOISE_IX_PARAMETER,
test_utils::tcp::build_connected_tcp_socket_pair,
transports::TcpSocket,
};
use futures::future::join;
use snow::{params::NoiseParams, Builder, Error, Keypair};
use std::io;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
runtime::Runtime,
};
use tokio::runtime::Runtime;

async fn build_test_connection() -> Result<((Keypair, Handshake<TcpStream>), (Keypair, Handshake<TcpStream>)), Error>
async fn build_test_connection() -> Result<((Keypair, Handshake<TcpSocket>), (Keypair, Handshake<TcpSocket>)), Error>
{
let parameters: NoiseParams = NOISE_IX_PARAMETER.parse().expect("Invalid protocol name");

Expand All @@ -656,9 +657,9 @@ mod test {
}

async fn perform_handshake(
dialer: Handshake<TcpStream>,
listener: Handshake<TcpStream>,
) -> io::Result<(NoiseSocket<TcpStream>, NoiseSocket<TcpStream>)>
dialer: Handshake<TcpSocket>,
listener: Handshake<TcpSocket>,
) -> io::Result<(NoiseSocket<TcpSocket>, NoiseSocket<TcpSocket>)>
{
let (dialer_result, listener_result) = join(dialer.handshake_1rt(), listener.handshake_1rt()).await;

Expand Down Expand Up @@ -691,7 +692,7 @@ mod test {
dialer_socket.write_all(b" ").await?;
dialer_socket.write_all(b"archive").await?;
dialer_socket.flush().await?;
dialer_socket.shutdown().await?;
dialer_socket.close().await?;

let mut buf = Vec::new();
listener_socket.read_to_end(&mut buf).await?;
Expand Down
Loading

0 comments on commit e418555

Please sign in to comment.