From 1b1311a7d36b000c9c2c509971ee759da8765711 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 16 Feb 2017 12:39:50 -0800 Subject: [PATCH] feat(http): allow specifying custom body streams --- benches/end_to_end.rs | 2 +- src/client/mod.rs | 171 ++++++++++++++++++++++++++---------- src/client/request.rs | 47 ++-------- src/http/body.rs | 7 -- src/http/chunk.rs | 10 --- src/http/conn.rs | 193 ++++++++++++++++++++++++----------------- src/http/h1/encode.rs | 11 +++ src/http/io.rs | 39 ++------- src/http/mod.rs | 17 ++++ src/server/mod.rs | 123 ++++++++++++++++---------- src/server/response.rs | 26 ++++-- 11 files changed, 377 insertions(+), 269 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 88585cfa54..05ebe676c9 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -60,7 +60,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) { req.headers_mut().set(ContentLength(post.len() as u64)); req.set_body(post); - let work = client.get(url.clone()).and_then(|res| { + let work = client.request(req).and_then(|res| { res.body().for_each(|_chunk| { Ok(()) }) diff --git a/src/client/mod.rs b/src/client/mod.rs index 6b150b755b..923afea216 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -6,10 +6,11 @@ use std::cell::RefCell; use std::fmt; use std::io; +use std::marker::PhantomData; use std::rc::Rc; use std::time::Duration; -use futures::{Poll, Async, Future}; +use futures::{Poll, Async, Future, Stream}; use relay; use tokio::io::Io; use tokio::reactor::Handle; @@ -37,14 +38,21 @@ mod response; /// A Client to make outgoing HTTP requests. // If the Connector is clone, then the Client can be clone easily. -#[derive(Clone)] -pub struct Client { +pub struct Client { connector: C, handle: Handle, - pool: Pool, + pool: Pool>, } -impl Client { +impl Client { + /// Create a new Client with the default config. + #[inline] + pub fn new(handle: &Handle) -> Client { + Config::default().build(handle) + } +} + +impl Client { /// Configure a Client. /// /// # Example @@ -63,30 +71,28 @@ impl Client { /// # } /// ``` #[inline] - pub fn configure() -> Config { + pub fn configure() -> Config { Config::default() } } -impl Client { - /// Create a new Client with the default config. - #[inline] - pub fn new(handle: &Handle) -> Client { - Client::configure().build(handle) - } -} - -impl Client { +impl Client { /// Create a new client with a specific connector. #[inline] - fn configured(config: Config, handle: &Handle) -> Client { + fn configured(config: Config, handle: &Handle) -> Client { Client { connector: config.connector, handle: handle.clone(), pool: Pool::new(config.keep_alive, config.keep_alive_timeout), } } +} +impl Client +where C: Connect, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ /// Send a GET Request using this Client. #[inline] pub fn get(&self, url: Url) -> FutureResponse { @@ -95,7 +101,7 @@ impl Client { /// Send a constructed Request using this Client. #[inline] - pub fn request(&self, req: Request) -> FutureResponse { + pub fn request(&self, req: Request) -> FutureResponse { self.call(req) } } @@ -118,13 +124,17 @@ impl Future for FutureResponse { } } -impl Service for Client { - type Request = Request; +impl Service for Client +where C: Connect, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + type Request = Request; type Response = Response; type Error = ::Error; type Future = FutureResponse; - fn call(&self, req: Request) -> Self::Future { + fn call(&self, req: Self::Request) -> Self::Future { let url = req.url().clone(); let (mut head, body) = request::split(req); let mut headers = Headers::new(); @@ -178,26 +188,40 @@ impl Service for Client { } -impl fmt::Debug for Client { +impl Clone for Client { + fn clone(&self) -> Client { + Client { + connector: self.connector.clone(), + handle: self.handle.clone(), + pool: self.pool.clone(), + } + } +} + +impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.pad("Client") } } -type TokioClient = ClientProxy, Message, ::Error>; +type TokioClient = ClientProxy, Message, ::Error>; -struct HttpClient { - client_rx: RefCell>>>, +struct HttpClient { + client_rx: RefCell>>>>, } -impl ClientProto for HttpClient { +impl ClientProto for HttpClient +where T: Io + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ type Request = http::RequestHead; - type RequestBody = http::Chunk; + type RequestBody = B::Item; type Response = http::ResponseHead; type ResponseBody = http::Chunk; type Error = ::Error; - type Transport = http::Conn>; - type BindTransport = BindingClient; + type Transport = http::Conn>>; + type BindTransport = BindingClient; fn bind_transport(&self, io: T) -> Self::BindTransport { BindingClient { @@ -207,13 +231,17 @@ impl ClientProto for HttpClient { } } -struct BindingClient { - rx: relay::Receiver>, +struct BindingClient { + rx: relay::Receiver>>, io: Option, } -impl Future for BindingClient { - type Item = http::Conn>; +impl Future for BindingClient +where T: Io + 'static, + B: Stream, + B::Item: AsRef<[u8]>, +{ + type Item = http::Conn>>; type Error = io::Error; fn poll(&mut self) -> Poll { @@ -228,8 +256,8 @@ impl Future for BindingClient { } /// Configuration for a Client -#[derive(Debug, Clone)] -pub struct Config { +pub struct Config { + _body_type: PhantomData, //connect_timeout: Duration, connector: C, keep_alive: bool, @@ -242,9 +270,10 @@ pub struct Config { #[derive(Debug, Clone, Copy)] pub struct UseDefaultConnector(()); -impl Config { - fn default() -> Config { +impl Default for Config { + fn default() -> Config { Config { + _body_type: PhantomData::, //connect_timeout: Duration::from_secs(10), connector: UseDefaultConnector(()), keep_alive: true, @@ -254,11 +283,33 @@ impl Config { } } -impl Config { +impl Config { + /// Set the body stream to be used by the `Client`. + /// + /// # Example + /// + /// ```rust + /// # use hyper::client::Config; + /// let cfg = Config::default() + /// .body::(); + /// # drop(cfg); + #[inline] + pub fn body(self) -> Config { + Config { + _body_type: PhantomData::, + //connect_timeout: self.connect_timeout, + connector: self.connector, + keep_alive: self.keep_alive, + keep_alive_timeout: self.keep_alive_timeout, + max_idle: self.max_idle, + } + } + /// Set the `Connect` type to be used. #[inline] - pub fn connector(self, val: CC) -> Config { + pub fn connector(self, val: CC) -> Config { Config { + _body_type: self._body_type, //connect_timeout: self.connect_timeout, connector: val, keep_alive: self.keep_alive, @@ -271,7 +322,7 @@ impl Config { /// /// Default is enabled. #[inline] - pub fn keep_alive(mut self, val: bool) -> Config { + pub fn keep_alive(mut self, val: bool) -> Config { self.keep_alive = val; self } @@ -280,9 +331,9 @@ impl Config { /// /// Pass `None` to disable timeout. /// - /// Default is 2 minutes. + /// Default is 90 seconds. #[inline] - pub fn keep_alive_timeout(mut self, val: Option) -> Config { + pub fn keep_alive_timeout(mut self, val: Option) -> Config { self.keep_alive_timeout = val; self } @@ -292,29 +343,57 @@ impl Config { /// /// Default is 10 seconds. #[inline] - pub fn connect_timeout(mut self, val: Duration) -> Config { + pub fn connect_timeout(mut self, val: Duration) -> Config { self.connect_timeout = val; self } */ } -impl Config { +impl Config +where C: Connect, + B: Stream, + B::Item: AsRef<[u8]>, +{ /// Construct the Client with this configuration. #[inline] - pub fn build(self, handle: &Handle) -> Client { + pub fn build(self, handle: &Handle) -> Client { Client::configured(self, handle) } } -impl Config { +impl Config +where B: Stream, + B::Item: AsRef<[u8]>, +{ /// Construct the Client with this configuration. #[inline] - pub fn build(self, handle: &Handle) -> Client { + pub fn build(self, handle: &Handle) -> Client { self.connector(HttpConnector::new(4, handle)).build(handle) } } +impl fmt::Debug for Config { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Config") + .field("keep_alive", &self.keep_alive) + .field("keep_alive_timeout", &self.keep_alive_timeout) + .field("max_idle", &self.max_idle) + .finish() + } +} + +impl Clone for Config { + fn clone(&self) -> Config { + Config { + _body_type: PhantomData::, + connector: self.connector.clone(), + keep_alive: self.keep_alive, + keep_alive_timeout: self.keep_alive_timeout, + max_idle: self.max_idle, + } + } +} #[cfg(test)] mod tests { diff --git a/src/client/request.rs b/src/client/request.rs index 28617e93d1..67f7c45fe2 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -10,18 +10,18 @@ use version::HttpVersion; use std::str::FromStr; /// A client request to a remote server. -pub struct Request { +pub struct Request { method: Method, url: Url, version: HttpVersion, headers: Headers, - body: Option, + body: Option, } -impl Request { +impl Request { /// Construct a new Request. #[inline] - pub fn new(method: Method, url: Url) -> Request { + pub fn new(method: Method, url: Url) -> Request { Request { method: method, url: url, @@ -65,10 +65,10 @@ impl Request { /// Set the body of the request. #[inline] - pub fn set_body>(&mut self, body: T) { self.body = Some(body.into()); } + pub fn set_body>(&mut self, body: T) { self.body = Some(body.into()); } } -impl fmt::Debug for Request { +impl fmt::Debug for Request { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Request") .field("method", &self.method) @@ -79,7 +79,7 @@ impl fmt::Debug for Request { } } -pub fn split(req: Request) -> (RequestHead, Option) { +pub fn split(req: Request) -> (RequestHead, Option) { let uri = Uri::from_str(&req.url[::url::Position::BeforePath..::url::Position::AfterQuery]).expect("url is uri"); let head = RequestHead { subject: ::http::RequestLine(req.method, uri), @@ -198,38 +198,5 @@ mod tests { assert_eq!(&s[..request_line.len()], request_line); assert!(s.contains("Host: example.dom")); } - - #[test] - fn test_post_chunked_with_encoding() { - let url = Url::parse("http://example.dom").unwrap(); - let mut req = Request::with_connector( - Post, url, &mut MockConnector - ).unwrap(); - req.headers_mut().set(TransferEncoding(vec![Encoding::Chunked])); - let bytes = run_request(req); - let s = from_utf8(&bytes[..]).unwrap(); - assert!(!s.contains("Content-Length:")); - assert!(s.contains("Transfer-Encoding:")); - } - - #[test] - fn test_write_error_closes() { - let url = Url::parse("http://hyper.rs").unwrap(); - let req = Request::with_connector( - Get, url, &mut MockConnector - ).unwrap(); - let mut req = req.start().unwrap(); - - req.message.downcast_mut::().unwrap() - .get_mut().downcast_mut::().unwrap() - .error_on_write = true; - - req.write(b"foo").unwrap(); - assert!(req.flush().is_err()); - - assert!(req.message.downcast_ref::().unwrap() - .get_ref().downcast_ref::().unwrap() - .is_closed); - } */ } diff --git a/src/http/body.rs b/src/http/body.rs index b8ade3d76b..3f32b75cb4 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -1,5 +1,4 @@ use std::convert::From; -use std::sync::Arc; use tokio_proto; use http::Chunk; @@ -65,12 +64,6 @@ impl From> for Body { } } -impl From>> for Body { - fn from (vec: Arc>) -> Body { - Body(TokioBody::from(Chunk::from(vec))) - } -} - impl From<&'static [u8]> for Body { fn from (slice: &'static [u8]) -> Body { Body(TokioBody::from(Chunk::from(slice))) diff --git a/src/http/chunk.rs b/src/http/chunk.rs index a8d6f408f6..5f3ec62639 100644 --- a/src/http/chunk.rs +++ b/src/http/chunk.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::sync::Arc; use http::buf::MemSlice; @@ -8,7 +7,6 @@ pub struct Chunk(Inner); enum Inner { Owned(Vec), - Referenced(Arc>), Mem(MemSlice), Static(&'static [u8]), } @@ -20,13 +18,6 @@ impl From> for Chunk { } } -impl From>> for Chunk { - #[inline] - fn from(v: Arc>) -> Chunk { - Chunk(Inner::Referenced(v)) - } -} - impl From<&'static [u8]> for Chunk { #[inline] fn from(slice: &'static [u8]) -> Chunk { @@ -68,7 +59,6 @@ impl AsRef<[u8]> for Chunk { fn as_ref(&self) -> &[u8] { match self.0 { Inner::Owned(ref vec) => vec, - Inner::Referenced(ref vec) => vec, Inner::Mem(ref slice) => slice.as_ref(), Inner::Static(slice) => slice, } diff --git a/src/http/conn.rs b/src/http/conn.rs index 3e2b80ce03..95863f9ef2 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -8,7 +8,7 @@ use tokio::io::Io; use tokio_proto::streaming::pipeline::{Frame, Transport}; use header::{ContentLength, TransferEncoding}; -use http::{self, Http1Transaction}; +use http::{self, Http1Transaction, DebugTruncate}; use http::io::{Cursor, Buffered}; use http::h1::{Encoder, Decoder}; use version::HttpVersion; @@ -21,14 +21,19 @@ use version::HttpVersion; /// The connection will determine when a message begins and ends as well as /// determine if this connection can be kept alive after the message, /// or if it is complete. -pub struct Conn { +pub struct Conn { io: Buffered, - state: State, + state: State, _marker: PhantomData } -impl Conn { - pub fn new(io: I, keep_alive: K) -> Conn { +impl Conn +where I: Io, + B: AsRef<[u8]>, + T: Http1Transaction, + K: KeepAlive +{ + pub fn new(io: I, keep_alive: K) -> Conn { Conn { io: Buffered::new(io), state: State { @@ -169,7 +174,10 @@ impl Conn { fn can_write_body(&self) -> bool { match self.state.writing { Writing::Body(..) => true, - _ => false + Writing::Init | + Writing::Ending(..) | + Writing::KeepAlive | + Writing::Closed => false, } } @@ -189,7 +197,8 @@ impl Conn { self.state.keep_alive &= wants_keep_alive; let mut buf = Vec::new(); let encoder = T::encode(&mut head, &mut buf); - self.io.buffer(buf); + //TODO: handle when there isn't enough room to buffer the head + assert!(self.io.buffer(buf) > 0); self.state.writing = if body { Writing::Body(encoder, None) } else { @@ -199,7 +208,7 @@ impl Conn { Ok(AsyncSink::Ready) } - fn write_body(&mut self, chunk: Option) -> StartSend, io::Error> { + fn write_body(&mut self, chunk: Option) -> StartSend, io::Error> { debug_assert!(self.can_write_body()); let state = match self.state.writing { @@ -207,47 +216,41 @@ impl Conn { if queued.is_some() { return Ok(AsyncSink::NotReady(chunk)); } - let mut is_done = true; - let mut wbuf = Cursor::new(match chunk { - Some(chunk) => { - is_done = false; - chunk - } - None => { - // Encode a zero length chunk - // the http1 encoder does the right thing - // encoding either the final chunk or ignoring the input - http::Chunk::from(Vec::new()) - } - }); - - match encoder.encode(&mut self.io, wbuf.buf()) { - Ok(n) => { - wbuf.consume(n); - - if !wbuf.is_written() { - trace!("Conn::start_send frame not written, queued"); - *queued = Some(wbuf); - } - }, - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => { - trace!("Conn::start_send frame not written, queued"); - *queued = Some(wbuf); + if let Some(chunk) = chunk { + let mut cursor = Cursor::new(chunk); + match encoder.encode(&mut self.io, cursor.buf()) { + Ok(n) => { + cursor.consume(n); + + if !cursor.is_written() { + trace!("Conn::start_send frame not written, queued"); + *queued = Some(cursor); + } }, - _ => return Err(e) + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { + trace!("Conn::start_send frame not written, queued"); + *queued = Some(cursor); + }, + _ => return Err(e) + } } - } - if encoder.is_eof() { - Writing::KeepAlive - } else if is_done { - Writing::Closed + if encoder.is_eof() { + Writing::KeepAlive + } else { + return Ok(AsyncSink::Ready); + } } else { - return Ok(AsyncSink::Ready); + // end of stream, that means we should try to eof + match encoder.eof() { + Ok(Some(end)) => Writing::Ending(Cursor::new(end)), + Ok(None) => Writing::KeepAlive, + Err(_not_eof) => Writing::Closed, + } } }, - Writing::Init | Writing::KeepAlive | Writing::Closed => unreachable!(), + _ => unreachable!(), }; self.state.writing = state; Ok(AsyncSink::Ready) @@ -255,7 +258,7 @@ impl Conn { fn write_queued(&mut self) -> Poll<(), io::Error> { trace!("Conn::write_queued()"); - match self.state.writing { + let state = match self.state.writing { Writing::Body(ref mut encoder, ref mut queued) => { let complete = if let Some(chunk) = queued.as_mut() { let n = try_nb!(encoder.encode(&mut self.io, chunk.buf())); @@ -265,15 +268,26 @@ impl Conn { true }; trace!("Conn::write_queued complete = {}", complete); - if complete { + return if complete { *queued = None; Ok(Async::Ready(())) } else { Ok(Async::NotReady) + }; + }, + Writing::Ending(ref mut ending) => { + let n = self.io.buffer(ending.buf()); + ending.consume(n); + if ending.is_written() { + Writing::KeepAlive + } else { + return Ok(Async::NotReady); } }, - _ => Ok(Async::Ready(())), - } + _ => return Ok(Async::Ready(())), + }; + self.state.writing = state; + Ok(Async::Ready(())) } fn flush(&mut self) -> Poll<(), io::Error> { @@ -289,8 +303,9 @@ impl Conn { } } -impl Stream for Conn +impl Stream for Conn where I: Io, + B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive, T::Outgoing: fmt::Debug { @@ -317,12 +332,13 @@ where I: Io, } } -impl Sink for Conn +impl Sink for Conn where I: Io, + B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive, T::Outgoing: fmt::Debug { - type SinkItem = Frame, http::Chunk, ::Error>; + type SinkItem = Frame, B, ::Error>; type SinkError = io::Error; fn start_send(&mut self, frame: Self::SinkItem) -> StartSend { @@ -371,7 +387,7 @@ where I: Io, }, }; - error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, frame); + error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame)); Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame")) } @@ -384,13 +400,14 @@ where I: Io, } } -impl Transport for Conn +impl Transport for Conn where I: Io + 'static, + B: AsRef<[u8]> + 'static, T: Http1Transaction + 'static, K: KeepAlive + 'static, T::Outgoing: fmt::Debug {} -impl fmt::Debug for Conn { +impl, T, K: fmt::Debug> fmt::Debug for Conn { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Conn") .field("state", &self.state) @@ -399,10 +416,9 @@ impl fmt::Debug for Conn { } } -#[derive(Debug)] -struct State { +struct State { reading: Reading, - writing: Writing, + writing: Writing, keep_alive: K, } @@ -414,14 +430,41 @@ enum Reading { Closed, } -#[derive(Debug)] -enum Writing { +enum Writing { Init, - Body(Encoder, Option>), + Body(Encoder, Option>), + Ending(Cursor<&'static [u8]>), KeepAlive, Closed, } +impl, K: fmt::Debug> fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("State") + .field("reading", &self.reading) + .field("writing", &self.writing) + .field("keep_alive", &self.keep_alive) + .finish() + } +} + +impl> fmt::Debug for Writing { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Writing::Init => f.write_str("Init"), + Writing::Body(ref enc, ref queued) => f.debug_tuple("Body") + .field(enc) + .field(queued) + .finish(), + Writing::Ending(ref ending) => f.debug_tuple("Ending") + .field(ending) + .finish(), + Writing::KeepAlive => f.write_str("KeepAlive"), + Writing::Closed => f.write_str("Closed"), + } + } +} + impl ::std::ops::BitAndAssign for KA { fn bitand_assign(&mut self, enabled: bool) { if !enabled { @@ -468,7 +511,7 @@ impl KeepAlive for KA { } } -impl State { +impl State { fn close(&mut self) { trace!("State::close()"); self.reading = Reading::Closed; @@ -525,9 +568,9 @@ impl State { // The DebugFrame and DebugChunk are simple Debug implementations that allow // us to dump the frame into logs, without logging the entirety of the bytes. -struct DebugFrame<'a, T: fmt::Debug + 'a>(&'a Frame, http::Chunk, ::Error>); +struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame, B, ::Error>); -impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> { +impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, T, B> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self.0 { Frame::Message { ref message, ref body } => { @@ -538,7 +581,7 @@ impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> { }, Frame::Body { chunk: Some(ref chunk) } => { f.debug_struct("Body") - .field("chunk", &DebugChunk(chunk)) + .field("chunk", &DebugTruncate(chunk.as_ref())) .finish() }, Frame::Body { chunk: None } => { @@ -555,22 +598,12 @@ impl<'a, T: fmt::Debug + 'a> fmt::Debug for DebugFrame<'a, T> { } } -struct DebugChunk<'a>(&'a http::Chunk); - -impl<'a> fmt::Debug for DebugChunk<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("Chunk") - .field(&self.0.len()) - .finish() - } -} - #[cfg(test)] mod tests { use futures::{Async, Stream, Sink}; use tokio_proto::streaming::pipeline::Frame; - use http::{MessageHead, ServerTransaction}; + use http::{self, MessageHead, ServerTransaction}; use http::h1::Encoder; use mock::AsyncIo; @@ -584,7 +617,7 @@ mod tests { let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); let len = good_message.len(); let io = AsyncIo::new_buf(good_message, len); - let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default()); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); match conn.poll().unwrap() { Async::Ready(Some(Frame::Message { message, body: false })) => { @@ -601,7 +634,7 @@ mod tests { fn test_conn_parse_partial() { let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); let io = AsyncIo::new_buf(good_message, 10); - let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default()); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); assert!(conn.poll().unwrap().is_not_ready()); conn.io.io_mut().block_in(50); let async = conn.poll().unwrap(); @@ -615,7 +648,7 @@ mod tests { #[test] fn test_conn_closed_read() { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default()); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.close(); match conn.poll().unwrap() { @@ -631,7 +664,7 @@ mod tests { let _ = pretty_env_logger::init(); let _: Result<(), ()> = ::futures::lazy(|| { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default()); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); let max = ::http::io::MAX_BUFFER_SIZE + 4096; conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None); @@ -668,7 +701,7 @@ mod tests { use ::futures::Future; let _: Result<(), ()> = ::futures::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); - let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default()); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.writing = Writing::Body(Encoder::chunked(), None); assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); @@ -679,7 +712,7 @@ mod tests { #[test] fn test_conn_closed_write() { let io = AsyncIo::new_buf(vec![], 0); - let mut conn = Conn::<_, ServerTransaction>::new(io, Default::default()); + let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.close(); match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { diff --git a/src/http/h1/encode.rs b/src/http/h1/encode.rs index e96dcff02b..4b8bab5b2e 100644 --- a/src/http/h1/encode.rs +++ b/src/http/h1/encode.rs @@ -40,6 +40,14 @@ impl Encoder { } } + pub fn eof(&self) -> Result, NotEof> { + match self.kind { + Kind::Length(0) => Ok(None), + Kind::Chunked(Chunked::Init) => Ok(Some(b"0\r\n\r\n")), + _ => Err(NotEof), + } + } + pub fn encode(&mut self, w: &mut W, msg: &[u8]) -> io::Result { match self.kind { Kind::Chunked(ref mut chunked) => { @@ -67,6 +75,9 @@ impl Encoder { } } +#[derive(Debug)] +pub struct NotEof; + #[derive(Debug, PartialEq, Clone)] enum Chunked { Init, diff --git a/src/http/io.rs b/src/http/io.rs index 067f384e2b..c7a9924e44 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -6,7 +6,7 @@ use std::ptr; use futures::Async; use tokio::io::Io; -use http::{Http1Transaction, h1, MessageHead, ParseResult}; +use http::{Http1Transaction, h1, MessageHead, ParseResult, DebugTruncate}; use http::buf::{MemBuf, MemSlice}; const INIT_BUFFER_SIZE: usize = 4096; @@ -91,8 +91,8 @@ impl Buffered { self.read_buf.reserve(INIT_BUFFER_SIZE); } - pub fn buffer>(&mut self, buf: B) { - self.write_buf.buffer(buf.as_ref()); + pub fn buffer>(&mut self, buf: B) -> usize { + self.write_buf.buffer(buf.as_ref()) } #[cfg(test)] @@ -101,24 +101,6 @@ impl Buffered { } } -/* -impl Read for Buffered { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - trace!("Buffered.read self={}, buf={}", self.read_buf.len(), buf.len()); - unimplemented!() - /* - let n = try!(self.read_buf.bytes().read(buf)); - self.read_buf.consume(n); - if n == 0 { - self.read_buf.reset(); - self.io.read(&mut buf[n..]) - } else { - Ok(n) - } - */ - } -} -*/ impl Write for Buffered { fn write(&mut self, data: &[u8]) -> io::Result { @@ -164,7 +146,7 @@ impl MemRead for Buffered { } #[derive(Clone)] -pub struct Cursor> { +pub struct Cursor { bytes: T, pos: usize, } @@ -211,16 +193,9 @@ impl> Cursor { impl> fmt::Debug for Cursor { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let bytes = self.buf(); - if bytes.len() > 32 { - try!(f.write_str("Cursor([")); - for byte in &bytes[..32] { - try!(write!(f, "{:?}, ", byte)); - } - write!(f, "... {}])", bytes.len()) - } else { - write!(f, "Cursor({:?})", &bytes) - } + f.debug_tuple("Cursor") + .field(&DebugTruncate(self.buf())) + .finish() } } diff --git a/src/http/mod.rs b/src/http/mod.rs index 4a68812733..464583ed64 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -133,6 +133,23 @@ pub trait Http1Transaction { type ParseResult = ::Result, usize)>>; +struct DebugTruncate<'a>(&'a [u8]); + +impl<'a> fmt::Debug for DebugTruncate<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let bytes = self.0; + if bytes.len() > 32 { + try!(f.write_str("[")); + for byte in &bytes[..32] { + try!(write!(f, "{:?}, ", byte)); + } + write!(f, "... {}]", bytes.len()) + } else { + fmt::Debug::fmt(bytes, f) + } + } +} + #[test] fn test_should_keep_alive() { let mut headers = Headers::new(); diff --git a/src/server/mod.rs b/src/server/mod.rs index ca69db6461..eb7b3af872 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,6 +6,7 @@ use std::cell::RefCell; use std::fmt; use std::io; +use std::marker::PhantomData; use std::net::SocketAddr; use std::rc::{Rc, Weak}; use std::time::Duration; @@ -36,29 +37,33 @@ mod response; /// This structure is used to create instances of `Server` or to spawn off tasks /// which handle a connection to an HTTP server. Each instance of `Http` can be /// configured with various protocol-level options such as keepalive. -#[derive(Debug, Clone)] -pub struct Http { +pub struct Http { keep_alive: bool, + _marker: PhantomData, } /// An instance of a server created through `Http::bind`. /// /// This server is intended as a convenience for creating a TCP listener on an /// address and then serving TCP connections accepted with the service provided. -pub struct Server { - protocol: Http, +pub struct Server +where B: Stream, + B::Item: AsRef<[u8]>, +{ + protocol: Http, new_service: S, core: Core, listener: TcpListener, shutdown_timeout: Duration, } -impl Http { +impl + 'static> Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. - pub fn new() -> Http { + pub fn new() -> Http { Http { keep_alive: true, + _marker: PhantomData, } } @@ -80,9 +85,10 @@ impl Http { /// /// The returned `Server` contains one method, `run`, which is used to /// actually run the server. - pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> - where S: NewService + + pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> + where S: NewService, Error = ::Error> + Send + Sync + 'static, + Bd: Stream, { let core = try!(Core::new()); let handle = core.handle(); @@ -111,12 +117,13 @@ impl Http { /// used through the `serve` helper method above. This can be useful, /// however, when writing mocks or accepting sockets from a non-TCP /// location. - pub fn bind_connection(&self, + pub fn bind_connection(&self, handle: &Handle, io: I, remote_addr: SocketAddr, service: S) - where S: Service + 'static, + where S: Service, Error = ::Error> + 'static, + Bd: Stream + 'static, I: Io + 'static, { self.bind_server(handle, io, HttpService { @@ -126,29 +133,48 @@ impl Http { } } +impl Clone for Http { + fn clone(&self) -> Http { + Http { + ..*self + } + } +} + +impl fmt::Debug for Http { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Http") + .field("keep_alive", &self.keep_alive) + .finish() + } +} + #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct ProtoRequest(http::RequestHead); +pub struct __ProtoRequest(http::RequestHead); #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct ProtoResponse(ResponseHead); +pub struct __ProtoResponse(ResponseHead); #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct ProtoTransport(http::Conn); +pub struct __ProtoTransport(http::Conn); #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct ProtoBindTransport { - inner: future::FutureResult, io::Error>, +pub struct __ProtoBindTransport { + inner: future::FutureResult, io::Error>, } -impl ServerProto for Http { - type Request = ProtoRequest; +impl ServerProto for Http +where T: Io + 'static, + B: AsRef<[u8]> + 'static, +{ + type Request = __ProtoRequest; type RequestBody = http::Chunk; - type Response = ProtoResponse; - type ResponseBody = http::Chunk; + type Response = __ProtoResponse; + type ResponseBody = B; type Error = ::Error; - type Transport = ProtoTransport; - type BindTransport = ProtoBindTransport; + type Transport = __ProtoTransport; + type BindTransport = __ProtoBindTransport; fn bind_transport(&self, io: T) -> Self::BindTransport { let ka = if self.keep_alive { @@ -156,14 +182,17 @@ impl ServerProto for Http { } else { http::KA::Disabled }; - ProtoBindTransport { + __ProtoBindTransport { inner: future::ok(http::Conn::new(io, ka)), } } } -impl Sink for ProtoTransport { - type SinkItem = Frame; +impl Sink for __ProtoTransport +where T: Io + 'static, + B: AsRef<[u8]>, +{ + type SinkItem = Frame<__ProtoResponse, B, ::Error>; type SinkError = io::Error; fn start_send(&mut self, item: Self::SinkItem) @@ -179,7 +208,7 @@ impl Sink for ProtoTransport { AsyncSink::Ready => Ok(AsyncSink::Ready), AsyncSink::NotReady(Frame::Message { message, body }) => { Ok(AsyncSink::NotReady(Frame::Message { - message: ProtoResponse(message), + message: __ProtoResponse(message), body: body, })) } @@ -197,8 +226,8 @@ impl Sink for ProtoTransport { } } -impl Stream for ProtoTransport { - type Item = Frame; +impl> Stream for __ProtoTransport { + type Item = Frame<__ProtoRequest, http::Chunk, ::Error>; type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { @@ -208,7 +237,7 @@ impl Stream for ProtoTransport { }; let item = match item { Frame::Message { message, body } => { - Frame::Message { message: ProtoRequest(message), body: body } + Frame::Message { message: __ProtoRequest(message), body: body } } Frame::Body { chunk } => Frame::Body { chunk: chunk }, Frame::Error { error } => Frame::Error { error: error }, @@ -217,7 +246,7 @@ impl Stream for ProtoTransport { } } -impl Transport for ProtoTransport { +impl + 'static> Transport for __ProtoTransport { fn tick(&mut self) { self.0.tick() } @@ -227,12 +256,12 @@ impl Transport for ProtoTransport { } } -impl Future for ProtoBindTransport { - type Item = ProtoTransport; +impl Future for __ProtoBindTransport { + type Item = __ProtoTransport; type Error = io::Error; - fn poll(&mut self) -> Poll, io::Error> { - self.inner.poll().map(|a| a.map(ProtoTransport)) + fn poll(&mut self) -> Poll<__ProtoTransport, io::Error> { + self.inner.poll().map(|a| a.map(__ProtoTransport)) } } @@ -241,24 +270,26 @@ struct HttpService { remote_addr: SocketAddr, } -fn map_response_to_message(res: Response) -> Message { +fn map_response_to_message(res: Response) -> Message<__ProtoResponse, B> { let (head, body) = response::split(res); if let Some(body) = body { - Message::WithBody(ProtoResponse(head), body.into()) + Message::WithBody(__ProtoResponse(head), body.into()) } else { - Message::WithoutBody(ProtoResponse(head)) + Message::WithoutBody(__ProtoResponse(head)) } } type ResponseHead = http::MessageHead<::StatusCode>; -impl Service for HttpService - where T: Service, +impl Service for HttpService + where T: Service, Error=::Error>, + B: Stream, + B::Item: AsRef<[u8]>, { - type Request = Message; - type Response = Message; + type Request = Message<__ProtoRequest, http::TokioBody>; + type Response = Message<__ProtoResponse, B>; type Error = ::Error; - type Future = Map Message>; + type Future = Map) -> Message<__ProtoResponse, B>>; fn call(&self, message: Self::Request) -> Self::Future { let (head, body) = match message { @@ -270,9 +301,11 @@ impl Service for HttpService } } -impl Server - where S: NewService +impl Server + where S: NewService, Error = ::Error> + Send + Sync + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> ::Result { @@ -370,7 +403,9 @@ impl Server } } -impl fmt::Debug for Server { +impl> fmt::Debug for Server +where B::Item: AsRef<[u8]> +{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") .field("core", &"...") diff --git a/src/server/response.rs b/src/server/response.rs index 166febd621..57686e1862 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -8,16 +8,15 @@ use version; /// The Response sent to a client after receiving a Request in a Service. /// /// The default `StatusCode` for a `Response` is `200 OK`. -#[derive(Default)] -pub struct Response { +pub struct Response { head: http::MessageHead, - body: Option, + body: Option, } -impl Response { +impl Response { /// Create a new Response. #[inline] - pub fn new() -> Response { + pub fn new() -> Response { Response::default() } @@ -47,7 +46,7 @@ impl Response { /// Set the body. #[inline] - pub fn set_body>(&mut self, body: T) { + pub fn set_body>(&mut self, body: T) { self.body = Some(body.into()); } @@ -82,13 +81,22 @@ impl Response { /// /// Useful for the "builder-style" pattern. #[inline] - pub fn with_body>(mut self, body: T) -> Self { + pub fn with_body>(mut self, body: T) -> Self { self.set_body(body); self } } -impl fmt::Debug for Response { +impl Default for Response { + fn default() -> Response { + Response { + head: Default::default(), + body: None, + } + } +} + +impl fmt::Debug for Response { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Response") .field("status", &self.head.subject) @@ -98,6 +106,6 @@ impl fmt::Debug for Response { } } -pub fn split(res: Response) -> (http::MessageHead, Option) { +pub fn split(res: Response) -> (http::MessageHead, Option) { (res.head, res.body) }