diff --git a/src/proto/conn.rs b/src/proto/conn.rs index 92d4e1b5ec..0d3980718d 100644 --- a/src/proto/conn.rs +++ b/src/proto/conn.rs @@ -235,18 +235,31 @@ where I: AsyncRead + AsyncWrite, let (reading, ret) = match self.state.reading { Reading::Body(ref mut decoder) => { - let slice = try_ready!(decoder.decode(&mut self.io)); - if !slice.is_empty() { - return Ok(Async::Ready(Some(super::Chunk::from(slice)))); - } else if decoder.is_eof() { - debug!("incoming body completed"); - (Reading::KeepAlive, Ok(Async::Ready(None))) - } else { - trace!("decode stream unexpectedly ended"); - //TODO: Should this return an UnexpectedEof? - (Reading::Closed, Ok(Async::Ready(None))) + match decoder.decode(&mut self.io) { + Ok(Async::Ready(slice)) => { + let chunk = if !slice.is_empty() { + Some(super::Chunk::from(slice)) + } else { + None + }; + let reading = if decoder.is_eof() { + debug!("incoming body completed"); + Reading::KeepAlive + } else if chunk.is_some() { + Reading::Body(decoder.clone()) + } else { + trace!("decode stream unexpectedly ended"); + //TODO: Should this return an UnexpectedEof? + Reading::Closed + }; + (reading, Ok(Async::Ready(chunk))) + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + trace!("decode stream error: {}", e); + (Reading::Closed, Err(e)) + }, } - }, _ => unreachable!("read_body invalid state: {:?}", self.state.reading), }; diff --git a/src/proto/dispatch.rs b/src/proto/dispatch.rs index ef165ef8c5..91c95c176d 100644 --- a/src/proto/dispatch.rs +++ b/src/proto/dispatch.rs @@ -85,66 +85,25 @@ where if self.is_closing { return Ok(Async::Ready(())); } else if self.conn.can_read_head() { - // can dispatch receive, or does it still care about, an incoming message? - match self.dispatch.poll_ready() { - Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"), - Err(()) => { - trace!("dispatch no longer receiving messages"); - self.close(); - return Ok(Async::Ready(())); - } - } - // dispatch is ready for a message, try to read one - match self.conn.read_head() { - Ok(Async::Ready(Some((head, has_body)))) => { - let body = if has_body { - let (mut tx, rx) = super::body::channel(); - let _ = tx.poll_ready(); // register this task if rx is dropped - self.body_tx = Some(tx); - Some(rx) - } else { - None - }; - self.dispatch.recv_msg(Ok((head, body)))?; - }, - Ok(Async::Ready(None)) => { - // read eof, conn will start to shutdown automatically - return Ok(Async::Ready(())); - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => { - debug!("read_head error: {}", err); - self.dispatch.recv_msg(Err(err))?; - // if here, the dispatcher gave the user the error - // somewhere else. we still need to shutdown, but - // not as a second error. - return Ok(Async::Ready(())); - } - } + try_ready!(self.poll_read_head()); } else if self.conn.can_write_continue() { try_nb!(self.conn.flush()); } else if let Some(mut body) = self.body_tx.take() { - let can_read_body = self.conn.can_read_body(); - match body.poll_ready() { - Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => { - self.body_tx = Some(body); - return Ok(Async::NotReady); - }, - Err(_canceled) => { - // user doesn't care about the body - // so we should stop reading - if can_read_body { + if self.conn.can_read_body() { + match body.poll_ready() { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => { + self.body_tx = Some(body); + return Ok(Async::NotReady); + }, + Err(_canceled) => { + // user doesn't care about the body + // so we should stop reading trace!("body receiver dropped before eof, closing"); self.conn.close_read(); return Ok(Async::Ready(())); } - // else the conn body is done, and user dropped, - // so everything is fine! } - } - if can_read_body { match self.conn.read_body() { Ok(Async::Ready(Some(chunk))) => { match body.start_send(Ok(chunk)) { @@ -183,6 +142,47 @@ where } } + fn poll_read_head(&mut self) -> Poll<(), ::Error> { + // can dispatch receive, or does it still care about, an incoming message? + match self.dispatch.poll_ready() { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"), + Err(()) => { + trace!("dispatch no longer receiving messages"); + self.close(); + return Ok(Async::Ready(())); + } + } + // dispatch is ready for a message, try to read one + match self.conn.read_head() { + Ok(Async::Ready(Some((head, has_body)))) => { + let body = if has_body { + let (mut tx, rx) = super::body::channel(); + let _ = tx.poll_ready(); // register this task if rx is dropped + self.body_tx = Some(tx); + Some(rx) + } else { + None + }; + self.dispatch.recv_msg(Ok((head, body)))?; + Ok(Async::Ready(())) + }, + Ok(Async::Ready(None)) => { + // read eof, conn will start to shutdown automatically + Ok(Async::Ready(())) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + debug!("read_head error: {}", err); + self.dispatch.recv_msg(Err(err))?; + // if here, the dispatcher gave the user the error + // somewhere else. we still need to shutdown, but + // not as a second error. + Ok(Async::Ready(())) + } + } + } + fn poll_write(&mut self) -> Poll<(), ::Error> { loop { if self.is_closing { diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index 4c45bf07a7..462cc96762 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -1,3 +1,4 @@ +use std::error::Error as StdError; use std::fmt; use std::usize; use std::io; @@ -97,7 +98,7 @@ impl Decoder { if num > *remaining { *remaining = 0; } else if num == 0 { - return Err(io::Error::new(io::ErrorKind::Other, "early eof")); + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, IncompleteBody)); } else { *remaining -= num; } @@ -262,7 +263,7 @@ impl ChunkedState { if count == 0 { *rem = 0; - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")); + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, IncompleteBody)); } *buf = Some(slice); *rem -= count as u64; @@ -300,9 +301,23 @@ impl ChunkedState { } } +#[derive(Debug)] +struct IncompleteBody; + +impl fmt::Display for IncompleteBody { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for IncompleteBody { + fn description(&self) -> &str { + "end of file before message length reached" + } +} + #[cfg(test)] mod tests { - use std::error::Error; use std::io; use std::io::Write; use super::Decoder; @@ -422,8 +437,7 @@ mod tests { let mut decoder = Decoder::length(10); assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7); let e = decoder.decode(&mut bytes).unwrap_err(); - assert_eq!(e.kind(), io::ErrorKind::Other); - assert_eq!(e.description(), "early eof"); + assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); } #[test] @@ -436,7 +450,6 @@ mod tests { assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7); let e = decoder.decode(&mut bytes).unwrap_err(); assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); - assert_eq!(e.description(), "early eof"); } #[test] diff --git a/tests/server.rs b/tests/server.rs index fceaf48c33..78675734f3 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -14,7 +14,7 @@ use tokio_core::net::TcpListener; use tokio_core::reactor::{Core, Timeout}; use tokio_io::{AsyncRead, AsyncWrite}; -use std::net::{TcpStream, SocketAddr}; +use std::net::{TcpStream, Shutdown, SocketAddr}; use std::io::{self, Read, Write}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; @@ -223,6 +223,26 @@ fn post_with_chunked_body() { assert_eq!(server.body(), b"qwert"); } +#[test] +fn post_with_incomplete_body() { + extern crate pretty_env_logger; + let _ = pretty_env_logger::try_init(); + let server = serve(); + let mut req = connect(server.addr()); + req.write_all(b"\ + POST / HTTP/1.1\r\n\ + Host: example.domain\r\n\ + Content-Length: 10\r\n\ + \r\n\ + 12345\ + ").expect("write"); + req.shutdown(Shutdown::Write).expect("shutdown write"); + + server.body_err(); + + req.read(&mut [0; 256]).expect("read"); +} + #[test] fn empty_response_chunked() { let server = serve(); @@ -746,24 +766,34 @@ impl Serve { } pub fn remote_addr(&self) -> SocketAddr { - match self.msg_rx.try_recv() { + match self.msg_rx.recv() { Ok(Msg::Addr(addr)) => addr, other => panic!("expected remote addr, found: {:?}", other), } } fn body(&self) -> Vec { + self.try_body().expect("body") + } + + fn body_err(&self) -> hyper::Error { + self.try_body().expect_err("body_err") + } + + fn try_body(&self) -> Result, hyper::Error> { let mut buf = vec![]; loop { - match self.msg_rx.try_recv() { + match self.msg_rx.recv() { Ok(Msg::Chunk(msg)) => { buf.extend(&msg); }, Ok(Msg::Addr(_)) => {}, - Err(_) => break, + Ok(Msg::Error(e)) => return Err(e), + Ok(Msg::End) => break, + Err(e) => panic!("expected body, found: {:?}", e), } } - buf + Ok(buf) } fn reply(&self) -> ReplyBuilder { @@ -821,6 +851,8 @@ enum Msg { //Head(Request), Addr(SocketAddr), Chunk(Vec), + Error(hyper::Error), + End, } impl NewService for TestService { @@ -841,15 +873,23 @@ impl Service for TestService { type Error = hyper::Error; type Future = Box>; fn call(&self, req: Request) -> Self::Future { - let tx = self.tx.clone(); + let tx1 = self.tx.clone(); + let tx2 = self.tx.clone(); #[allow(deprecated)] let remote_addr = req.remote_addr().expect("remote_addr"); - tx.lock().unwrap().send(Msg::Addr(remote_addr)).unwrap(); + tx1.lock().unwrap().send(Msg::Addr(remote_addr)).unwrap(); let replies = self.reply.clone(); Box::new(req.body().for_each(move |chunk| { - tx.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); + tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); + Ok(()) + }).then(move |result| { + let msg = match result { + Ok(()) => Msg::End, + Err(e) => Msg::Error(e), + }; + tx2.lock().unwrap().send(msg).unwrap(); Ok(()) }).map(move |_| { let mut res = Response::new();