diff --git a/Cargo.toml b/Cargo.toml index 7a63825735..07bfc835b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ futures = "0.1.21" futures-cpupool = { version = "0.1.6", optional = true } http = "0.1.15" httparse = "1.0" -h2 = "0.1.15" +h2 = "0.1.10" iovec = "0.1" itoa = "0.4.1" log = "0.4" @@ -52,6 +52,7 @@ serde_json = "1.0" [features] default = [ + "__internal_flaky_tests", "runtime", ] runtime = [ @@ -65,6 +66,7 @@ runtime = [ "tokio-timer", ] nightly = [] +__internal_flaky_tests = [] __internal_happy_eyeballs_tests = [] [profile.release] diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index f287a9e034..7ccceac05d 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,17 +1,22 @@ use bytes::IntoBuf; use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, Either}; +use futures::sync::mpsc; use h2::client::{Builder, Handshake, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; use headers::content_length_parse_all; use body::Payload; -use ::common::Exec; +use ::common::{Exec, Never}; use headers; use ::proto::Dispatched; use super::{PipeToSendStream, SendBuf}; use ::{Body, Request, Response}; type ClientRx = ::client::dispatch::Receiver, Response>; +/// An mpsc channel is used to help notify the `Connection` task when *all* +/// other handles to it have been dropped, so that it can shutdown. +type ConnDropRef = mpsc::Sender; pub(crate) struct Client where @@ -24,7 +29,7 @@ where enum State where B: IntoBuf { Handshaking(Handshake), - Ready(SendRequest), + Ready(SendRequest, ConnDropRef), } impl Client @@ -59,13 +64,40 @@ where let next = match self.state { State::Handshaking(ref mut h) => { let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); + // An mpsc channel is used entirely to detect when the + // 'Client' has been dropped. This is to get around a bug + // in h2 where dropping all SendRequests won't notify a + // parked Connection. + let (tx, rx) = mpsc::channel(0); + let rx = rx.into_future() + .map(|(msg, _)| match msg { + Some(never) => match never {}, + None => (), + }) + .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); let fut = conn .inspect(|_| trace!("connection complete")) - .map_err(|e| debug!("connection error: {}", e)); + .map_err(|e| debug!("connection error: {}", e)) + .select2(rx) + .then(|res| match res { + Ok(Either::A(((), _))) | + Err(Either::A(((), _))) => { + // conn has finished either way + Either::A(future::ok(())) + }, + Ok(Either::B(((), conn))) => { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + Either::B(conn) + } + Err(Either::B((never, _))) => match never {}, + }); self.executor.execute(fut)?; - State::Ready(request_tx) + State::Ready(request_tx, tx) }, - State::Ready(ref mut tx) => { + State::Ready(ref mut tx, ref conn_dropper) => { try_ready!(tx.poll_ready().map_err(::Error::new_h2)); match self.rx.poll() { Ok(Async::Ready(Some((req, mut cb)))) => { @@ -98,6 +130,11 @@ where match pipe.poll() { Ok(Async::Ready(())) | Err(()) => (), Ok(Async::NotReady) => { + let conn_drop_ref = conn_dropper.clone(); + let pipe = pipe.then(move |x| { + drop(conn_drop_ref); + x + }); self.executor.execute(pipe)?; } } diff --git a/tests/integration.rs b/tests/integration.rs index bc292749b5..68cc4c403a 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -333,6 +333,9 @@ t! { ; } +// In rare cases, the h2 client connection does not shutdown, resulting +// in this test simply hanging... :( +#[cfg(feature = "__internal_flaky_tests")] t! { http2_parallel_10, parallel: 0..10