Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix timeouts causing unexplained EOFs #246

Merged
merged 2 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl From<Error> for io::Error {
Error::ConnectFailed => io::ErrorKind::ConnectionRefused.into(),
Error::Io(e) => e,
Error::Timeout => io::ErrorKind::TimedOut.into(),
_ => io::ErrorKind::Other.into(),
e => io::Error::new(io::ErrorKind::Other, e),
}
}
}
Expand Down
30 changes: 21 additions & 9 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures_channel::oneshot::Sender;
use futures_io::{AsyncRead, AsyncWrite};
use futures_util::{pin_mut, task::AtomicWaker};
use http::{Response, Uri};
use once_cell::sync::OnceCell;
use sluice::pipe;
use std::{
ascii,
Expand Down Expand Up @@ -106,7 +107,15 @@ struct Shared {
/// A waker used by the handler to wake up the associated future.
waker: AtomicWaker,

completed: AtomicCell<bool>,
/// Set to the final result of the transfer received from curl. This is used
/// to communicate an error while reading the response body if the handler
/// suddenly aborts.
result: OnceCell<Result<(), curl::Error>>,

/// Set to true whenever the response body is dropped. This is used in the
/// opposite manner as the above flag; if the response body is dropped, then
/// this communicates to the handler to stop running since the user has lost
/// interest in this request.
response_body_dropped: AtomicCell<bool>,
}

Expand All @@ -121,7 +130,7 @@ impl RequestHandler {
let (sender, receiver) = futures_channel::oneshot::channel();
let shared = Arc::new(Shared {
waker: AtomicWaker::default(),
completed: AtomicCell::new(false),
result: OnceCell::new(),
response_body_dropped: AtomicCell::new(false),
});
let (response_body_reader, response_body_writer) = pipe::pipe();
Expand Down Expand Up @@ -211,7 +220,7 @@ impl RequestHandler {
let span = tracing::trace_span!(parent: &self.span, "on_result");
let _enter = span.enter();

self.shared.completed.store(true);
self.shared.result.set(result.clone()).unwrap();

match result {
Ok(()) => self.flush_response_headers(),
Expand Down Expand Up @@ -688,12 +697,15 @@ impl AsyncRead for ResponseBodyReader {
match inner.poll_read(cx, buf) {
// On EOF, check to see if the transfer was cancelled, and if so,
// return an error.
Poll::Ready(Ok(0)) => {
if !self.shared.completed.load() {
Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into()))
} else {
Poll::Ready(Ok(0))
}
Poll::Ready(Ok(0)) => match self.shared.result.get() {
// The transfer did finish successfully, so return EOF.
Some(Ok(())) => Poll::Ready(Ok(0)),

// The transfer finished with an error, so return the error.
Some(Err(e)) => Poll::Ready(Err(io::Error::from(Error::from(e.clone())))),

// The transfer did not finish properly at all, so return an error.
None => Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into())),
}
poll => poll,
}
Expand Down
30 changes: 29 additions & 1 deletion tests/timeouts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use isahc::prelude::*;
use std::time::Duration;
use std::{io::{self, Cursor, Read}, thread, time::Duration};
use testserver::mock;

/// Issue #3
Expand Down Expand Up @@ -27,3 +27,31 @@ fn request_errors_if_read_timeout_is_reached() {

assert_eq!(m.requests().len(), 1);
}

/// Issue #154
#[test]
fn timeout_during_response_body_produces_error() {
struct SlowReader;

impl Read for SlowReader {
fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
thread::sleep(Duration::from_secs(2));
Ok(0)
}
}

let m = mock! {
body_reader: Cursor::new(vec![0; 100_000]).chain(SlowReader),
};

let mut response = Request::get(m.url())
.timeout(Duration::from_millis(500))
.body(())
.unwrap()
.send()
.unwrap();

// Because of the short timeout, the response body should abort while being
// read from.
assert_eq!(response.copy_to(std::io::sink()).unwrap_err().kind(), std::io::ErrorKind::TimedOut);
}
14 changes: 12 additions & 2 deletions testserver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,25 @@ macro_rules! mock {
(@response($response:expr) body: $body:expr, $($tail:tt)*) => {{
let mut response = $response;

response.body = $body.into();
response = response.with_body_buf($body);

$crate::mock!(@response(response) $($tail)*)
}};

(@response($response:expr) body_reader: $body:expr, $($tail:tt)*) => {{
let mut response = $response;

response = response.with_body_reader($body);

$crate::mock!(@response(response) $($tail)*)
}};

(@response($response:expr) transfer_encoding: $value:expr, $($tail:tt)*) => {{
let mut response = $response;

response.transfer_encoding = $value;
if $value {
response.body_len = None;
}

$crate::mock!(@response(response) $($tail)*)
}};
Expand Down
4 changes: 2 additions & 2 deletions testserver/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl<R: Responder> Mock<R> {
Response {
status_code: 404,
headers: Vec::new(),
body: Vec::new(),
transfer_encoding: false,
body: Box::new(std::io::empty()),
body_len: Some(0),
}
}

Expand Down
38 changes: 24 additions & 14 deletions testserver/src/response.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
use std::io::Cursor;
use std::io::{Cursor, Read};

#[derive(Clone, Debug)]
pub struct Response {
pub status_code: u16,
pub headers: Vec<(String, String)>,
pub body: Vec<u8>,
pub transfer_encoding: bool,
pub body: Box<dyn Read>,
pub body_len: Option<usize>,
}

impl Response {
pub(crate) fn into_http_response(self) -> tiny_http::Response<Cursor<Vec<u8>>> {
let len = self.body.len();
pub fn new() -> Self {
Self::default()
}

pub fn with_body_buf(mut self, buf: impl Into<Vec<u8>>) -> Self {
let buf = buf.into();
self.body_len = Some(buf.len());
self.body = Box::new(Cursor::new(buf));
self
}

pub fn with_body_reader(mut self, reader: impl Read + 'static) -> Self {
self.body_len = None;
self.body = Box::new(reader);
self
}

pub(crate) fn into_http_response(self) -> tiny_http::Response<Box<dyn Read>> {
tiny_http::Response::new(
self.status_code.into(),
self.headers.into_iter()
Expand All @@ -20,12 +34,8 @@ impl Response {
value.as_bytes(),
).unwrap())
.collect(),
Cursor::new(self.body),
if self.transfer_encoding {
None
} else {
Some(len)
},
self.body,
self.body_len,
None,
)
}
Expand All @@ -36,8 +46,8 @@ impl Default for Response {
Self {
status_code: 200,
headers: Vec::new(),
body: Vec::new(),
transfer_encoding: false,
body: Box::new(std::io::empty()),
body_len: Some(0),
}
}
}