diff --git a/.circleci/config.yml b/.circleci/config.yml
index 5f31bb671b..6ccd535dd8 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,4 +1,4 @@
-version: 2
+version: 2.1
defaults:
rust_image: &rust_image quay.io/tarilabs/rust_tari-build-with-deps:nightly-2019-10-04
@@ -9,6 +9,10 @@ jobs:
- image: *rust_image
steps:
- checkout
+ - run:
+ command: |
+ git submodule update --init --recursive
+ name: Init git submodule
- run:
name: RFC documentation
command: |
@@ -25,6 +29,10 @@ jobs:
- image: quay.io/tarilabs/git-ssh-client:0.2-alpine
steps:
- checkout
+ - run:
+ command: |
+ git submodule update --init --recursive
+ name: Init git submodule
- attach_workspace:
at: .
- add_ssh_keys:
@@ -68,6 +76,10 @@ jobs:
resource_class: medium
steps:
- checkout
+ - run:
+ command: |
+ git submodule update --init --recursive
+ name: Init git submodule
- run:
name: Tari source code
command: |
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000000..6a8eb5b5d0
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,4 @@
+[submodule "comms/yamux"]
+ path = comms/yamux
+ url = https://github.com/tari-project/yamux.git
+ branch = futures-alpha
diff --git a/README.md b/README.md
index fe73d15e51..a7d5586390 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,20 @@ to generate the documentation. The generated html sits in `target/doc/`. Alterna
See [RFC-0110/CodeStructure](./RFC/src/RFC-0010_CodeStructure.md) for details on the code structure and layout.
+### Git submodules
+
+Git submodules are use temporarily until some dependent libraries are stabilized and released as crates.
+When checking out code take the following steps to ensure submodules are up to date.
+
+```shell script
+# Initialize submodules
+git submodule init
+# Sets `git pull` to automatically pull submodules
+git config submodule.recurse true
+# Checkout/update all submodules
+git submodule update --recursive --remote
+```
+
## Conversation channels
[](https://t.me/tarilab) Non-technical discussions and gentle sparring.
diff --git a/base_layer/p2p/tests/ping_pong/mod.rs b/base_layer/p2p/tests/ping_pong/mod.rs
deleted file mode 100644
index e06ceaf305..0000000000
--- a/base_layer/p2p/tests/ping_pong/mod.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2019 The Tari Project
-//
-// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
-// following conditions are met:
-//
-// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
-// disclaimer.
-//
-// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
-// following disclaimer in the documentation and/or other materials provided with the distribution.
-//
-// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
-// products derived from this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
-// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
-// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-// NOTE: This test uses ports 11111 and 11112
-
-use crate::support::{assert_change, random_string};
-use futures::executor::ThreadPool;
-use rand::rngs::OsRng;
-use std::{sync::Arc, time::Duration};
-use tari_comms::{
- builder::CommsNode,
- connection::NetAddress,
- connection_manager::PeerConnectionConfig,
- control_service::ControlServiceConfig,
- peer_manager::{peer_storage::PeerStorage, NodeIdentity, Peer},
- types::CommsDatabase,
- CommsBuilder,
-};
-use tari_p2p::{
- ping_pong::{PingPongService, PingPongServiceApi},
- sync_services::{ServiceExecutor, ServiceRegistry},
- tari_message::TariMessageType,
-};
-use tari_storage::{lmdb_store::LMDBBuilder, LMDBWrapper};
-use tempdir::TempDir;
-
-fn new_node_identity(control_service_address: NetAddress) -> NodeIdentity {
- NodeIdentity::random(&mut OsRng::new().unwrap(), control_service_address).unwrap()
-}
-
-fn create_peer_storage(tmpdir: &TempDir, database_name: &str, peers: Vec) -> CommsDatabase {
- let datastore = LMDBBuilder::new()
- .set_path(tmpdir.path().to_str().unwrap())
- .set_environment_size(10)
- .set_max_number_of_databases(1)
- .add_database(database_name, lmdb_zero::db::CREATE)
- .build()
- .unwrap();
-
- let peer_database = datastore.get_handle(database_name).unwrap();
- let peer_database = LMDBWrapper::new(Arc::new(peer_database));
- let mut storage = PeerStorage::new(peer_database).unwrap();
- for peer in peers {
- storage.add_peer(peer).unwrap();
- }
-
- storage.into_datastore()
-}
-
-fn setup_ping_pong_service(
- node_identity: NodeIdentity,
- peer_storage: CommsDatabase,
-) -> (ServiceExecutor, Arc, CommsNode)
-{
- let ping_pong = PingPongService::new();
- let pingpong_api = ping_pong.get_api();
-
- let services = ServiceRegistry::new().register(ping_pong);
- let comms = CommsBuilder::new()
- .with_node_identity(node_identity.clone())
- .with_peer_storage(peer_storage)
- .configure_peer_connections(PeerConnectionConfig {
- host: "127.0.0.1".parse().unwrap(),
- ..Default::default()
- })
- .configure_control_service(ControlServiceConfig {
- socks_proxy_address: None,
- listener_address: node_identity.control_service_address().unwrap(),
- requested_connection_timeout: Duration::from_millis(5000),
- })
- .build()
- .unwrap()
- .start()
- .unwrap();
-
- (ServiceExecutor::execute(&comms, services), pingpong_api, comms)
-}
-
-#[test]
-#[allow(non_snake_case)]
-fn end_to_end() {
- let node_A_tmpdir = TempDir::new(random_string(8).as_str()).unwrap();
-
- let node_B_tmpdir = TempDir::new(random_string(8).as_str()).unwrap();
-
- let node_A_identity = new_node_identity("127.0.0.1:11111".parse().unwrap());
- let node_B_identity = new_node_identity("127.0.0.1:11112".parse().unwrap());
-
- let (node_A_services, node_A_pingpong, mut comms_A) = setup_ping_pong_service(
- node_A_identity.clone(),
- create_peer_storage(&node_A_tmpdir, "node_A", vec![node_B_identity.clone().into()]),
- );
-
- let (node_B_services, node_B_pingpong, mut comms_B) = setup_ping_pong_service(
- node_B_identity.clone(),
- create_peer_storage(&node_B_tmpdir, "node_B", vec![node_A_identity.clone().into()]),
- );
-
- let mut thread_pool = ThreadPool::new().unwrap();
- comms_A.spawn_tasks(&mut thread_pool);
- comms_B.spawn_tasks(&mut thread_pool);
-
- // Ping node B
- node_A_pingpong
- .ping(node_B_identity.identity.public_key.clone())
- .unwrap();
-
- assert_change(|| node_B_pingpong.ping_count().unwrap(), 1, 20);
- assert_change(|| node_A_pingpong.pong_count().unwrap(), 1, 20);
-
- // Ping node A
- node_B_pingpong
- .ping(node_A_identity.identity.public_key.clone())
- .unwrap();
-
- assert_change(|| node_B_pingpong.pong_count().unwrap(), 1, 20);
- assert_change(|| node_A_pingpong.ping_count().unwrap(), 1, 20);
-
- node_A_services.shutdown().unwrap();
- node_B_services.shutdown().unwrap();
-}
diff --git a/base_layer/p2p/tests/services/liveness.rs b/base_layer/p2p/tests/services/liveness.rs
index 7f81fdf8dc..142b78407d 100644
--- a/base_layer/p2p/tests/services/liveness.rs
+++ b/base_layer/p2p/tests/services/liveness.rs
@@ -44,7 +44,7 @@ pub fn setup_liveness_service(
node_identity: NodeIdentity,
peers: Vec,
data_path: &str,
-) -> (LivenessHandle, Arc, Dht)
+) -> (LivenessHandle, CommsNode, Dht)
{
let (publisher, subscription_factory) = pubsub_connector(runtime.executor(), 100);
let subscription_factory = Arc::new(subscription_factory);
@@ -97,60 +97,49 @@ fn end_to_end() {
.unwrap();
let alice_temp_dir = TempDir::new(string(8).as_str()).unwrap();
- let (mut liveness1, _comms_1, _dht_1) = setup_liveness_service(
+ let (mut liveness1, comms_1, dht_1) = setup_liveness_service(
&runtime,
node_1_identity.clone(),
vec![node_2_identity.clone()],
alice_temp_dir.path().to_str().unwrap(),
);
let bob_temp_dir = TempDir::new(string(8).as_str()).unwrap();
- let (mut liveness2, _comms_2, _dht_2) = setup_liveness_service(
+ let (mut liveness2, comms_2, dht_2) = setup_liveness_service(
&runtime,
node_2_identity.clone(),
vec![node_1_identity.clone()],
bob_temp_dir.path().to_str().unwrap(),
);
- let mut pingpong1_total = (0, 0);
- let mut pingpong2_total = (0, 0);
-
for _ in 0..5 {
let _ = runtime
.block_on(liveness2.send_ping(node_1_identity.node_id().clone()))
.unwrap();
- pingpong1_total = (pingpong1_total.0 + 1, pingpong1_total.1);
- pingpong2_total = (pingpong2_total.0, pingpong2_total.1 + 1);
}
for _ in 0..4 {
let _ = runtime
.block_on(liveness1.send_ping(node_2_identity.node_id().clone()))
.unwrap();
- pingpong2_total = (pingpong2_total.0 + 1, pingpong2_total.1);
- pingpong1_total = (pingpong1_total.0, pingpong1_total.1 + 1);
}
for _ in 0..5 {
let _ = runtime
.block_on(liveness2.send_ping(node_1_identity.node_id().clone()))
.unwrap();
- pingpong1_total = (pingpong1_total.0 + 1, pingpong1_total.1);
- pingpong2_total = (pingpong2_total.0, pingpong2_total.1 + 1);
}
for _ in 0..4 {
let _ = runtime
.block_on(liveness1.send_ping(node_2_identity.node_id().clone()))
.unwrap();
- pingpong2_total = (pingpong2_total.0 + 1, pingpong2_total.1);
- pingpong1_total = (pingpong1_total.0, pingpong1_total.1 + 1);
}
let events = collect_stream!(
runtime,
liveness1.get_event_stream_fused(),
take = 18,
- timeout = Duration::from_secs(10),
+ timeout = Duration::from_secs(20),
);
let ping_count = events
@@ -209,4 +198,11 @@ fn end_to_end() {
assert_eq!(pongcount1, 8);
assert_eq!(pingcount2, 8);
assert_eq!(pongcount2, 10);
+
+ drop(dht_1);
+ drop(dht_2);
+ comms_1.shutdown().unwrap();
+ comms_2.shutdown().unwrap();
+
+ runtime.shutdown_on_idle();
}
diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs
index 9bb49741f3..ec0f50f699 100644
--- a/base_layer/p2p/tests/support/comms_and_services.rs
+++ b/base_layer/p2p/tests/support/comms_and_services.rs
@@ -39,14 +39,12 @@ pub fn setup_comms_services(
peers: Vec,
publisher: InboundDomainConnector,
data_path: &str,
-) -> (Arc, Dht)
+) -> (CommsNode, Dht)
where
TSink: Sink> + Clone + Unpin + Send + Sync + 'static,
TSink::Error: Error + Send + Sync,
{
- let (comms, dht) = initialize_local_test_comms(executor, node_identity, publisher, data_path)
- .map(|(comms, dht)| (Arc::new(comms), dht))
- .unwrap();
+ let (comms, dht) = initialize_local_test_comms(executor, node_identity, publisher, data_path).unwrap();
for p in peers {
let addr = p.control_service_address().clone();
diff --git a/comms/Cargo.toml b/comms/Cargo.toml
index f28444e7ef..9794a235f1 100644
--- a/comms/Cargo.toml
+++ b/comms/Cargo.toml
@@ -35,6 +35,7 @@ time = "0.1.42"
tokio = "0.2.0-alpha.6"
tokio-executor = { version ="^0.2.0-alpha.6", features = ["threadpool"] }
ttl_cache = "0.5.1"
+yamux = {path="./yamux"}
zmq = "0.9.2"
[dev-dependencies]
diff --git a/comms/src/lib.rs b/comms/src/lib.rs
index 4740a612f2..c3c71e3fc3 100644
--- a/comms/src/lib.rs
+++ b/comms/src/lib.rs
@@ -27,6 +27,7 @@ pub mod connection_manager;
mod consts;
pub mod control_service;
pub mod inbound_message_service;
+mod multiplexing;
mod noise;
pub mod outbound_message_service;
pub mod peer_manager;
diff --git a/comms/src/multiplexing/mod.rs b/comms/src/multiplexing/mod.rs
new file mode 100644
index 0000000000..65a8a80488
--- /dev/null
+++ b/comms/src/multiplexing/mod.rs
@@ -0,0 +1,23 @@
+// Copyright 2019, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+pub mod yamux;
diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs
new file mode 100644
index 0000000000..696bb17eee
--- /dev/null
+++ b/comms/src/multiplexing/yamux.rs
@@ -0,0 +1,231 @@
+// Copyright 2019, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use crate::connection::Direction;
+use futures::{
+ io::{AsyncRead, AsyncWrite},
+ stream::BoxStream,
+ StreamExt,
+};
+use std::{fmt::Debug, io};
+use yamux::Mode;
+
+pub type IncomingSubstream<'a> = BoxStream<'a, Result>;
+
+#[derive(Debug)]
+pub struct Yamux {
+ inner: yamux::Connection,
+}
+
+const MAX_BUFFER_SIZE: u32 = 8 * 1024 * 1024; // 8MB
+const RECEIVE_WINDOW: u32 = 4 * 1024 * 1024; // 4MB
+
+impl Yamux
+where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static
+{
+ pub fn new(socket: TSocket, mode: Mode) -> Self {
+ let mut config = yamux::Config::default();
+ // Use OnRead mode instead of OnReceive mode to provide back pressure to the sending side.
+ // Caveat: the OnRead mode has the risk of deadlock, where both sides send data larger than
+ // receive window and don't read before finishing writes.
+ // This should never happen as the window size should be large enough for all protocol messages.
+ config.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
+ // Because OnRead mode increases the RTT of window update, bigger buffer size and receive
+ // window size perform better.
+ config.set_max_buffer_size(MAX_BUFFER_SIZE as usize);
+ config.set_receive_window(RECEIVE_WINDOW);
+
+ Self {
+ inner: yamux::Connection::new(socket, config, mode),
+ }
+ }
+
+ /// Upgrade the underlying socket to use yamux
+ pub async fn upgrade_connection(socket: TSocket, direction: Direction) -> io::Result {
+ let mode = match direction {
+ Direction::Inbound => Mode::Server,
+ Direction::Outbound => Mode::Client,
+ };
+
+ Ok(Self::new(socket, mode))
+ }
+
+ /// Get the yamux control struct
+ pub fn get_yamux_control(&self) -> yamux::Control {
+ self.inner.control()
+ }
+
+ /// Returns a `Stream` emitting substreams initiated by the remote
+ pub fn incoming(self) -> IncomingSubstream<'static> {
+ yamux::into_stream(self.inner).boxed()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use crate::{
+ multiplexing::yamux::{Mode, Yamux},
+ test_utils::tcp::build_connected_tcp_socket_pair,
+ };
+ use futures::{
+ future,
+ io::{AsyncReadExt, AsyncWriteExt},
+ StreamExt,
+ };
+ use std::io;
+ use tokio::runtime::Runtime;
+
+ #[test]
+ fn open_substream() -> io::Result<()> {
+ let rt = Runtime::new().unwrap();
+ let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair());
+ let msg = b"The Way of Kings";
+
+ let dialer = Yamux::new(dialer, Mode::Client);
+ let mut dialer_control = dialer.get_yamux_control();
+ // The incoming stream must be polled for the control to work
+ rt.spawn(async move {
+ dialer.incoming().next().await;
+ });
+
+ rt.spawn(async move {
+ let mut substream = dialer_control.open_stream().await.unwrap();
+
+ substream.write_all(msg).await.unwrap();
+ substream.flush().await.unwrap();
+ substream.close().await.unwrap();
+ });
+
+ let mut listener = Yamux::new(listener, Mode::Server).incoming();
+ let mut substream = rt
+ .block_on(listener.next())
+ .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no substream"))?
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+
+ let mut buf = Vec::new();
+ let _ = rt.block_on(future::select(substream.read_to_end(&mut buf), listener.next()));
+ assert_eq!(buf, msg);
+
+ Ok(())
+ }
+
+ #[test]
+ fn close() -> io::Result<()> {
+ let rt = Runtime::new().unwrap();
+ let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair());
+ let msg = b"Words of Radiance";
+
+ let dialer = Yamux::new(dialer, Mode::Client);
+ let mut dialer_control = dialer.get_yamux_control();
+ // The incoming stream must be polled for the control to work
+ rt.spawn(async move {
+ dialer.incoming().next().await;
+ });
+
+ rt.spawn(async move {
+ let mut substream = dialer_control.open_stream().await.unwrap();
+
+ substream.write_all(msg).await.unwrap();
+ substream.flush().await.unwrap();
+
+ let mut buf = Vec::new();
+ substream.read_to_end(&mut buf).await.unwrap();
+ assert_eq!(buf, b"");
+ });
+
+ let mut incoming = Yamux::new(listener, Mode::Server).incoming();
+ let mut substream = rt.block_on(incoming.next()).unwrap().unwrap();
+ rt.spawn(async move {
+ incoming.next().await;
+ });
+
+ rt.block_on(async move {
+ let mut buf = vec![0; msg.len()];
+ substream.read_exact(&mut buf).await?;
+ assert_eq!(buf, msg);
+
+ // Close the substream and then try to write to it
+ substream.close().await?;
+
+ let result = substream.write_all(b"ignored message").await;
+ match result {
+ Ok(()) => panic!("Write should have failed"),
+ Err(e) => assert_eq!(e.kind(), io::ErrorKind::WriteZero),
+ }
+
+ io::Result::Ok(())
+ })?;
+
+ Ok(())
+ }
+
+ #[test]
+ fn send_big_message() -> io::Result<()> {
+ let rt = Runtime::new().unwrap();
+ #[allow(non_upper_case_globals)]
+ static MiB: usize = 1 << 20;
+ static MSG_LEN: usize = 16 * MiB;
+
+ let (dialer, listener) = rt.block_on(build_connected_tcp_socket_pair());
+
+ let dialer = Yamux::new(dialer, Mode::Client);
+ let mut dialer_control = dialer.get_yamux_control();
+ // The incoming stream must be polled for the control to work
+ rt.spawn(async move {
+ dialer.incoming().next().await;
+ });
+
+ rt.spawn(async move {
+ let mut substream = dialer_control.open_stream().await.unwrap();
+
+ let msg = vec![0x55u8; MSG_LEN];
+ substream.write_all(msg.as_slice()).await.unwrap();
+
+ let mut buf = vec![0u8; MSG_LEN];
+ substream.read_exact(&mut buf).await.unwrap();
+ substream.close().await.unwrap();
+
+ assert_eq!(buf.len(), MSG_LEN);
+ assert_eq!(buf, vec![0xAAu8; MSG_LEN]);
+ });
+
+ let mut incoming = Yamux::new(listener, Mode::Server).incoming();
+ let mut substream = rt.block_on(incoming.next()).unwrap().unwrap();
+ rt.spawn(async move {
+ incoming.next().await;
+ });
+
+ rt.block_on(async move {
+ let mut buf = vec![0u8; MSG_LEN];
+ substream.read_exact(&mut buf).await?;
+ assert_eq!(buf, vec![0x55u8; MSG_LEN]);
+
+ let msg = vec![0xAAu8; MSG_LEN];
+ substream.write_all(msg.as_slice()).await?;
+ substream.close().await?;
+
+ io::Result::Ok(())
+ })?;
+
+ Ok(())
+ }
+}
diff --git a/comms/src/noise/config.rs b/comms/src/noise/config.rs
index 32e1608e20..380c580757 100644
--- a/comms/src/noise/config.rs
+++ b/comms/src/noise/config.rs
@@ -31,9 +31,9 @@ use crate::{
},
types::{CommsPublicKey, CommsSecretKey},
};
+use futures::{AsyncRead, AsyncWrite};
use snow::{self, params::NoiseParams, Keypair};
use tari_utilities::ByteArray;
-use tokio::io::{AsyncRead, AsyncWrite};
pub(super) const NOISE_IX_PARAMETER: &str = "Noise_IX_25519_ChaChaPoly_BLAKE2b";
@@ -95,13 +95,10 @@ impl NoiseConfig {
mod test {
use super::*;
use crate::{consts::COMMS_RNG, test_utils::tcp::build_connected_tcp_socket_pair};
- use futures::future;
+ use futures::{future, AsyncReadExt, AsyncWriteExt};
use snow::params::{BaseChoice, CipherChoice, DHChoice, HandshakePattern, HashChoice};
use tari_crypto::keys::PublicKey;
- use tokio::{
- io::{AsyncReadExt, AsyncWriteExt},
- runtime::Runtime,
- };
+ use tokio::runtime::Runtime;
fn check_noise_params(config: &NoiseConfig) {
assert_eq!(config.parameters.hash, HashChoice::Blake2b);
@@ -156,7 +153,7 @@ mod test {
let sample = b"Children of time";
socket_in.write_all(sample).await.unwrap();
socket_in.flush().await.unwrap();
- socket_in.shutdown().await.unwrap();
+ socket_in.close().await.unwrap();
let mut read_buf = Vec::with_capacity(16);
socket_out.read_to_end(&mut read_buf).await.unwrap();
diff --git a/comms/src/noise/socket.rs b/comms/src/noise/socket.rs
index 3f9eff15ad..04f1780fc6 100644
--- a/comms/src/noise/socket.rs
+++ b/comms/src/noise/socket.rs
@@ -35,7 +35,8 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+// use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures::{io::Error, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
const LOG_TARGET: &str = "comms::noise::socket";
@@ -485,16 +486,16 @@ where TSocket: AsyncWrite + Unpin
impl AsyncWrite for NoiseSocket
where TSocket: AsyncWrite + Unpin
{
- fn poll_write(self: Pin<&mut Self>, context: &mut Context, buf: &[u8]) -> Poll> {
- self.get_mut().poll_write(context, buf)
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> {
+ self.get_mut().poll_write(cx, buf)
}
- fn poll_flush(self: Pin<&mut Self>, context: &mut Context) -> Poll> {
- self.get_mut().poll_flush(context)
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> {
+ self.get_mut().poll_flush(cx)
}
- fn poll_shutdown(mut self: Pin<&mut Self>, context: &mut Context) -> Poll> {
- Pin::new(&mut self.socket).poll_shutdown(context)
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ Pin::new(&mut self.socket).poll_close(cx)
}
}
@@ -619,17 +620,17 @@ impl From for NoiseState {
#[cfg(test)]
mod test {
use super::*;
- use crate::{noise::config::NOISE_IX_PARAMETER, test_utils::tcp::build_connected_tcp_socket_pair};
+ use crate::{
+ noise::config::NOISE_IX_PARAMETER,
+ test_utils::tcp::build_connected_tcp_socket_pair,
+ transports::TcpSocket,
+ };
use futures::future::join;
use snow::{params::NoiseParams, Builder, Error, Keypair};
use std::io;
- use tokio::{
- io::{AsyncReadExt, AsyncWriteExt},
- net::TcpStream,
- runtime::Runtime,
- };
+ use tokio::runtime::Runtime;
- async fn build_test_connection() -> Result<((Keypair, Handshake), (Keypair, Handshake)), Error>
+ async fn build_test_connection() -> Result<((Keypair, Handshake), (Keypair, Handshake)), Error>
{
let parameters: NoiseParams = NOISE_IX_PARAMETER.parse().expect("Invalid protocol name");
@@ -656,9 +657,9 @@ mod test {
}
async fn perform_handshake(
- dialer: Handshake,
- listener: Handshake,
- ) -> io::Result<(NoiseSocket, NoiseSocket)>
+ dialer: Handshake,
+ listener: Handshake,
+ ) -> io::Result<(NoiseSocket, NoiseSocket)>
{
let (dialer_result, listener_result) = join(dialer.handshake_1rt(), listener.handshake_1rt()).await;
@@ -691,7 +692,7 @@ mod test {
dialer_socket.write_all(b" ").await?;
dialer_socket.write_all(b"archive").await?;
dialer_socket.flush().await?;
- dialer_socket.shutdown().await?;
+ dialer_socket.close().await?;
let mut buf = Vec::new();
listener_socket.read_to_end(&mut buf).await?;
diff --git a/comms/src/test_utils/tcp.rs b/comms/src/test_utils/tcp.rs
index fd725e234a..1e0e217a8c 100644
--- a/comms/src/test_utils/tcp.rs
+++ b/comms/src/test_utils/tcp.rs
@@ -20,14 +20,14 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+use crate::transports::TcpSocket;
use futures::StreamExt;
-use tari_test_utils::address::get_next_local_address;
use tokio::net::{TcpListener, TcpStream};
-pub async fn build_connected_tcp_socket_pair() -> (TcpStream, TcpStream) {
- let addr = get_next_local_address();
- let listener = TcpListener::bind(&addr).await.unwrap();
- let (in_sock, out_sock) = futures::future::join(listener.incoming().next(), TcpStream::connect(&addr)).await;
+pub async fn build_connected_tcp_socket_pair() -> (TcpSocket, TcpSocket) {
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let local_addr = listener.local_addr().unwrap();
+ let (in_sock, out_sock) = futures::future::join(listener.incoming().next(), TcpStream::connect(&local_addr)).await;
- (out_sock.unwrap(), in_sock.unwrap().unwrap())
+ (out_sock.unwrap().into(), in_sock.unwrap().unwrap().into())
}
diff --git a/comms/src/transports/mod.rs b/comms/src/transports/mod.rs
index c12c5b1379..b4f9b9829f 100644
--- a/comms/src/transports/mod.rs
+++ b/comms/src/transports/mod.rs
@@ -25,6 +25,8 @@ use multiaddr::Multiaddr;
mod tcp;
+pub use tcp::TcpSocket;
+
pub trait Transport {
/// The output of the transport after a connection is established
type Output;
diff --git a/comms/src/transports/tcp.rs b/comms/src/transports/tcp.rs
index 3799963eb2..b7565d0703 100644
--- a/comms/src/transports/tcp.rs
+++ b/comms/src/transports/tcp.rs
@@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use crate::transports::Transport;
-use futures::{ready, stream::BoxStream, Future, Poll, Stream, StreamExt};
+use futures::{io::Error, ready, stream::BoxStream, AsyncRead, AsyncWrite, Future, Poll, Stream, StreamExt};
use multiaddr::{AddrComponent, Multiaddr};
use std::{
io,
@@ -30,7 +30,10 @@ use std::{
task::Context,
time::Duration,
};
-use tokio::net::{TcpListener, TcpStream};
+use tokio::{
+ io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite},
+ net::{TcpListener, TcpStream},
+};
/// Transport implementation for TCP
#[derive(Debug, Clone, Default)]
@@ -92,7 +95,7 @@ impl TcpTransport {
impl Transport for TcpTransport {
type Error = io::Error;
type Inbound = TcpInbound<'static>;
- type Output = (TcpStream, Multiaddr);
+ type Output = (TcpSocket, Multiaddr);
type DialFuture = impl Future