Skip to content

Commit

Permalink
fix(server): error if Response code is 1xx
Browse files Browse the repository at this point in the history
Returning a Response from a Service with a 1xx StatusCode is not
currently supported in hyper. It has always resulted in broken
semantics. This patch simply errors better.

- A Response with 1xx status is converted into a 500 response with no body.
- An error is returned from the `server::Connection` to alert about the
  bad response.
  • Loading branch information
seanmonstar committed Jan 23, 2018
1 parent 2277422 commit 44c34ce
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 13 deletions.
28 changes: 23 additions & 5 deletions src/proto/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ where I: AsyncRead + AsyncWrite,
Conn {
io: Buffered::new(io),
state: State {
error: None,
keep_alive: keep_alive,
method: None,
read_task: None,
Expand Down Expand Up @@ -437,11 +438,18 @@ where I: AsyncRead + AsyncWrite,
buf.extend_from_slice(pending.buf());
}
}
let encoder = T::encode(head, body, &mut self.state.method, buf);
self.state.writing = if !encoder.is_eof() {
Writing::Body(encoder, None)
} else {
Writing::KeepAlive
self.state.writing = match T::encode(head, body, &mut self.state.method, buf) {
Ok(encoder) => {
if !encoder.is_eof() {
Writing::Body(encoder, None)
} else {
Writing::KeepAlive
}
},
Err(err) => {
self.state.error = Some(err);
Writing::Closed
}
};
}

Expand Down Expand Up @@ -626,6 +634,14 @@ where I: AsyncRead + AsyncWrite,
self.state.disable_keep_alive();
}
}

pub fn take_error(&mut self) -> ::Result<()> {
if let Some(err) = self.state.error.take() {
Err(err)
} else {
Ok(())
}
}
}

// ==== tokio_proto impl ====
Expand Down Expand Up @@ -736,6 +752,7 @@ impl<I, B: AsRef<[u8]>, T, K: KeepAlive> fmt::Debug for Conn<I, B, T, K> {
}

struct State<B, K> {
error: Option<::Error>,
keep_alive: K,
method: Option<Method>,
read_task: Option<Task>,
Expand Down Expand Up @@ -767,6 +784,7 @@ impl<B: AsRef<[u8]>, K: KeepAlive> fmt::Debug for State<B, K> {
.field("reading", &self.reading)
.field("writing", &self.writing)
.field("keep_alive", &self.keep_alive.status())
.field("error", &self.error)
//.field("method", &self.method)
.field("read_task", &self.read_task)
.finish()
Expand Down
1 change: 1 addition & 0 deletions src/proto/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ where

if self.is_done() {
try_ready!(self.conn.shutdown());
self.conn.take_error()?;
trace!("Dispatch::poll done");
Ok(Async::Ready(()))
} else {
Expand Down
26 changes: 20 additions & 6 deletions src/proto/h1/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,23 @@ impl Http1Transaction for ServerTransaction {
}


fn encode(mut head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> Encoder {
fn encode(mut head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> ::Result<Encoder> {
trace!("ServerTransaction::encode has_body={}, method={:?}", has_body, method);

let body = ServerTransaction::set_length(&mut head, has_body, method.as_ref());
// hyper currently doesn't support returning 1xx status codes as a Response
// This is because Service only allows returning a single Response, and
// so if you try to reply with a e.g. 100 Continue, you have no way of
// replying with the latter status code response.
let ret = if head.subject.is_informational() {
error!("response with 1xx status code not supported");
head = MessageHead::default();
head.subject = ::StatusCode::InternalServerError;
head.headers.set(ContentLength(0));
Err(::Error::Status)
} else {
Ok(ServerTransaction::set_length(&mut head, has_body, method.as_ref()))
};


let init_cap = 30 + head.headers.len() * AVERAGE_HEADER_SIZE;
dst.reserve(init_cap);
Expand All @@ -133,7 +146,8 @@ impl Http1Transaction for ServerTransaction {
extend(dst, b"\r\n");
}
extend(dst, b"\r\n");
body

ret
}

fn should_error_on_parse_eof() -> bool {
Expand Down Expand Up @@ -289,7 +303,7 @@ impl Http1Transaction for ClientTransaction {
}
}

fn encode(mut head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> Encoder {
fn encode(mut head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> ::Result<Encoder> {
trace!("ClientTransaction::encode has_body={}, method={:?}", has_body, method);

*method = Some(head.subject.0.clone());
Expand All @@ -300,7 +314,7 @@ impl Http1Transaction for ClientTransaction {
dst.reserve(init_cap);
let _ = write!(FastWrite(dst), "{} {}\r\n{}\r\n", head.subject, head.version, head.headers);

body
Ok(body)
}

fn should_error_on_parse_eof() -> bool {
Expand Down Expand Up @@ -645,7 +659,7 @@ mod tests {

b.iter(|| {
let mut vec = Vec::new();
ServerTransaction::encode(head.clone(), true, &mut None, &mut vec);
ServerTransaction::encode(head.clone(), true, &mut None, &mut vec).unwrap();
assert_eq!(vec.len(), len);
::test::black_box(vec);
})
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub trait Http1Transaction {
type Outgoing: Default;
fn parse(bytes: &mut BytesMut) -> ParseResult<Self::Incoming>;
fn decoder(head: &MessageHead<Self::Incoming>, method: &mut Option<::Method>) -> ::Result<Option<h1::Decoder>>;
fn encode(head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> h1::Encoder;
fn encode(head: MessageHead<Self::Outgoing>, has_body: bool, method: &mut Option<Method>, dst: &mut Vec<u8>) -> ::Result<h1::Encoder>;

fn should_error_on_parse_eof() -> bool;
fn should_read_first() -> bool;
Expand Down
36 changes: 35 additions & 1 deletion tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use hyper::server::{Http, Request, Response, Service, NewService};
use hyper::StatusCode;
use hyper::server::{Http, Request, Response, Service, NewService, service_fn};


#[test]
Expand Down Expand Up @@ -867,6 +868,38 @@ fn nonempty_parse_eof_returns_error() {
core.run(fut).unwrap_err();
}

#[test]
fn returning_1xx_response_is_error() {
let mut core = Core::new().unwrap();
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap();
let addr = listener.local_addr().unwrap();

thread::spawn(move || {
let mut tcp = connect(&addr);
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
let mut buf = [0; 256];
tcp.read(&mut buf).unwrap();

let expected = "HTTP/1.1 500 ";
assert_eq!(s(&buf[..expected.len()]), expected);
});

let fut = listener.incoming()
.into_future()
.map_err(|_| unreachable!())
.and_then(|(item, _incoming)| {
let (socket, _) = item.unwrap();
Http::<hyper::Chunk>::new()
.serve_connection(socket, service_fn(|_| {
Ok(Response::<hyper::Body>::new()
.with_status(StatusCode::Continue))
}))
.map(|_| ())
});

core.run(fut).unwrap_err();
}

#[test]
fn remote_addr() {
let server = serve();
Expand Down Expand Up @@ -1191,3 +1224,4 @@ impl Drop for Dropped {
self.0.store(true, Ordering::SeqCst);
}
}

0 comments on commit 44c34ce

Please sign in to comment.