diff --git a/src/error.rs b/src/error.rs index fccc1c67..b6580f24 100644 --- a/src/error.rs +++ b/src/error.rs @@ -167,7 +167,7 @@ impl From for io::Error { Error::ConnectFailed => io::ErrorKind::ConnectionRefused.into(), Error::Io(e) => e, Error::Timeout => io::ErrorKind::TimedOut.into(), - _ => io::ErrorKind::Other.into(), + e => io::Error::new(io::ErrorKind::Other, e), } } } diff --git a/src/handler.rs b/src/handler.rs index 458c2bc4..58691f3e 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -13,6 +13,7 @@ use futures_channel::oneshot::Sender; use futures_io::{AsyncRead, AsyncWrite}; use futures_util::{pin_mut, task::AtomicWaker}; use http::{Response, Uri}; +use once_cell::sync::OnceCell; use sluice::pipe; use std::{ ascii, @@ -106,7 +107,15 @@ struct Shared { /// A waker used by the handler to wake up the associated future. waker: AtomicWaker, - completed: AtomicCell, + /// Set to the final result of the transfer received from curl. This is used + /// to communicate an error while reading the response body if the handler + /// suddenly aborts. + result: OnceCell>, + + /// Set to true whenever the response body is dropped. This is used in the + /// opposite manner as the above flag; if the response body is dropped, then + /// this communicates to the handler to stop running since the user has lost + /// interest in this request. response_body_dropped: AtomicCell, } @@ -121,7 +130,7 @@ impl RequestHandler { let (sender, receiver) = futures_channel::oneshot::channel(); let shared = Arc::new(Shared { waker: AtomicWaker::default(), - completed: AtomicCell::new(false), + result: OnceCell::new(), response_body_dropped: AtomicCell::new(false), }); let (response_body_reader, response_body_writer) = pipe::pipe(); @@ -211,7 +220,7 @@ impl RequestHandler { let span = tracing::trace_span!(parent: &self.span, "on_result"); let _enter = span.enter(); - self.shared.completed.store(true); + self.shared.result.set(result.clone()).unwrap(); match result { Ok(()) => self.flush_response_headers(), @@ -688,12 +697,15 @@ impl AsyncRead for ResponseBodyReader { match inner.poll_read(cx, buf) { // On EOF, check to see if the transfer was cancelled, and if so, // return an error. - Poll::Ready(Ok(0)) => { - if !self.shared.completed.load() { - Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into())) - } else { - Poll::Ready(Ok(0)) - } + Poll::Ready(Ok(0)) => match self.shared.result.get() { + // The transfer did finish successfully, so return EOF. + Some(Ok(())) => Poll::Ready(Ok(0)), + + // The transfer finished with an error, so return the error. + Some(Err(e)) => Poll::Ready(Err(io::Error::from(Error::from(e.clone())))), + + // The transfer did not finish properly at all, so return an error. + None => Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into())), } poll => poll, } diff --git a/tests/timeouts.rs b/tests/timeouts.rs index eb40d18b..37364e84 100644 --- a/tests/timeouts.rs +++ b/tests/timeouts.rs @@ -1,5 +1,5 @@ use isahc::prelude::*; -use std::time::Duration; +use std::{io::{self, Cursor, Read}, thread, time::Duration}; use testserver::mock; /// Issue #3 @@ -27,3 +27,31 @@ fn request_errors_if_read_timeout_is_reached() { assert_eq!(m.requests().len(), 1); } + +/// Issue #154 +#[test] +fn timeout_during_response_body_produces_error() { + struct SlowReader; + + impl Read for SlowReader { + fn read(&mut self, _buf: &mut [u8]) -> io::Result { + thread::sleep(Duration::from_secs(2)); + Ok(0) + } + } + + let m = mock! { + body_reader: Cursor::new(vec![0; 100_000]).chain(SlowReader), + }; + + let mut response = Request::get(m.url()) + .timeout(Duration::from_millis(500)) + .body(()) + .unwrap() + .send() + .unwrap(); + + // Because of the short timeout, the response body should abort while being + // read from. + assert_eq!(response.copy_to(std::io::sink()).unwrap_err().kind(), std::io::ErrorKind::TimedOut); +} diff --git a/testserver/src/lib.rs b/testserver/src/lib.rs index 29ef85cc..f2e669cb 100644 --- a/testserver/src/lib.rs +++ b/testserver/src/lib.rs @@ -24,7 +24,15 @@ macro_rules! mock { (@response($response:expr) body: $body:expr, $($tail:tt)*) => {{ let mut response = $response; - response.body = $body.into(); + response = response.with_body_buf($body); + + $crate::mock!(@response(response) $($tail)*) + }}; + + (@response($response:expr) body_reader: $body:expr, $($tail:tt)*) => {{ + let mut response = $response; + + response = response.with_body_reader($body); $crate::mock!(@response(response) $($tail)*) }}; @@ -32,7 +40,9 @@ macro_rules! mock { (@response($response:expr) transfer_encoding: $value:expr, $($tail:tt)*) => {{ let mut response = $response; - response.transfer_encoding = $value; + if $value { + response.body_len = None; + } $crate::mock!(@response(response) $($tail)*) }}; diff --git a/testserver/src/mock.rs b/testserver/src/mock.rs index e2c0ca95..1d410e0b 100644 --- a/testserver/src/mock.rs +++ b/testserver/src/mock.rs @@ -112,8 +112,8 @@ impl Mock { Response { status_code: 404, headers: Vec::new(), - body: Vec::new(), - transfer_encoding: false, + body: Box::new(std::io::empty()), + body_len: Some(0), } } diff --git a/testserver/src/response.rs b/testserver/src/response.rs index 825bf4f5..9051d310 100644 --- a/testserver/src/response.rs +++ b/testserver/src/response.rs @@ -1,17 +1,31 @@ -use std::io::Cursor; +use std::io::{Cursor, Read}; -#[derive(Clone, Debug)] pub struct Response { pub status_code: u16, pub headers: Vec<(String, String)>, - pub body: Vec, - pub transfer_encoding: bool, + pub body: Box, + pub body_len: Option, } impl Response { - pub(crate) fn into_http_response(self) -> tiny_http::Response>> { - let len = self.body.len(); + pub fn new() -> Self { + Self::default() + } + + pub fn with_body_buf(mut self, buf: impl Into>) -> Self { + let buf = buf.into(); + self.body_len = Some(buf.len()); + self.body = Box::new(Cursor::new(buf)); + self + } + + pub fn with_body_reader(mut self, reader: impl Read + 'static) -> Self { + self.body_len = None; + self.body = Box::new(reader); + self + } + pub(crate) fn into_http_response(self) -> tiny_http::Response> { tiny_http::Response::new( self.status_code.into(), self.headers.into_iter() @@ -20,12 +34,8 @@ impl Response { value.as_bytes(), ).unwrap()) .collect(), - Cursor::new(self.body), - if self.transfer_encoding { - None - } else { - Some(len) - }, + self.body, + self.body_len, None, ) } @@ -36,8 +46,8 @@ impl Default for Response { Self { status_code: 200, headers: Vec::new(), - body: Vec::new(), - transfer_encoding: false, + body: Box::new(std::io::empty()), + body_len: Some(0), } } }