diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 5d6b9d2b..8627375a 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -461,13 +461,27 @@ where // active streams must be reset. // // TODO: Are I/O errors recoverable? - Err(Error::Io(e, inner)) => { - tracing::debug!(error = ?e, "Connection::poll; IO error"); - let e = Error::Io(e, inner); + Err(Error::Io(kind, inner)) => { + tracing::debug!(error = ?kind, "Connection::poll; IO error"); + let e = Error::Io(kind, inner); // Reset all active streams self.streams.handle_error(e.clone()); + // Some client implementations drop the connections without notifying its peer + // Attempting to read after the client dropped the connection results in UnexpectedEof + // If as a server, we don't have anything more to send, just close the connection + // without error + // + // See https://github.com/hyperium/hyper/issues/3427 + if self.streams.is_server() + && self.streams.is_buffer_empty() + && matches!(kind, io::ErrorKind::UnexpectedEof) + { + *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library); + return Ok(()); + } + // Return the error Err(e) } diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 2648a410..02d26506 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -29,6 +29,10 @@ impl Buffer { pub fn new() -> Self { Buffer { slab: Slab::new() } } + + pub fn is_empty(&self) -> bool { + self.slab.is_empty() + } } impl Deque { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f4b12c7b..fa8e6843 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -323,6 +323,14 @@ where } impl DynStreams<'_, B> { + pub fn is_buffer_empty(&self) -> bool { + self.send_buffer.is_empty() + } + + pub fn is_server(&self) -> bool { + self.peer.is_server() + } + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); @@ -1509,6 +1517,11 @@ impl SendBuffer { let inner = Mutex::new(Buffer::new()); SendBuffer { inner } } + + pub fn is_empty(&self) -> bool { + let buf = self.inner.lock().unwrap(); + buf.is_empty() + } } // ===== impl Actions ===== diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 18d08484..60539d0a 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -54,6 +54,9 @@ struct Inner { /// True when the pipe is closed. closed: bool, + + /// Trigger an `UnexpectedEof` error on read + unexpected_eof: bool, } const PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; @@ -73,6 +76,7 @@ pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) { tx_rem: cap, tx_rem_task: None, closed: false, + unexpected_eof: false, })); let mock = Mock { @@ -96,6 +100,11 @@ impl Handle { &mut self.codec } + pub fn close_without_notify(&mut self) { + let mut me = self.codec.get_mut().inner.lock().unwrap(); + me.unexpected_eof = true; + } + /// Send a frame pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> { // Queue the frame @@ -348,6 +357,13 @@ impl AsyncRead for Mock { let mut me = self.pipe.inner.lock().unwrap(); + if me.unexpected_eof { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Simulate an unexpected eof error", + ))); + } + if me.rx.is_empty() { if me.closed { return Poll::Ready(Ok(())); diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 13bc5f22..7bd223e3 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -2,6 +2,7 @@ use futures::future::{join, ready, select, Either}; use futures::stream::FuturesUnordered; use futures::StreamExt; use h2_support::prelude::*; +use std::io; use std::pin::Pin; use std::task::Context; @@ -1822,6 +1823,46 @@ async fn receive_settings_frame_twice_with_second_one_non_empty() { join(srv, h2).await; } +#[tokio::test] +async fn server_drop_connection_unexpectedly_return_unexpected_eof_err() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + srv.close_without_notify(); + }; + + let h2 = async move { + let (mut client, h2) = client::handshake(io).await.unwrap(); + tokio::spawn(async move { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + let _res = client + .send_request(request, true) + .unwrap() + .0 + .await + .expect("request"); + }); + let err = h2.await.expect_err("should receive UnexpectedEof"); + assert_eq!( + err.get_io().expect("should be UnexpectedEof").kind(), + io::ErrorKind::UnexpectedEof, + ); + }; + join(srv, h2).await; +} + const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index dd97e94d..831f1882 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1416,3 +1416,40 @@ async fn reject_informational_status_header_in_request() { join(client, srv).await; } + +#[tokio::test] +async fn client_drop_connection_without_close_notify() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + let client = async move { + let _recv_settings = client.assert_server_handshake().await; + client + .send_frame(frames::headers(1).request("GET", "https://example.com/")) + .await; + client.send_frame(frames::data(1, &b"hello"[..])).await; + client.recv_frame(frames::headers(1).response(200)).await; + + client.close_without_notify(); // Client closed without notify causing UnexpectedEof + }; + + let mut builder = server::Builder::new(); + builder.max_concurrent_streams(1); + + let h2 = async move { + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + let (req, mut stream) = srv.next().await.unwrap().unwrap(); + + assert_eq!(req.method(), &http::Method::GET); + + let rsp = http::Response::builder().status(200).body(()).unwrap(); + stream.send_response(rsp, false).unwrap(); + + // Step the conn state forward and hitting the EOF + // But we have no outstanding request from client to be satisfied, so we should not return + // an error + let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap(); + }; + + join(client, h2).await; +}