From 0d24e94afca5c4b970537fd1b20da930ac46b4bc Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 29 Aug 2019 13:38:22 -0700 Subject: [PATCH] feat(client): change `GaiResolver` to use a global blocking threadpool BREAKING CHANGE: Calls to `GaiResolver::new` and `HttpConnector::new` no longer should pass an integer argument for the number of threads. --- Cargo.toml | 2 +- src/client/connect/dns.rs | 65 ++++++++------------------------------ src/client/connect/http.rs | 5 ++- src/client/mod.rs | 2 +- tests/client.rs | 18 +++++------ tests/support/mod.rs | 2 +- 6 files changed, 28 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b57bdd98c3..d22904631c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ pin-utils = "=0.1.0-alpha.4" time = "0.1" tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] } tower-service = "=0.3.0-alpha.1" -tokio-executor = "=0.2.0-alpha.4" +tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] } tokio-io = "=0.2.0-alpha.4" tokio-sync = "=0.2.0-alpha.4" tokio-net = { version = "=0.2.0-alpha.4", optional = true, features = ["tcp"] } diff --git a/src/client/connect/dns.rs b/src/client/connect/dns.rs index 24969f048e..7776716674 100644 --- a/src/client/connect/dns.rs +++ b/src/client/connect/dns.rs @@ -15,7 +15,6 @@ use std::net::{ }; use std::str::FromStr; -use futures_util::{FutureExt, StreamExt}; use tokio_sync::{mpsc, oneshot}; use crate::common::{Future, Never, Pin, Poll, Unpin, task}; @@ -39,10 +38,7 @@ pub struct Name { /// A resolver using blocking `getaddrinfo` calls in a threadpool. #[derive(Clone)] pub struct GaiResolver { - tx: tokio_executor::threadpool::Sender, - /// A handle to keep the threadpool alive until all `GaiResolver` clones - /// have been dropped. - _threadpool_keep_alive: ThreadPoolKeepAlive, + _priv: (), } #[derive(Clone)] @@ -55,8 +51,7 @@ pub struct GaiAddrs { /// A future to resole a name returned by `GaiResolver`. pub struct GaiFuture { - rx: oneshot::Receiver>, - _threadpool_keep_alive: ThreadPoolKeepAlive, + inner: tokio_executor::blocking::Blocking>, } impl Name { @@ -108,40 +103,9 @@ impl Error for InvalidNameError {} impl GaiResolver { /// Construct a new `GaiResolver`. - /// - /// Takes number of DNS worker threads. - pub fn new(threads: usize) -> Self { - let pool = tokio_executor::threadpool::Builder::new() - .name_prefix("hyper-dns-gai-resolver") - // not for CPU tasks, so only spawn workers - // in blocking mode - .pool_size(1) - .max_blocking(threads) - .build(); - - let tx = pool.sender().clone(); - - // The pool will start to shutdown once `pool` is dropped, - // so to keep it alive, we spawn a future onto the pool itself - // that will only resolve once all `GaiResolver` requests - // are finished. - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - - let on_shutdown = shutdown_rx - .into_future() - .map(move |(next, _rx)| { - match next { - Some(never) => match never {}, - None => (), - } - - drop(pool) - }); - tx.spawn(on_shutdown).expect("can spawn on self"); - + pub fn new() -> Self { GaiResolver { - tx, - _threadpool_keep_alive: ThreadPoolKeepAlive(shutdown_tx), + _priv: (), } } } @@ -151,14 +115,14 @@ impl Resolve for GaiResolver { type Future = GaiFuture; fn resolve(&self, name: Name) -> Self::Future { - let (tx, rx) = oneshot::channel(); - self.tx.spawn(GaiBlocking { - host: name.host, - tx: Some(tx), - }).expect("spawn GaiBlocking"); + let blocking = tokio_executor::blocking::run(move || { + debug!("resolving host={:?}", name.host); + (&*name.host, 0).to_socket_addrs() + .map(|i| IpAddrs { iter: i }) + }); + GaiFuture { - rx, - _threadpool_keep_alive: self._threadpool_keep_alive.clone(), + inner: blocking, } } } @@ -173,10 +137,9 @@ impl Future for GaiFuture { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - Pin::new(&mut self.rx).poll(cx).map(|res| match res { - Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }), - Ok(Err(err)) => Err(err), - Err(_canceled) => unreachable!("GaiResolver threadpool shutdown"), + Pin::new(&mut self.inner).poll(cx).map(|res| match res { + Ok(addrs) => Ok(GaiAddrs { inner: addrs }), + Err(err) => Err(err), }) } } diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 41f6474907..3aabf2f3bf 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -77,9 +77,8 @@ impl HttpConnector { /// Construct a new HttpConnector. /// /// Takes number of DNS worker threads. - #[inline] - pub fn new(threads: usize) -> HttpConnector { - HttpConnector::new_with_resolver(GaiResolver::new(threads)) + pub fn new() -> HttpConnector { + HttpConnector::new_with_resolver(GaiResolver::new()) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 4181b901b6..2d92c1d260 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1023,7 +1023,7 @@ impl Builder { B: Payload + Send, B::Data: Send, { - let mut connector = HttpConnector::new(4); + let mut connector = HttpConnector::new(); if self.pool_config.enabled { connector.set_keepalive(self.pool_config.keep_alive_timeout); } diff --git a/tests/client.rs b/tests/client.rs index 6e0861d5da..7eed383bcb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -213,7 +213,7 @@ macro_rules! test { let addr = server.local_addr().expect("local_addr"); let rt = $runtime; - let connector = ::hyper::client::HttpConnector::new(1); + let connector = ::hyper::client::HttpConnector::new(); let client = Client::builder() .set_host($set_host) .http1_title_case_headers($title_case_headers) @@ -781,7 +781,7 @@ mod dispatch_impl { let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let client = Client::builder() - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let (tx1, rx1) = oneshot::channel(); @@ -837,7 +837,7 @@ mod dispatch_impl { let res = { let client = Client::builder() - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -889,7 +889,7 @@ mod dispatch_impl { }); let client = Client::builder() - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -948,7 +948,7 @@ mod dispatch_impl { let res = { let client = Client::builder() - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -996,7 +996,7 @@ mod dispatch_impl { let res = { let client = Client::builder() - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1046,7 +1046,7 @@ mod dispatch_impl { let client = Client::builder() .keep_alive(false) - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1090,7 +1090,7 @@ mod dispatch_impl { }); let client = Client::builder() - .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1527,7 +1527,7 @@ mod dispatch_impl { impl DebugConnector { fn new() -> DebugConnector { - let http = HttpConnector::new(1); + let http = HttpConnector::new(); let (tx, _) = mpsc::channel(10); DebugConnector::with_http_and_closes(http, tx) } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 1d5eae1aea..601e699e95 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -313,7 +313,7 @@ pub fn __run_test(cfg: __TestConfig) { Version::HTTP_11 }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); let client = Client::builder() .keep_alive_timeout(Duration::from_secs(10)) .http2_only(cfg.client_version == 2)