From f8b5574e2b174cd8913cc9b42824742f08432986 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Mon, 2 Dec 2019 15:49:52 +0200 Subject: [PATCH] Added basic yamux connection upgrade Yamux provides multiplexing over a ordered reliable connection (e.g. TCP). More info: https://github.com/hashicorp/yamux/blob/master/spec.md - This is working from a fork of yamux (develop branch), where the upgrade to futures 0.3.x is close to complete - Made a TcpStream wrapper struct which implements futures `AsyncWrite` and `AsyncRead` to reduce the tie in to tokio - _Side note:_ investigated upgrading to futures 0.3.x however there ended up being a few external libraries which are locked to alpha futures. This is WIP for tonic (https://github.com/hyperium/tonic/pull/163) and tower (no PR - we may be able to pretty easily remove this dependency) --- .circleci/config.yml | 14 +- .gitmodules | 4 + README.md | 14 ++ base_layer/p2p/tests/ping_pong/mod.rs | 140 ------------- base_layer/p2p/tests/services/liveness.rs | 2 +- comms/Cargo.toml | 1 + comms/src/lib.rs | 1 + comms/src/multiplexing/mod.rs | 23 +++ comms/src/multiplexing/yamux.rs | 231 ++++++++++++++++++++++ comms/src/noise/config.rs | 11 +- comms/src/noise/socket.rs | 37 ++-- comms/src/test_utils/tcp.rs | 12 +- comms/src/transports/mod.rs | 2 + comms/src/transports/tcp.rs | 53 ++++- comms/yamux | 1 + scripts/update_submodules.sh | 20 ++ 16 files changed, 387 insertions(+), 179 deletions(-) create mode 100644 .gitmodules delete mode 100644 base_layer/p2p/tests/ping_pong/mod.rs create mode 100644 comms/src/multiplexing/mod.rs create mode 100644 comms/src/multiplexing/yamux.rs create mode 160000 comms/yamux create mode 100755 scripts/update_submodules.sh diff --git a/.circleci/config.yml b/.circleci/config.yml index 5f31bb671b2..6ccd535dd8a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,4 +1,4 @@ -version: 2 +version: 2.1 defaults: rust_image: &rust_image quay.io/tarilabs/rust_tari-build-with-deps:nightly-2019-10-04 @@ -9,6 +9,10 @@ jobs: - image: *rust_image steps: - checkout + - run: + command: | + git submodule update --init --recursive + name: Init git submodule - run: name: RFC documentation command: | @@ -25,6 +29,10 @@ jobs: - image: quay.io/tarilabs/git-ssh-client:0.2-alpine steps: - checkout + - run: + command: | + git submodule update --init --recursive + name: Init git submodule - attach_workspace: at: . - add_ssh_keys: @@ -68,6 +76,10 @@ jobs: resource_class: medium steps: - checkout + - run: + command: | + git submodule update --init --recursive + name: Init git submodule - run: name: Tari source code command: | diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000000..6a8eb5b5d0b --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "comms/yamux"] + path = comms/yamux + url = https://github.com/tari-project/yamux.git + branch = futures-alpha diff --git a/README.md b/README.md index fe73d15e51c..a7d5586390f 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,20 @@ to generate the documentation. The generated html sits in `target/doc/`. Alterna See [RFC-0110/CodeStructure](./RFC/src/RFC-0010_CodeStructure.md) for details on the code structure and layout. +### Git submodules + +Git submodules are use temporarily until some dependent libraries are stabilized and released as crates. +When checking out code take the following steps to ensure submodules are up to date. + +```shell script +# Initialize submodules +git submodule init +# Sets `git pull` to automatically pull submodules +git config submodule.recurse true +# Checkout/update all submodules +git submodule update --recursive --remote +``` + ## Conversation channels [](https://t.me/tarilab) Non-technical discussions and gentle sparring. diff --git a/base_layer/p2p/tests/ping_pong/mod.rs b/base_layer/p2p/tests/ping_pong/mod.rs deleted file mode 100644 index e06ceaf3057..00000000000 --- a/base_layer/p2p/tests/ping_pong/mod.rs +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2019 The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -// NOTE: This test uses ports 11111 and 11112 - -use crate::support::{assert_change, random_string}; -use futures::executor::ThreadPool; -use rand::rngs::OsRng; -use std::{sync::Arc, time::Duration}; -use tari_comms::{ - builder::CommsNode, - connection::NetAddress, - connection_manager::PeerConnectionConfig, - control_service::ControlServiceConfig, - peer_manager::{peer_storage::PeerStorage, NodeIdentity, Peer}, - types::CommsDatabase, - CommsBuilder, -}; -use tari_p2p::{ - ping_pong::{PingPongService, PingPongServiceApi}, - sync_services::{ServiceExecutor, ServiceRegistry}, - tari_message::TariMessageType, -}; -use tari_storage::{lmdb_store::LMDBBuilder, LMDBWrapper}; -use tempdir::TempDir; - -fn new_node_identity(control_service_address: NetAddress) -> NodeIdentity { - NodeIdentity::random(&mut OsRng::new().unwrap(), control_service_address).unwrap() -} - -fn create_peer_storage(tmpdir: &TempDir, database_name: &str, peers: Vec) -> CommsDatabase { - let datastore = LMDBBuilder::new() - .set_path(tmpdir.path().to_str().unwrap()) - .set_environment_size(10) - .set_max_number_of_databases(1) - .add_database(database_name, lmdb_zero::db::CREATE) - .build() - .unwrap(); - - let peer_database = datastore.get_handle(database_name).unwrap(); - let peer_database = LMDBWrapper::new(Arc::new(peer_database)); - let mut storage = PeerStorage::new(peer_database).unwrap(); - for peer in peers { - storage.add_peer(peer).unwrap(); - } - - storage.into_datastore() -} - -fn setup_ping_pong_service( - node_identity: NodeIdentity, - peer_storage: CommsDatabase, -) -> (ServiceExecutor, Arc, CommsNode) -{ - let ping_pong = PingPongService::new(); - let pingpong_api = ping_pong.get_api(); - - let services = ServiceRegistry::new().register(ping_pong); - let comms = CommsBuilder::new() - .with_node_identity(node_identity.clone()) - .with_peer_storage(peer_storage) - .configure_peer_connections(PeerConnectionConfig { - host: "127.0.0.1".parse().unwrap(), - ..Default::default() - }) - .configure_control_service(ControlServiceConfig { - socks_proxy_address: None, - listener_address: node_identity.control_service_address().unwrap(), - requested_connection_timeout: Duration::from_millis(5000), - }) - .build() - .unwrap() - .start() - .unwrap(); - - (ServiceExecutor::execute(&comms, services), pingpong_api, comms) -} - -#[test] -#[allow(non_snake_case)] -fn end_to_end() { - let node_A_tmpdir = TempDir::new(random_string(8).as_str()).unwrap(); - - let node_B_tmpdir = TempDir::new(random_string(8).as_str()).unwrap(); - - let node_A_identity = new_node_identity("127.0.0.1:11111".parse().unwrap()); - let node_B_identity = new_node_identity("127.0.0.1:11112".parse().unwrap()); - - let (node_A_services, node_A_pingpong, mut comms_A) = setup_ping_pong_service( - node_A_identity.clone(), - create_peer_storage(&node_A_tmpdir, "node_A", vec![node_B_identity.clone().into()]), - ); - - let (node_B_services, node_B_pingpong, mut comms_B) = setup_ping_pong_service( - node_B_identity.clone(), - create_peer_storage(&node_B_tmpdir, "node_B", vec![node_A_identity.clone().into()]), - ); - - let mut thread_pool = ThreadPool::new().unwrap(); - comms_A.spawn_tasks(&mut thread_pool); - comms_B.spawn_tasks(&mut thread_pool); - - // Ping node B - node_A_pingpong - .ping(node_B_identity.identity.public_key.clone()) - .unwrap(); - - assert_change(|| node_B_pingpong.ping_count().unwrap(), 1, 20); - assert_change(|| node_A_pingpong.pong_count().unwrap(), 1, 20); - - // Ping node A - node_B_pingpong - .ping(node_A_identity.identity.public_key.clone()) - .unwrap(); - - assert_change(|| node_B_pingpong.pong_count().unwrap(), 1, 20); - assert_change(|| node_A_pingpong.ping_count().unwrap(), 1, 20); - - node_A_services.shutdown().unwrap(); - node_B_services.shutdown().unwrap(); -} diff --git a/base_layer/p2p/tests/services/liveness.rs b/base_layer/p2p/tests/services/liveness.rs index 7f81fdf8dc6..e9e87366a96 100644 --- a/base_layer/p2p/tests/services/liveness.rs +++ b/base_layer/p2p/tests/services/liveness.rs @@ -150,7 +150,7 @@ fn end_to_end() { runtime, liveness1.get_event_stream_fused(), take = 18, - timeout = Duration::from_secs(10), + timeout = Duration::from_secs(20), ); let ping_count = events diff --git a/comms/Cargo.toml b/comms/Cargo.toml index f28444e7ef1..9794a235f12 100644 --- a/comms/Cargo.toml +++ b/comms/Cargo.toml @@ -35,6 +35,7 @@ time = "0.1.42" tokio = "0.2.0-alpha.6" tokio-executor = { version ="^0.2.0-alpha.6", features = ["threadpool"] } ttl_cache = "0.5.1" +yamux = {path="./yamux"} zmq = "0.9.2" [dev-dependencies] diff --git a/comms/src/lib.rs b/comms/src/lib.rs index 4740a612f29..c3c71e3fc34 100644 --- a/comms/src/lib.rs +++ b/comms/src/lib.rs @@ -27,6 +27,7 @@ pub mod connection_manager; mod consts; pub mod control_service; pub mod inbound_message_service; +mod multiplexing; mod noise; pub mod outbound_message_service; pub mod peer_manager; diff --git a/comms/src/multiplexing/mod.rs b/comms/src/multiplexing/mod.rs new file mode 100644 index 00000000000..65a8a804883 --- /dev/null +++ b/comms/src/multiplexing/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +pub mod yamux; diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs new file mode 100644 index 00000000000..696bb17eee2 --- /dev/null +++ b/comms/src/multiplexing/yamux.rs @@ -0,0 +1,231 @@ +// Copyright 2019, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::connection::Direction; +use futures::{ + io::{AsyncRead, AsyncWrite}, + stream::BoxStream, + StreamExt, +}; +use std::{fmt::Debug, io}; +use yamux::Mode; + +pub type IncomingSubstream<'a> = BoxStream<'a, Result>; + +#[derive(Debug)] +pub struct Yamux { + inner: yamux::Connection, +} + +const MAX_BUFFER_SIZE: u32 = 8 * 1024 * 1024; // 8MB +const RECEIVE_WINDOW: u32 = 4 * 1024 * 1024; // 4MB + +impl Yamux +where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static +{ + pub fn new(socket: TSocket, mode: Mode) -> Self { + let mut config = yamux::Config::default(); + // Use OnRead mode instead of OnReceive mode to provide back pressure to the sending side. + // Caveat: the OnRead mode has the risk of deadlock, where both sides send data larger than + // receive window and don't read before finishing writes. + // This should never happen as the window size should be large enough for all protocol messages. + config.set_window_update_mode(yamux::WindowUpdateMode::OnRead); + // Because OnRead mode increases the RTT of window update, bigger buffer size and receive + // window size perform better. + config.set_max_buffer_size(MAX_BUFFER_SIZE as usize); + config.set_receive_window(RECEIVE_WINDOW); + + Self { + inner: yamux::Connection::new(socket, config, mode), + } + } + + /// Upgrade the underlying socket to use yamux + pub async fn upgrade_connection(socket: TSocket, direction: Direction) -> io::Result { + let mode = match direction { + Direction::Inbound => Mode::Server, + Direction::Outbound => Mode::Client, + }; + + Ok(Self::new(socket, mode)) + } + + /// Get the yamux control struct + pub fn get_yamux_control(&self) -> yamux::Control { + self.inner.control() + } + + /// Returns a `Stream` emitting substreams initiated by the remote + pub fn incoming(self) -> IncomingSubstream<'static> { + yamux::into_stream(self.inner).boxed() + } +} + +#[cfg(test)] +mod test { + use crate::{ + multiplexing::yamux::{Mode, Yamux}, + test_utils::tcp::build_connected_tcp_socket_pair, + }; + use futures::{ + future, + io::{AsyncReadExt, AsyncWriteExt}, + StreamExt, + }; + use std::io; + use tokio::runtime::Runtime; + + #[test] + fn open_substream() -> io::Result<()> { + let rt = Runtime::new().unwrap(); + let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair()); + let msg = b"The Way of Kings"; + + let dialer = Yamux::new(dialer, Mode::Client); + let mut dialer_control = dialer.get_yamux_control(); + // The incoming stream must be polled for the control to work + rt.spawn(async move { + dialer.incoming().next().await; + }); + + rt.spawn(async move { + let mut substream = dialer_control.open_stream().await.unwrap(); + + substream.write_all(msg).await.unwrap(); + substream.flush().await.unwrap(); + substream.close().await.unwrap(); + }); + + let mut listener = Yamux::new(listener, Mode::Server).incoming(); + let mut substream = rt + .block_on(listener.next()) + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no substream"))? + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + let mut buf = Vec::new(); + let _ = rt.block_on(future::select(substream.read_to_end(&mut buf), listener.next())); + assert_eq!(buf, msg); + + Ok(()) + } + + #[test] + fn close() -> io::Result<()> { + let rt = Runtime::new().unwrap(); + let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair()); + let msg = b"Words of Radiance"; + + let dialer = Yamux::new(dialer, Mode::Client); + let mut dialer_control = dialer.get_yamux_control(); + // The incoming stream must be polled for the control to work + rt.spawn(async move { + dialer.incoming().next().await; + }); + + rt.spawn(async move { + let mut substream = dialer_control.open_stream().await.unwrap(); + + substream.write_all(msg).await.unwrap(); + substream.flush().await.unwrap(); + + let mut buf = Vec::new(); + substream.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b""); + }); + + let mut incoming = Yamux::new(listener, Mode::Server).incoming(); + let mut substream = rt.block_on(incoming.next()).unwrap().unwrap(); + rt.spawn(async move { + incoming.next().await; + }); + + rt.block_on(async move { + let mut buf = vec![0; msg.len()]; + substream.read_exact(&mut buf).await?; + assert_eq!(buf, msg); + + // Close the substream and then try to write to it + substream.close().await?; + + let result = substream.write_all(b"ignored message").await; + match result { + Ok(()) => panic!("Write should have failed"), + Err(e) => assert_eq!(e.kind(), io::ErrorKind::WriteZero), + } + + io::Result::Ok(()) + })?; + + Ok(()) + } + + #[test] + fn send_big_message() -> io::Result<()> { + let rt = Runtime::new().unwrap(); + #[allow(non_upper_case_globals)] + static MiB: usize = 1 << 20; + static MSG_LEN: usize = 16 * MiB; + + let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair()); + + let dialer = Yamux::new(dialer, Mode::Client); + let mut dialer_control = dialer.get_yamux_control(); + // The incoming stream must be polled for the control to work + rt.spawn(async move { + dialer.incoming().next().await; + }); + + rt.spawn(async move { + let mut substream = dialer_control.open_stream().await.unwrap(); + + let msg = vec![0x55u8; MSG_LEN]; + substream.write_all(msg.as_slice()).await.unwrap(); + + let mut buf = vec![0u8; MSG_LEN]; + substream.read_exact(&mut buf).await.unwrap(); + substream.close().await.unwrap(); + + assert_eq!(buf.len(), MSG_LEN); + assert_eq!(buf, vec![0xAAu8; MSG_LEN]); + }); + + let mut incoming = Yamux::new(listener, Mode::Server).incoming(); + let mut substream = rt.block_on(incoming.next()).unwrap().unwrap(); + rt.spawn(async move { + incoming.next().await; + }); + + rt.block_on(async move { + let mut buf = vec![0u8; MSG_LEN]; + substream.read_exact(&mut buf).await?; + assert_eq!(buf, vec![0x55u8; MSG_LEN]); + + let msg = vec![0xAAu8; MSG_LEN]; + substream.write_all(msg.as_slice()).await?; + substream.close().await?; + + io::Result::Ok(()) + })?; + + Ok(()) + } +} diff --git a/comms/src/noise/config.rs b/comms/src/noise/config.rs index 32e1608e209..380c580757e 100644 --- a/comms/src/noise/config.rs +++ b/comms/src/noise/config.rs @@ -31,9 +31,9 @@ use crate::{ }, types::{CommsPublicKey, CommsSecretKey}, }; +use futures::{AsyncRead, AsyncWrite}; use snow::{self, params::NoiseParams, Keypair}; use tari_utilities::ByteArray; -use tokio::io::{AsyncRead, AsyncWrite}; pub(super) const NOISE_IX_PARAMETER: &str = "Noise_IX_25519_ChaChaPoly_BLAKE2b"; @@ -95,13 +95,10 @@ impl NoiseConfig { mod test { use super::*; use crate::{consts::COMMS_RNG, test_utils::tcp::build_connected_tcp_socket_pair}; - use futures::future; + use futures::{future, AsyncReadExt, AsyncWriteExt}; use snow::params::{BaseChoice, CipherChoice, DHChoice, HandshakePattern, HashChoice}; use tari_crypto::keys::PublicKey; - use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - runtime::Runtime, - }; + use tokio::runtime::Runtime; fn check_noise_params(config: &NoiseConfig) { assert_eq!(config.parameters.hash, HashChoice::Blake2b); @@ -156,7 +153,7 @@ mod test { let sample = b"Children of time"; socket_in.write_all(sample).await.unwrap(); socket_in.flush().await.unwrap(); - socket_in.shutdown().await.unwrap(); + socket_in.close().await.unwrap(); let mut read_buf = Vec::with_capacity(16); socket_out.read_to_end(&mut read_buf).await.unwrap(); diff --git a/comms/src/noise/socket.rs b/comms/src/noise/socket.rs index 3f9eff15ad7..04f1780fc61 100644 --- a/comms/src/noise/socket.rs +++ b/comms/src/noise/socket.rs @@ -35,7 +35,8 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +// use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures::{io::Error, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; const LOG_TARGET: &str = "comms::noise::socket"; @@ -485,16 +486,16 @@ where TSocket: AsyncWrite + Unpin impl AsyncWrite for NoiseSocket where TSocket: AsyncWrite + Unpin { - fn poll_write(self: Pin<&mut Self>, context: &mut Context, buf: &[u8]) -> Poll> { - self.get_mut().poll_write(context, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + self.get_mut().poll_write(cx, buf) } - fn poll_flush(self: Pin<&mut Self>, context: &mut Context) -> Poll> { - self.get_mut().poll_flush(context) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.get_mut().poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, context: &mut Context) -> Poll> { - Pin::new(&mut self.socket).poll_shutdown(context) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.socket).poll_close(cx) } } @@ -619,17 +620,17 @@ impl From for NoiseState { #[cfg(test)] mod test { use super::*; - use crate::{noise::config::NOISE_IX_PARAMETER, test_utils::tcp::build_connected_tcp_socket_pair}; + use crate::{ + noise::config::NOISE_IX_PARAMETER, + test_utils::tcp::build_connected_tcp_socket_pair, + transports::TcpSocket, + }; use futures::future::join; use snow::{params::NoiseParams, Builder, Error, Keypair}; use std::io; - use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, - runtime::Runtime, - }; + use tokio::runtime::Runtime; - async fn build_test_connection() -> Result<((Keypair, Handshake), (Keypair, Handshake)), Error> + async fn build_test_connection() -> Result<((Keypair, Handshake), (Keypair, Handshake)), Error> { let parameters: NoiseParams = NOISE_IX_PARAMETER.parse().expect("Invalid protocol name"); @@ -656,9 +657,9 @@ mod test { } async fn perform_handshake( - dialer: Handshake, - listener: Handshake, - ) -> io::Result<(NoiseSocket, NoiseSocket)> + dialer: Handshake, + listener: Handshake, + ) -> io::Result<(NoiseSocket, NoiseSocket)> { let (dialer_result, listener_result) = join(dialer.handshake_1rt(), listener.handshake_1rt()).await; @@ -691,7 +692,7 @@ mod test { dialer_socket.write_all(b" ").await?; dialer_socket.write_all(b"archive").await?; dialer_socket.flush().await?; - dialer_socket.shutdown().await?; + dialer_socket.close().await?; let mut buf = Vec::new(); listener_socket.read_to_end(&mut buf).await?; diff --git a/comms/src/test_utils/tcp.rs b/comms/src/test_utils/tcp.rs index fd725e234a0..1e0e217a8ca 100644 --- a/comms/src/test_utils/tcp.rs +++ b/comms/src/test_utils/tcp.rs @@ -20,14 +20,14 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use crate::transports::TcpSocket; use futures::StreamExt; -use tari_test_utils::address::get_next_local_address; use tokio::net::{TcpListener, TcpStream}; -pub async fn build_connected_tcp_socket_pair() -> (TcpStream, TcpStream) { - let addr = get_next_local_address(); - let listener = TcpListener::bind(&addr).await.unwrap(); - let (in_sock, out_sock) = futures::future::join(listener.incoming().next(), TcpStream::connect(&addr)).await; +pub async fn build_connected_tcp_socket_pair() -> (TcpSocket, TcpSocket) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let (in_sock, out_sock) = futures::future::join(listener.incoming().next(), TcpStream::connect(&local_addr)).await; - (out_sock.unwrap(), in_sock.unwrap().unwrap()) + (out_sock.unwrap().into(), in_sock.unwrap().unwrap().into()) } diff --git a/comms/src/transports/mod.rs b/comms/src/transports/mod.rs index c12c5b13795..b4f9b9829f6 100644 --- a/comms/src/transports/mod.rs +++ b/comms/src/transports/mod.rs @@ -25,6 +25,8 @@ use multiaddr::Multiaddr; mod tcp; +pub use tcp::TcpSocket; + pub trait Transport { /// The output of the transport after a connection is established type Output; diff --git a/comms/src/transports/tcp.rs b/comms/src/transports/tcp.rs index 3799963eb24..b7565d07038 100644 --- a/comms/src/transports/tcp.rs +++ b/comms/src/transports/tcp.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::transports::Transport; -use futures::{ready, stream::BoxStream, Future, Poll, Stream, StreamExt}; +use futures::{io::Error, ready, stream::BoxStream, AsyncRead, AsyncWrite, Future, Poll, Stream, StreamExt}; use multiaddr::{AddrComponent, Multiaddr}; use std::{ io, @@ -30,7 +30,10 @@ use std::{ task::Context, time::Duration, }; -use tokio::net::{TcpListener, TcpStream}; +use tokio::{ + io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite}, + net::{TcpListener, TcpStream}, +}; /// Transport implementation for TCP #[derive(Debug, Clone, Default)] @@ -92,7 +95,7 @@ impl TcpTransport { impl Transport for TcpTransport { type Error = io::Error; type Inbound = TcpInbound<'static>; - type Output = (TcpStream, Multiaddr); + type Output = (TcpSocket, Multiaddr); type DialFuture = impl Future>; type ListenFuture = impl Future>; @@ -120,7 +123,7 @@ impl Transport for TcpTransport { let stream = TcpStream::connect(&socket_addr).await?; config.configure(&stream)?; let peer_addr = socketaddr_to_multiaddr(stream.peer_addr()?); - Ok((stream, peer_addr)) + Ok((TcpSocket::new(stream), peer_addr)) }) } } @@ -133,7 +136,7 @@ pub struct TcpInbound<'a> { } impl Stream for TcpInbound<'_> { - type Item = io::Result<(TcpStream, Multiaddr)>; + type Item = io::Result<(TcpSocket, Multiaddr)>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match ready!(self.incoming.poll_next_unpin(cx)) { @@ -141,7 +144,7 @@ impl Stream for TcpInbound<'_> { // Configure each socket self.config.configure(&stream)?; let peer_addr = socketaddr_to_multiaddr(stream.peer_addr()?); - Poll::Ready(Some(Ok((stream, peer_addr)))) + Poll::Ready(Some(Ok((TcpSocket::new(stream), peer_addr)))) }, Some(Err(err)) => Poll::Ready(Some(Err(err))), None => Poll::Ready(None), @@ -149,6 +152,44 @@ impl Stream for TcpInbound<'_> { } } +/// TcpSocket is a wrapper struct for tokio `TcpStream` and implements +/// `futures-rs` AsyncRead/Write +pub struct TcpSocket { + inner: TcpStream, +} + +impl TcpSocket { + pub fn new(stream: TcpStream) -> Self { + Self { inner: stream } + } +} + +impl AsyncWrite for TcpSocket { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +impl AsyncRead for TcpSocket { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl From for TcpSocket { + fn from(stream: TcpStream) -> Self { + Self { inner: stream } + } +} + /// Convert a socket address to a multiaddress fn socketaddr_to_multiaddr(socket_addr: SocketAddr) -> Multiaddr { let mut addr: Multiaddr = match socket_addr.ip() { diff --git a/comms/yamux b/comms/yamux new file mode 160000 index 00000000000..9d045557e62 --- /dev/null +++ b/comms/yamux @@ -0,0 +1 @@ +Subproject commit 9d045557e622db989d12ef2e0d81c1bdc5cd8119 diff --git a/scripts/update_submodules.sh b/scripts/update_submodules.sh new file mode 100755 index 00000000000..5755026c37f --- /dev/null +++ b/scripts/update_submodules.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +set -e + +echo "Synching all submodules" +git submodule update --recursive --remote + +SUBMODULES=( + "comms/yamux" +) +GIT_URLS=( + "git@github.com:tari-project/yamux.git" +) + +# Change all submodule urls to use ssh +for index in "${!SUBMODULES[@]}"; do + pushd "${SUBMODULES[$index]}" > /dev/null + git remote set-url origin "${GIT_URLS[$index]}" + popd > /dev/null +done