Skip to content

Commit

Permalink
Upgrade some lib tests to async/.await version (#1882)
Browse files Browse the repository at this point in the history
* test(http): use async/.await

Signed-off-by: Weihang Lo <me@weihanglo.tw>

* test(pool): use async/.await

* test(pool): pass &mut Future into PollOnce

* test(client): tests/benches using async/.await

* test(client): change due to PR #1917

* test(client): change Delay to delay fucntion

Ref: tokio-rs/tokio#1440

* test(client): remove warning triggers
  • Loading branch information
weihanglo authored and seanmonstar committed Sep 6, 2019
1 parent 511ea38 commit 144893b
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 159 deletions.
54 changes: 35 additions & 19 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,58 +630,76 @@ impl ConnectingTcp {

#[cfg(test)]
mod tests {
// FIXME: re-implement tests with `async/await`, this import should
// trigger a warning to remind us
use crate::Error;
/*
use std::io;
use futures::Future;

use tokio::runtime::current_thread::Runtime;
use tokio_net::driver::Handle;

use super::{Connect, Destination, HttpConnector};

#[test]
fn test_errors_missing_authority() {
let mut rt = Runtime::new().unwrap();
let uri = "/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1);
let connector = HttpConnector::new();

assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
}

#[test]
fn test_errors_enforce_http() {
let mut rt = Runtime::new().unwrap();
let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1);
let connector = HttpConnector::new();

assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
}


#[test]
fn test_errors_missing_scheme() {
let mut rt = Runtime::new().unwrap();
let uri = "example.domain".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1);
let connector = HttpConnector::new();

assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
});
}

#[test]
#[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
fn client_happy_eyeballs() {
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
use std::task::Poll;
use std::time::{Duration, Instant};

use futures::{Async, Poll};
use tokio::runtime::current_thread::Runtime;
use tokio_net::driver::Handle;

use crate::common::{Pin, task};
use super::dns;
use super::ConnectingTcp;

Expand Down Expand Up @@ -768,16 +786,15 @@ mod tests {
struct ConnectingTcpFuture(ConnectingTcp);

impl Future for ConnectingTcpFuture {
type Item = u8;
type Error = ::std::io::Error;
type Output = Result<u8, std::io::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.poll(&Some(Handle::default())) {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.0.poll(cx,&Some(Handle::default())) {
Poll::Ready(Ok(stream)) => Poll::Ready(Ok(
if stream.peer_addr().unwrap().is_ipv4() { 4 } else { 6 }
)),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
Err(err) => Err(err),
}
}
}
Expand Down Expand Up @@ -818,6 +835,5 @@ mod tests {
(reachable, duration)
}
}
*/
}

150 changes: 90 additions & 60 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,70 +246,102 @@ impl<T, U> Callback<T, U> {

#[cfg(test)]
mod tests {
// FIXME: re-implement tests with `async/await`, this import should
// trigger a warning to remind us
use crate::Error;
/*
#[cfg(feature = "nightly")]
extern crate test;

use futures::{future, Future, Stream};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::runtime::current_thread::Runtime;

use super::{Callback, channel, Receiver};

#[derive(Debug)]
struct Custom(i32);

impl<T, U> Future for Receiver<T, U> {
type Output = Option<(T, Callback<T, U>)>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_next(cx)
}
}

/// Helper to check if the future is ready after polling once.
struct PollOnce<'a, F>(&'a mut F);

impl<F, T> Future for PollOnce<'_, F>
where
F: Future<Output = T> + Unpin
{
type Output = Option<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Ready(None)
}
}
}

#[test]
fn drop_receiver_sends_cancel_errors() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();

future::lazy(|| {
let (mut tx, mut rx) = super::channel::<Custom, ()>();
// must poll once for try_send to succeed
assert!(rx.poll().expect("rx empty").is_not_ready());
let promise = tx.try_send(Custom(43)).unwrap();
drop(rx);
let (mut tx, mut rx) = channel::<Custom, ()>();

promise.then(|fulfilled| {
let err = fulfilled
.expect("fulfilled")
.expect_err("promise should error");
// must poll once for try_send to succeed
rt.block_on(async {
let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_none(), "rx empty");
});

match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}
let promise = tx.try_send(Custom(43)).unwrap();
drop(rx);

Ok::<(), ()>(())
})
}).wait().unwrap();
rt.block_on(async {
let fulfilled = promise.await;
let err = fulfilled
.expect("fulfilled")
.expect_err("promise should error");
match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}
});
}

#[test]
fn sender_checks_for_want_on_send() {
future::lazy(|| {
let (mut tx, mut rx) = super::channel::<Custom, ()>();
// one is allowed to buffer, second is rejected
let _ = tx.try_send(Custom(1)).expect("1 buffered");
tx.try_send(Custom(2)).expect_err("2 not ready");
assert!(rx.poll().expect("rx 1").is_ready());
// Even though 1 has been popped, only 1 could be buffered for the
// lifetime of the channel.
tx.try_send(Custom(2)).expect_err("2 still not ready");
assert!(rx.poll().expect("rx empty").is_not_ready());
let _ = tx.try_send(Custom(2)).expect("2 ready");
Ok::<(), ()>(())
}).wait().unwrap();
let mut rt = Runtime::new().unwrap();
let (mut tx, mut rx) = channel::<Custom, ()>();

// one is allowed to buffer, second is rejected
let _ = tx.try_send(Custom(1)).expect("1 buffered");
tx.try_send(Custom(2)).expect_err("2 not ready");

rt.block_on(async {
let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_some(), "rx empty");
});

// Even though 1 has been popped, only 1 could be buffered for the
// lifetime of the channel.
tx.try_send(Custom(2)).expect_err("2 still not ready");

rt.block_on(async {
let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_none(), "rx empty");
});

let _ = tx.try_send(Custom(2)).expect("2 ready");
}

#[test]
fn unbounded_sender_doesnt_bound_on_want() {
let (tx, rx) = super::channel::<Custom, ()>();
let (tx, rx) = channel::<Custom, ()>();
let mut tx = tx.unbound();

let _ = tx.try_send(Custom(1)).unwrap();
Expand All @@ -325,46 +357,44 @@ mod tests {
#[bench]
fn giver_queue_throughput(b: &mut test::Bencher) {
use crate::{Body, Request, Response};
let (mut tx, mut rx) = super::channel::<Request<Body>, Response<Body>>();

let mut rt = Runtime::new().unwrap();
let (mut tx, mut rx) = channel::<Request<Body>, Response<Body>>();

b.iter(move || {
::futures::future::lazy(|| {
let _ = tx.send(Request::default()).unwrap();
let _ = tx.send(Request::default()).unwrap();
rt.block_on(async {
loop {
let ok = rx.poll().unwrap();
if ok.is_not_ready() {
break;
let poll_once = PollOnce(&mut rx);
let opt = poll_once.await;
if opt.is_none() {
break
}
}
Ok::<_, ()>(())
}).wait().unwrap();
});
})
}

#[cfg(feature = "nightly")]
#[bench]
fn giver_queue_not_ready(b: &mut test::Bencher) {
let (_tx, mut rx) = super::channel::<i32, ()>();
let mut rt = Runtime::new().unwrap();
let (_tx, mut rx) = channel::<i32, ()>();
b.iter(move || {
::futures::future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<(), ()>(())
}).wait().unwrap();
rt.block_on(async {
let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_none());
});
})
}

#[cfg(feature = "nightly")]
#[bench]
fn giver_queue_cancel(b: &mut test::Bencher) {
let (_tx, mut rx) = super::channel::<i32, ()>();
let (_tx, mut rx) = channel::<i32, ()>();

b.iter(move || {
rx.taker.cancel();
})
}
*/
}
Loading

0 comments on commit 144893b

Please sign in to comment.