From 7b7dcc8f68227b1c2dac5dcab4a68348ebe70ca1 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 22 Jan 2019 14:04:40 -0800 Subject: [PATCH] refactor(http2): remove extra mpsc trying to work around h2 hang --- Cargo.toml | 4 +--- src/proto/h2/client.rs | 47 +++++------------------------------------- tests/integration.rs | 3 --- 3 files changed, 6 insertions(+), 48 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 239581206f..b889200f1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ futures = "0.1.21" futures-cpupool = { version = "0.1.6", optional = true } http = "0.1.15" httparse = "1.0" -h2 = "0.1.10" +h2 = "0.1.15" iovec = "0.1" itoa = "0.4.1" log = "0.4" @@ -52,7 +52,6 @@ serde_json = "1.0" [features] default = [ - "__internal_flaky_tests", "runtime", ] runtime = [ @@ -66,7 +65,6 @@ runtime = [ "tokio-timer", ] nightly = [] -__internal_flaky_tests = [] __internal_happy_eyeballs_tests = [] [profile.release] diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 7ccceac05d..f287a9e034 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,22 +1,17 @@ use bytes::IntoBuf; use futures::{Async, Future, Poll, Stream}; -use futures::future::{self, Either}; -use futures::sync::mpsc; use h2::client::{Builder, Handshake, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; use headers::content_length_parse_all; use body::Payload; -use ::common::{Exec, Never}; +use ::common::Exec; use headers; use ::proto::Dispatched; use super::{PipeToSendStream, SendBuf}; use ::{Body, Request, Response}; type ClientRx = ::client::dispatch::Receiver, Response>; -/// An mpsc channel is used to help notify the `Connection` task when *all* -/// other handles to it have been dropped, so that it can shutdown. -type ConnDropRef = mpsc::Sender; pub(crate) struct Client where @@ -29,7 +24,7 @@ where enum State where B: IntoBuf { Handshaking(Handshake), - Ready(SendRequest, ConnDropRef), + Ready(SendRequest), } impl Client @@ -64,40 +59,13 @@ where let next = match self.state { State::Handshaking(ref mut h) => { let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); - // An mpsc channel is used entirely to detect when the - // 'Client' has been dropped. This is to get around a bug - // in h2 where dropping all SendRequests won't notify a - // parked Connection. - let (tx, rx) = mpsc::channel(0); - let rx = rx.into_future() - .map(|(msg, _)| match msg { - Some(never) => match never {}, - None => (), - }) - .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); let fut = conn .inspect(|_| trace!("connection complete")) - .map_err(|e| debug!("connection error: {}", e)) - .select2(rx) - .then(|res| match res { - Ok(Either::A(((), _))) | - Err(Either::A(((), _))) => { - // conn has finished either way - Either::A(future::ok(())) - }, - Ok(Either::B(((), conn))) => { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close - trace!("send_request dropped, starting conn shutdown"); - Either::B(conn) - } - Err(Either::B((never, _))) => match never {}, - }); + .map_err(|e| debug!("connection error: {}", e)); self.executor.execute(fut)?; - State::Ready(request_tx, tx) + State::Ready(request_tx) }, - State::Ready(ref mut tx, ref conn_dropper) => { + State::Ready(ref mut tx) => { try_ready!(tx.poll_ready().map_err(::Error::new_h2)); match self.rx.poll() { Ok(Async::Ready(Some((req, mut cb)))) => { @@ -130,11 +98,6 @@ where match pipe.poll() { Ok(Async::Ready(())) | Err(()) => (), Ok(Async::NotReady) => { - let conn_drop_ref = conn_dropper.clone(); - let pipe = pipe.then(move |x| { - drop(conn_drop_ref); - x - }); self.executor.execute(pipe)?; } } diff --git a/tests/integration.rs b/tests/integration.rs index 68cc4c403a..bc292749b5 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -333,9 +333,6 @@ t! { ; } -// In rare cases, the h2 client connection does not shutdown, resulting -// in this test simply hanging... :( -#[cfg(feature = "__internal_flaky_tests")] t! { http2_parallel_10, parallel: 0..10