Skip to content

Commit

Permalink
Merge pull request #83 from twittner/tokio
Browse files Browse the repository at this point in the history
Use tokio in tests.
  • Loading branch information
twittner authored Jun 13, 2020
2 parents d60fca8 + 96246ce commit 33bbe5a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 26 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ static_assertions = "1"

[dev-dependencies]
anyhow = "1"
async-std = "1"
criterion = "0.3"
futures = "0.3.4"
quickcheck = "0.9"
tokio = { version = "0.2", features = ["tcp", "rt-threaded", "macros"] }
tokio-util = { version = "0.3", features = ["compat"] }

[[bench]]
name = "concurrent"
Expand Down
8 changes: 5 additions & 3 deletions benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use async_std::task;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{channel::mpsc, future, prelude::*, ready};
use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll}};
use tokio::{runtime::Runtime, task};
use yamux::{Config, Connection, Mode};

criterion_group!(benches, concurrent);
Expand Down Expand Up @@ -52,16 +52,18 @@ fn concurrent(c: &mut Criterion) {

c.bench_function_over_inputs("one by one", move |b, &&params| {
let data = data1.clone();
let mut rt = Runtime::new().unwrap();
b.iter(move || {
task::block_on(roundtrip(params.streams, params.messages, data.clone(), false))
rt.block_on(roundtrip(params.streams, params.messages, data.clone(), false))
})
},
params);

c.bench_function_over_inputs("all at once", move |b, &&params| {
let data = data2.clone();
let mut rt = Runtime::new().unwrap();
b.iter(move || {
task::block_on(roundtrip(params.streams, params.messages, data.clone(), true))
rt.block_on(roundtrip(params.streams, params.messages, data.clone(), true))
})
},
params);
Expand Down
2 changes: 1 addition & 1 deletion src/frame/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ mod tests {
#[test]
fn encode_decode_identity() {
fn property(f: Frame<()>) -> bool {
async_std::task::block_on(async move {
futures::executor::block_on(async move {
let id = crate::connection::Id::random();
let mut io = Io::new(id, futures::io::Cursor::new(Vec::new()), f.body.len());
if io.send(&f).await.is_err() {
Expand Down
32 changes: 18 additions & 14 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,35 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use async_std::{net::{TcpStream, TcpListener}, task};
use crate::{Config, Connection, ConnectionError, Mode, Control, connection::State};
use futures::{future, prelude::*};
use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult};
use rand::Rng;
use std::{fmt::Debug, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}};
use tokio::{net::{TcpStream, TcpListener}, runtime::Runtime, task};
use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt};

#[test]
fn prop_send_recv() {
fn prop(msgs: Vec<Msg>) -> TestResult {
if msgs.is_empty() {
return TestResult::discard()
}
task::block_on(async move {
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let num_requests = msgs.len();
let iter = msgs.into_iter().map(|m| m.0);

let (listener, address) = bind().await.expect("bind");
let (mut listener, address) = bind().await.expect("bind");

let server = async {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
let connection = Connection::new(socket, Config::default(), Mode::Server);
repeat_echo(connection).await.expect("repeat_echo")
};

let client = async {
let socket = TcpStream::connect(address).await.expect("connect");
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, Config::default(), Mode::Client);
let control = connection.control();
task::spawn(crate::into_stream(connection).for_each(|_| future::ready(())));
Expand All @@ -55,19 +57,20 @@ fn prop_max_streams() {
let mut cfg = Config::default();
cfg.set_max_num_streams(max_streams);

task::block_on(async move {
let (listener, address) = bind().await.expect("bind");
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (mut listener, address) = bind().await.expect("bind");

let cfg_s = cfg.clone();
let server = async move {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
let connection = Connection::new(socket, cfg_s, Mode::Server);
repeat_echo(connection).await
};

task::spawn(server);

let socket = TcpStream::connect(address).await.expect("connect");
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, cfg, Mode::Client);
let mut control = connection.control();
task::spawn(crate::into_stream(connection).for_each(|_| future::ready(())));
Expand All @@ -89,12 +92,13 @@ fn prop_max_streams() {
fn prop_send_recv_half_closed() {
fn prop(msg: Msg) {
let msg_len = msg.0.len();
task::block_on(async move {
let (listener, address) = bind().await.expect("bind");
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (mut listener, address) = bind().await.expect("bind");

// Server should be able to write on a stream shutdown by the client.
let server = async {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
let mut connection = Connection::new(socket, Config::default(), Mode::Server);
let mut stream = connection.next_stream().await
.expect("S: next_stream")
Expand All @@ -108,7 +112,7 @@ fn prop_send_recv_half_closed() {

// Client should be able to read after shutting down the stream.
let client = async {
let socket = TcpStream::connect(address).await.expect("connect");
let socket = TcpStream::connect(address).await.expect("connect").compat();
let connection = Connection::new(socket, Config::default(), Mode::Client);
let mut control = connection.control();
task::spawn(crate::into_stream(connection).for_each(|_| future::ready(())));
Expand Down Expand Up @@ -150,7 +154,7 @@ async fn bind() -> io::Result<(TcpListener, SocketAddr)> {
}

/// For each incoming stream of `c` echo back to the sender.
async fn repeat_echo(c: Connection<TcpStream>) -> Result<(), ConnectionError> {
async fn repeat_echo(c: Connection<Compat<TcpStream>>) -> Result<(), ConnectionError> {
let c = crate::into_stream(c);
c.try_for_each_concurrent(None, |mut stream| async move {
{
Expand Down
15 changes: 8 additions & 7 deletions tests/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use async_std::{net::{TcpStream, TcpListener}, task};
use futures::{channel::mpsc, prelude::*};
use std::{net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc};
use tokio::{net::{TcpStream, TcpListener}, task};
use tokio_util::compat::Tokio02AsyncReadCompatExt;
use yamux::{Config, Connection, Mode};

async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc<Vec<u8>>) {
let listener = TcpListener::bind(&address).await.expect("bind");
let mut listener = TcpListener::bind(&address).await.expect("bind");
let address = listener.local_addr().expect("local address");

let server = async move {
let socket = listener.accept().await.expect("accept").0;
let socket = listener.accept().await.expect("accept").0.compat();
yamux::into_stream(Connection::new(socket, Config::default(), Mode::Server))
.try_for_each_concurrent(None, |mut stream| async move {
log::debug!("S: accepted new stream");
Expand All @@ -36,7 +37,7 @@ async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc<Vec<u8>>) {

task::spawn(server);

let socket = TcpStream::connect(&address).await.expect("connect");
let socket = TcpStream::connect(&address).await.expect("connect").compat();
let (tx, rx) = mpsc::unbounded();
let conn = Connection::new(socket, Config::default(), Mode::Client);
let mut ctrl = conn.control();
Expand Down Expand Up @@ -65,9 +66,9 @@ async fn roundtrip(address: SocketAddr, nstreams: usize, data: Arc<Vec<u8>>) {
assert_eq!(nstreams, n)
}

#[test]
fn concurrent_streams() {
#[tokio::test]
async fn concurrent_streams() {
let data = Arc::new(vec![0x42; 100 * 1024]);
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0));
task::block_on(roundtrip(addr, 1000, data))
roundtrip(addr, 1000, data).await
}

0 comments on commit 33bbe5a

Please sign in to comment.