From c119097fd072db51751b100fa186b6f64785954d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 13 Apr 2018 13:20:47 -0700 Subject: [PATCH] feat(http2): add HTTP/2 support for Client and Server --- Cargo.toml | 3 +- examples/multi_server.rs | 4 +- examples/web_api.rs | 2 +- src/body.rs | 30 ++- src/chunk.rs | 55 +++-- src/client/conn.rs | 231 +++++++++++++++++---- src/client/dispatch.rs | 155 +++++++++++--- src/client/mod.rs | 216 ++++++++++++++------ src/client/pool.rs | 430 ++++++++++++++++++++++++++++++--------- src/common/exec.rs | 38 ++++ src/common/mod.rs | 4 + src/error.rs | 11 + src/lib.rs | 2 + src/proto/h1/conn.rs | 10 +- src/proto/h1/dispatch.rs | 38 ++-- src/proto/h1/mod.rs | 2 + src/proto/h2/client.rs | 144 +++++++++++++ src/proto/h2/mod.rs | 120 +++++++++++ src/proto/h2/server.rs | 198 ++++++++++++++++++ src/proto/mod.rs | 4 +- src/server/conn.rs | 55 +++-- src/server/mod.rs | 120 +++++------ tests/integration.rs | 146 +++++++++++++ tests/server.rs | 33 ++- tests/support/mod.rs | 328 +++++++++++++++++++++++++++++ 25 files changed, 2015 insertions(+), 364 deletions(-) create mode 100644 src/common/exec.rs create mode 100644 src/proto/h2/client.rs create mode 100644 src/proto/h2/server.rs create mode 100644 tests/integration.rs create mode 100644 tests/support/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 2750432a75..4d750d6630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ futures-cpupool = "0.1.6" futures-timer = "0.1.0" http = "0.1.5" httparse = "1.0" +h2 = "0.1.5" iovec = "0.1" log = "0.4" net2 = "0.2.32" @@ -35,7 +36,7 @@ tokio = "0.1.5" tokio-executor = "0.1.0" tokio-service = "0.1" tokio-io = "0.1" -want = "0.0.2" +want = "0.0.3" [dev-dependencies] num_cpus = "1.0" diff --git a/examples/multi_server.rs b/examples/multi_server.rs index 239d7abb90..e25bfcec19 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -51,12 +51,12 @@ fn main() { println!("Listening on http://{}", srv2.incoming_ref().local_addr()); tokio::spawn(srv1.for_each(move |conn| { - tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err))); + tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err))); Ok(()) }).map_err(|_| ())); tokio::spawn(srv2.for_each(move |conn| { - tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err))); + tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err))); Ok(()) }).map_err(|_| ())); diff --git a/examples/web_api.rs b/examples/web_api.rs index 607620720a..e20bbe4f6f 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -83,7 +83,7 @@ fn main() { println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr()); serve.map_err(|_| ()).for_each(move |conn| { - tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err))) + tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err))) }) })); } diff --git a/src/body.rs b/src/body.rs index 6e93b32fc6..e272e3087b 100644 --- a/src/body.rs +++ b/src/body.rs @@ -5,6 +5,7 @@ use std::fmt; use bytes::Bytes; use futures::{Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; +use h2; use http::HeaderMap; use common::Never; @@ -13,9 +14,9 @@ use super::Chunk; type BodySender = mpsc::Sender>; /// This trait represents a streaming body of a `Request` or `Response`. -pub trait Payload { +pub trait Payload: Send + 'static { /// A buffer of bytes representing a single chunk of a body. 
- type Data: AsRef<[u8]>; + type Data: AsRef<[u8]> + Send; /// The error type of this stream. type Error: Into>; @@ -107,6 +108,7 @@ enum Kind { _close_tx: oneshot::Sender<()>, rx: mpsc::Receiver>, }, + H2(h2::RecvStream), Wrapped(Box> + Send>), Once(Option), Empty, @@ -219,6 +221,10 @@ impl Body { } } + pub(crate) fn h2(recv: h2::RecvStream) -> Self { + Body::new(Kind::H2(recv)) + } + pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { self.delayed_eof = Some(DelayEof::NotEof(fut)); } @@ -269,6 +275,17 @@ impl Body { Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), }, + Kind::H2(ref mut h2) => { + h2.poll() + .map(|async| { + async.map(|opt| { + opt.map(|bytes| { + Chunk::h2(bytes, h2.release_capacity()) + }) + }) + }) + .map_err(::Error::new_body) + }, Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body), Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Empty => Ok(Async::Ready(None)), @@ -291,9 +308,17 @@ impl Payload for Body { self.poll_eof() } + fn poll_trailers(&mut self) -> Poll, Self::Error> { + match self.kind { + Kind::H2(ref mut h2) => h2.poll_trailers().map_err(::Error::new_h2), + _ => Ok(Async::Ready(None)), + } + } + fn is_end_stream(&self) -> bool { match self.kind { Kind::Chan { .. } => false, + Kind::H2(..) => false, Kind::Wrapped(..) => false, Kind::Once(ref val) => val.is_none(), Kind::Empty => true @@ -303,6 +328,7 @@ impl Payload for Body { fn content_length(&self) -> Option { match self.kind { Kind::Chan { .. } => None, + Kind::H2(..) => None, Kind::Wrapped(..) => None, Kind::Once(Some(ref val)) => Some(val.len() as u64), Kind::Once(None) => None, diff --git a/src/chunk.rs b/src/chunk.rs index 3dd3fb762b..f0f8d22054 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,12 +1,38 @@ use std::fmt; use bytes::Bytes; +use h2::ReleaseCapacity; /// A piece of a message body. 
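///
/// With the HTTP/2 support added below, a `Chunk` may also carry an
/// `AutoRelease` guard: when such a chunk is dropped, the flow-control
/// capacity it occupied is released back to the h2 stream so the peer is
/// allowed to send more data.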
pub struct Chunk(Inner); -enum Inner { - Shared(Bytes), +struct Inner { + bytes: Bytes, + _flow_control: Option, +} + +struct AutoRelease { + cap: usize, + release: ReleaseCapacity, +} + +impl Drop for AutoRelease { + fn drop(&mut self) { + let _ = self.release.release_capacity(self.cap); + } +} + +impl Chunk { + pub(crate) fn h2(bytes: Bytes, rel_cap: &ReleaseCapacity) -> Chunk { + let cap = bytes.len(); + Chunk(Inner { + bytes: bytes, + _flow_control: Some(AutoRelease { + cap: cap, + release: rel_cap.clone(), + }), + }) + } } impl From> for Chunk { @@ -39,17 +65,18 @@ impl From<&'static str> for Chunk { impl From for Chunk { #[inline] - fn from(mem: Bytes) -> Chunk { - Chunk(Inner::Shared(mem)) + fn from(bytes: Bytes) -> Chunk { + Chunk(Inner { + bytes: bytes, + _flow_control: None, + }) } } impl From for Bytes { #[inline] fn from(chunk: Chunk) -> Bytes { - match chunk.0 { - Inner::Shared(bytes) => bytes, - } + chunk.0.bytes } } @@ -65,9 +92,7 @@ impl ::std::ops::Deref for Chunk { impl AsRef<[u8]> for Chunk { #[inline] fn as_ref(&self) -> &[u8] { - match self.0 { - Inner::Shared(ref slice) => slice, - } + &self.0.bytes } } @@ -81,7 +106,7 @@ impl fmt::Debug for Chunk { impl Default for Chunk { #[inline] fn default() -> Chunk { - Chunk(Inner::Shared(Bytes::new())) + Chunk::from(Bytes::new()) } } @@ -91,17 +116,13 @@ impl IntoIterator for Chunk { #[inline] fn into_iter(self) -> Self::IntoIter { - match self.0 { - Inner::Shared(bytes) => bytes.into_iter(), - } + self.0.bytes.into_iter() } } impl Extend for Chunk { #[inline] fn extend(&mut self, iter: T) where T: IntoIterator { - match self.0 { - Inner::Shared(ref mut bytes) => bytes.extend(iter) - } + self.0.bytes.extend(iter) } } diff --git a/src/client/conn.rs b/src/client/conn.rs index feea7bf785..90eb4f6b27 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -16,6 +16,7 @@ use futures::future::{self, Either}; use tokio_io::{AsyncRead, AsyncWrite}; use body::Payload; +use common::Exec; use proto; use super::dispatch; use {Body, Request, Response, StatusCode}; @@ -25,7 +26,7 @@ use {Body, Request, Response, StatusCode}; /// This is a shortcut for `Builder::new().handshake(io)`. pub fn handshake(io: T) -> Handshake where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, { Builder::new() .handshake(io) @@ -33,10 +34,10 @@ where /// The sender side of an established connection. pub struct SendRequest { - dispatch: dispatch::Sender, Response>, - + dispatch: dispatch::Sender, Response>, } + /// A future that processes all HTTP state for the IO object. /// /// In most cases, this should just be spawned into an executor, so that it @@ -44,15 +45,17 @@ pub struct SendRequest { #[must_use = "futures do nothing unless polled"] pub struct Connection where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { - inner: proto::dispatch::Dispatcher< - proto::dispatch::Client, - B, - T, - B::Data, - proto::ClientUpgradeTransaction, + inner: Either< + proto::dispatch::Dispatcher< + proto::dispatch::Client, + B, + T, + proto::ClientUpgradeTransaction, + >, + proto::h2::Client, >, } @@ -62,7 +65,9 @@ where /// After setting options, the builder is used to create a `Handshake` future. #[derive(Clone, Debug)] pub struct Builder { + exec: Exec, h1_writev: bool, + http2: bool, } /// A future setting up HTTP over an IO object. 
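// A minimal usage sketch of the new `http2` option (illustrative only, not
// part of this patch; `io` is assumed to be some AsyncRead + AsyncWrite
// transport you already have):
//
//     let handshake = Builder::new()
//         .http2_only(true)
//         .handshake::<_, Body>(io);
//     // resolves to a (SendRequest<Body>, Connection<_, Body>) pair:
//     // spawn the Connection, then issue requests through the SendRequest.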
@@ -103,7 +108,18 @@ pub struct Parts { _inner: (), } -// internal client api +// ========== internal client api + +/// A `Future` for when `SendRequest::poll_ready()` is ready. +pub(super) struct WhenReady { + tx: Option>, +} + +// A `SendRequest` that can be cloned to send HTTP2 requests. +// private for now, probably not a great idea of a type... +pub(super) struct Http2SendRequest { + dispatch: dispatch::UnboundedSender, Response>, +} #[must_use = "futures do nothing unless polled"] pub(super) struct HandshakeNoUpgrades { @@ -127,6 +143,12 @@ impl SendRequest self.dispatch.poll_ready() } + pub(super) fn when_ready(self) -> WhenReady { + WhenReady { + tx: Some(self), + } + } + pub(super) fn is_ready(&self) -> bool { self.dispatch.is_ready() } @@ -134,6 +156,12 @@ impl SendRequest pub(super) fn is_closed(&self) -> bool { self.dispatch.is_closed() } + + pub(super) fn into_http2(self) -> Http2SendRequest { + Http2SendRequest { + dispatch: self.dispatch.unbound(), + } + } } impl SendRequest @@ -257,16 +285,81 @@ impl fmt::Debug for SendRequest { } } +// ===== impl Http2SendRequest + +impl Http2SendRequest { + pub(super) fn is_ready(&self) -> bool { + self.dispatch.is_ready() + } + + pub(super) fn is_closed(&self) -> bool { + self.dispatch.is_closed() + } +} + +impl Http2SendRequest +where + B: Payload + 'static, +{ + //TODO: replace with `impl Future` when stable + pub(super) fn send_request_retryable(&mut self, req: Request) -> Box, Error=(::Error, Option>)> + Send> + where + B: Send, + { + let inner = match self.dispatch.try_send(req) { + Ok(rx) => { + Either::A(rx.then(move |res| { + match res { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + } + })) + }, + Err(req) => { + debug!("connection was not ready"); + let err = ::Error::new_canceled(Some("connection was not ready")); + Either::B(future::err((err, Some(req)))) + } + }; + Box::new(inner) + } +} + +impl fmt::Debug for Http2SendRequest { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Http2SendRequest") + .finish() + } +} + +impl Clone for Http2SendRequest { + fn clone(&self) -> Self { + Http2SendRequest { + dispatch: self.dispatch.clone(), + } + } +} + // ===== impl Connection impl Connection where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { /// Return the inner IO object, and additional information. + /// + /// Only works for HTTP/1 connections. HTTP/2 connections will panic. pub fn into_parts(self) -> Parts { - let (io, read_buf) = self.inner.into_inner(); + let (io, read_buf) = match self.inner { + Either::A(h1) => h1.into_inner(), + Either::B(_h2) => { + panic!("http2 cannot into_inner"); + } + }; + Parts { io: io, read_buf: read_buf, @@ -282,13 +375,20 @@ where /// but it is not desired to actally shutdown the IO object. Instead you /// would take it back using `into_parts`. 
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { - self.inner.poll_without_shutdown() + match self.inner { + Either::A(ref mut h1) => { + h1.poll_without_shutdown() + }, + Either::B(ref mut h2) => { + h2.poll() + } + } } } impl Future for Connection where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { type Item = (); @@ -301,7 +401,7 @@ where impl fmt::Debug for Connection where - T: AsyncRead + AsyncWrite + fmt::Debug, + T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, B: Payload + 'static, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -317,20 +417,37 @@ impl Builder { #[inline] pub fn new() -> Builder { Builder { + exec: Exec::Default, h1_writev: true, + http2: false, } } + /* + pub(super) fn exec(&mut self, exec: Exec) -> &mut Builder { + self.exec = exec; + self + } + */ + pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder { self.h1_writev = enabled; self } + /// Sets whether HTTP2 is required. + /// + /// Default is false. + pub fn http2_only(&mut self, enabled: bool) -> &mut Builder { + self.http2 = enabled; + self + } + /// Constructs a connection with the configured options and IO. #[inline] pub fn handshake(&self, io: T) -> Handshake where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { Handshake { @@ -344,7 +461,7 @@ impl Builder { pub(super) fn handshake_no_upgrades(&self, io: T) -> HandshakeNoUpgrades where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { HandshakeNoUpgrades { @@ -361,7 +478,7 @@ impl Builder { impl Future for Handshake where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { type Item = (SendRequest, Connection); @@ -386,15 +503,17 @@ impl fmt::Debug for Handshake { impl Future for HandshakeNoUpgrades where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { - type Item = (SendRequest, proto::dispatch::Dispatcher< - proto::dispatch::Client, - B, - T, - B::Data, - proto::ClientTransaction, + type Item = (SendRequest, Either< + proto::h1::Dispatcher< + proto::h1::dispatch::Client, + B, + T, + proto::ClientTransaction, + >, + proto::h2::Client, >); type Error = ::Error; @@ -405,35 +524,45 @@ where impl Future for HandshakeInner where - T: AsyncRead + AsyncWrite, - B: Payload + 'static, + T: AsyncRead + AsyncWrite + Send + 'static, + B: Payload, R: proto::Http1Transaction< Incoming=StatusCode, Outgoing=proto::RequestLine, >, { - type Item = (SendRequest, proto::dispatch::Dispatcher< - proto::dispatch::Client, - B, - T, - B::Data, - R, + type Item = (SendRequest, Either< + proto::h1::Dispatcher< + proto::h1::dispatch::Client, + B, + T, + R, + >, + proto::h2::Client, >); type Error = ::Error; fn poll(&mut self) -> Poll { let io = self.io.take().expect("polled more than once"); let (tx, rx) = dispatch::channel(); - let mut conn = proto::Conn::new(io); - if !self.builder.h1_writev { - conn.set_write_strategy_flatten(); - } - let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); + let either = if !self.builder.http2 { + let mut conn = proto::Conn::new(io); + if !self.builder.h1_writev { + conn.set_write_strategy_flatten(); + } + let cd = proto::h1::dispatch::Client::new(rx); + let dispatch = proto::h1::Dispatcher::new(cd, conn); + Either::A(dispatch) + } else { + let h2 = proto::h2::Client::new(io, rx, self.builder.exec.clone()); + 
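                // Both branches are driven by the same dispatch channel
                // (`rx`), so the `SendRequest` returned to the caller works
                // identically whether the connection ended up as HTTP/1 or
                // HTTP/2; only the driver future in the `Either` differs.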
Either::B(h2) + }; + Ok(Async::Ready(( SendRequest { dispatch: tx, }, - dispatch, + either, ))) } } @@ -457,6 +586,24 @@ impl fmt::Debug for ResponseFuture { } } +// ===== impl WhenReady + +impl Future for WhenReady { + type Item = SendRequest; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + let mut tx = self.tx.take().expect("polled after complete"); + match tx.poll_ready()? { + Async::Ready(()) => Ok(Async::Ready(tx)), + Async::NotReady => { + self.tx = Some(tx); + Ok(Async::NotReady) + } + } + } +} + // assert trait markers trait AssertSend: Send {} @@ -469,7 +616,7 @@ impl AssertSendSync for SendRequest {} #[doc(hidden)] impl AssertSend for Connection where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, B::Data: Send + 'static, {} @@ -477,7 +624,7 @@ where #[doc(hidden)] impl AssertSendSync for Connection where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, B::Data: Send + Sync + 'static, {} diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 5b54b71829..7915a6b001 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,17 +1,19 @@ +use std::sync::Arc; + use futures::{Async, Poll, Stream}; use futures::sync::{mpsc, oneshot}; use want; use common::Never; -//pub type Callback = oneshot::Sender)>>; pub type RetryPromise = oneshot::Receiver)>>; pub type Promise = oneshot::Receiver>; pub fn channel() -> (Sender, Receiver) { - let (tx, rx) = mpsc::channel(0); + let (tx, rx) = mpsc::unbounded(); let (giver, taker) = want::new(); let tx = Sender { + buffered_once: false, giver: giver, inner: tx, }; @@ -22,28 +24,38 @@ pub fn channel() -> (Sender, Receiver) { (tx, rx) } +/// A bounded sender of requests and callbacks for when responses are ready. +/// +/// While the inner sender is unbounded, the Giver is used to determine +/// if the Receiver is ready for another request. pub struct Sender { - // The Giver helps watch that the the Receiver side has been polled - // when the queue is empty. This helps us know when a request and - // response have been fully processed, and a connection is ready - // for more. + /// One message is always allowed, even if the Receiver hasn't asked + /// for it yet. This boolean keeps track of whether we've sent one + /// without notice. + buffered_once: bool, + /// The Giver helps watch that the the Receiver side has been polled + /// when the queue is empty. This helps us know when a request and + /// response have been fully processed, and a connection is ready + /// for more. giver: want::Giver, - //inner: mpsc::Sender<(T, Callback)>, - inner: mpsc::Sender>, + /// Actually bounded by the Giver, plus `buffered_once`. + inner: mpsc::UnboundedSender>, +} + +/// An unbounded version. +/// +/// Cannot poll the Giver, but can still use it to determine if the Receiver +/// has been dropped. However, this version can be cloned. +pub struct UnboundedSender { + // Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked. + giver: Arc, + inner: mpsc::UnboundedSender>, } impl Sender { pub fn poll_ready(&mut self) -> Poll<(), ::Error> { - match self.inner.poll_ready() { - Ok(Async::Ready(())) => { - // there's room in the queue, but does the Connection - // want a message yet? 
- self.giver.poll_want() - .map_err(|_| ::Error::new_closed()) - }, - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err(::Error::new_closed()), - } + self.giver.poll_want() + .map_err(|_| ::Error::new_closed()) } pub fn is_ready(&self) -> bool { @@ -54,24 +66,75 @@ impl Sender { self.giver.is_canceled() } + fn can_send(&mut self) -> bool { + if self.giver.give() || !self.buffered_once { + // If the receiver is ready *now*, then of course we can send. + // + // If the receiver isn't ready yet, but we don't have anything + // in the channel yet, then allow one message. + self.buffered_once = true; + true + } else { + false + } + } + pub fn try_send(&mut self, val: T) -> Result, T> { + if !self.can_send() { + return Err(val); + } let (tx, rx) = oneshot::channel(); - self.inner.try_send(Envelope(Some((val, Callback::Retry(tx))))) + self.inner.unbounded_send(Envelope(Some((val, Callback::Retry(tx))))) .map(move |_| rx) .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) } pub fn send(&mut self, val: T) -> Result, T> { + if !self.can_send() { + return Err(val); + } let (tx, rx) = oneshot::channel(); - self.inner.try_send(Envelope(Some((val, Callback::NoRetry(tx))))) + self.inner.unbounded_send(Envelope(Some((val, Callback::NoRetry(tx))))) .map(move |_| rx) .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) } + + pub fn unbound(self) -> UnboundedSender { + UnboundedSender { + giver: Arc::new(self.giver), + inner: self.inner, + } + } +} + +impl UnboundedSender { + pub fn is_ready(&self) -> bool { + self.giver.is_wanting() + } + + pub fn is_closed(&self) -> bool { + self.giver.is_canceled() + } + + pub fn try_send(&mut self, val: T) -> Result, T> { + let (tx, rx) = oneshot::channel(); + self.inner.unbounded_send(Envelope(Some((val, Callback::Retry(tx))))) + .map(move |_| rx) + .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) + } +} + +impl Clone for UnboundedSender { + fn clone(&self) -> Self { + UnboundedSender { + giver: self.giver.clone(), + inner: self.inner.clone(), + } + } } pub struct Receiver { - //inner: mpsc::Receiver<(T, Callback)>, - inner: mpsc::Receiver>, + inner: mpsc::UnboundedReceiver>, taker: want::Taker, } @@ -166,19 +229,21 @@ mod tests { #[cfg(feature = "nightly")] extern crate test; - use futures::{future, Future}; + use futures::{future, Future, Stream}; - #[cfg(feature = "nightly")] - use futures::{Stream}; + + #[derive(Debug)] + struct Custom(i32); #[test] fn drop_receiver_sends_cancel_errors() { let _ = pretty_env_logger::try_init(); future::lazy(|| { - #[derive(Debug)] - struct Custom(i32); - let (mut tx, rx) = super::channel::(); + let (mut tx, mut rx) = super::channel::(); + + // must poll once for try_send to succeed + assert!(rx.poll().expect("rx empty").is_not_ready()); let promise = tx.try_send(Custom(43)).unwrap(); drop(rx); @@ -198,6 +263,40 @@ mod tests { }).wait().unwrap(); } + #[test] + fn sender_checks_for_want_on_send() { + future::lazy(|| { + let (mut tx, mut rx) = super::channel::(); + // one is allowed to buffer, second is rejected + let _ = tx.try_send(Custom(1)).expect("1 buffered"); + tx.try_send(Custom(2)).expect_err("2 not ready"); + + assert!(rx.poll().expect("rx 1").is_ready()); + // Even though 1 has been popped, only 1 could be buffered for the + // lifetime of the channel. 
+ tx.try_send(Custom(2)).expect_err("2 still not ready"); + + assert!(rx.poll().expect("rx empty").is_not_ready()); + let _ = tx.try_send(Custom(2)).expect("2 ready"); + + Ok::<(), ()>(()) + }).wait().unwrap(); + } + + #[test] + fn unbounded_sender_doesnt_bound_on_want() { + let (tx, rx) = super::channel::(); + let mut tx = tx.unbound(); + + let _ = tx.try_send(Custom(1)).unwrap(); + let _ = tx.try_send(Custom(2)).unwrap(); + let _ = tx.try_send(Custom(3)).unwrap(); + + drop(rx); + + let _ = tx.try_send(Custom(4)).unwrap_err(); + } + #[cfg(feature = "nightly")] #[bench] fn giver_queue_throughput(b: &mut test::Bencher) { diff --git a/src/client/mod.rs b/src/client/mod.rs index 73cbbbb720..e14ae5d85f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -6,16 +6,16 @@ use std::sync::Arc; use std::time::Duration; use futures::{Async, Future, Poll}; -use futures::future::{self, Executor}; +use futures::future::{self, Either, Executor}; use futures::sync::oneshot; use http::{Method, Request, Response, Uri, Version}; use http::header::{Entry, HeaderValue, HOST}; use http::uri::Scheme; -use tokio_executor::spawn; pub use tokio_service::Service; use body::{Body, Payload}; -use self::pool::Pool; +use common::Exec; +use self::pool::{Pool, Poolable, Reservation}; pub use self::connect::{Connect, HttpConnector}; @@ -37,6 +37,7 @@ pub struct Client { pool: Pool>, retry_canceled_requests: bool, set_host: bool, + ver: Ver, } impl Client { @@ -143,7 +144,7 @@ where C: Connect + Sync + 'static, } }; - if self.set_host { + if self.set_host && self.ver == Ver::Http1 { if let Entry::Vacant(entry) = req.headers_mut().entry(HOST).expect("HOST is always valid header name") { let hostname = uri.host().expect("authority implies host"); let host = if let Some(port) = uri.port() { @@ -171,50 +172,78 @@ where C: Connect + Sync + 'static, //TODO: replace with `impl Future` when stable fn send_request(&self, mut req: Request, domain: &str) -> Box, Error=ClientError> + Send> { let url = req.uri().clone(); - let checkout = self.pool.checkout(domain); + let ver = self.ver; + let pool_key = (Arc::new(domain.to_string()), self.ver); + let checkout = self.pool.checkout(pool_key.clone()); let connect = { let executor = self.executor.clone(); let pool = self.pool.clone(); - let pool_key = Arc::new(domain.to_string()); let h1_writev = self.h1_writev; let connector = self.connector.clone(); let dst = Destination { uri: url, }; future::lazy(move || { - connector.connect(dst) - .map_err(::Error::new_connect) - .and_then(move |(io, connected)| { - conn::Builder::new() - .h1_writev(h1_writev) - .handshake_no_upgrades(io) - .and_then(move |(tx, conn)| { - executor.execute(conn.map_err(|e| debug!("client connection error: {}", e))); - Ok(pool.pooled(pool_key, PoolClient { - is_proxied: connected.is_proxied, - tx: tx, - })) - }) - }) + if let Some(connecting) = pool.connecting(&pool_key) { + Either::A(connector.connect(dst) + .map_err(::Error::new_connect) + .and_then(move |(io, connected)| { + conn::Builder::new() + .h1_writev(h1_writev) + .http2_only(pool_key.1 == Ver::Http2) + .handshake_no_upgrades(io) + .and_then(move |(tx, conn)| { + executor.execute(conn.map_err(|e| { + debug!("client connection error: {}", e) + })); + + // Wait for 'conn' to ready up before we + // declare this tx as usable + tx.when_ready() + }) + .map(move |tx| { + pool.pooled(connecting, PoolClient { + is_proxied: connected.is_proxied, + tx: match ver { + Ver::Http1 => PoolTx::Http1(tx), + Ver::Http2 => PoolTx::Http2(tx.into_http2()), + }, + }) + }) + 
})) + } else { + let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress")); + Either::B(future::err(canceled)) + } }) }; let race = checkout.select(connect) .map(|(pooled, _work)| pooled) - .map_err(|(e, _checkout)| { - // the Pool Checkout cannot error, so the only error - // is from the Connector - // XXX: should wait on the Checkout? Problem is - // that if the connector is failing, it may be that we - // never had a pooled stream at all - ClientError::Normal(e) + .or_else(|(e, other)| { + // Either checkout or connect could get canceled: + // + // 1. Connect is canceled if this is HTTP/2 and there is + // an outstanding HTTP/2 connecting task. + // 2. Checkout is canceled if the pool cannot deliver an + // idle connection reliably. + // + // In both cases, we should just wait for the other future. + if e.is_canceled() { + //trace!("checkout/connect race canceled: {}", e); + Either::A(other.map_err(ClientError::Normal)) + } else { + Either::B(future::err(ClientError::Normal(e))) + } }); let executor = self.executor.clone(); let resp = race.and_then(move |mut pooled| { let conn_reused = pooled.is_reused(); - set_relative_uri(req.uri_mut(), pooled.is_proxied); - let fut = pooled.tx.send_request_retryable(req) + if ver == Ver::Http1 { + set_relative_uri(req.uri_mut(), pooled.is_proxied); + } + let fut = pooled.send_request_retryable(req) .map_err(move |(err, orig_req)| { if let Some(req) = orig_req { ClientError::Canceled { @@ -235,14 +264,14 @@ where C: Connect + Sync + 'static, // for a new request to start. // // It won't be ready if there is a body to stream. - if pooled.tx.is_ready() { + if pooled.is_ready() { drop(pooled); } else if !res.body().is_empty() { let (delayed_tx, delayed_rx) = oneshot::channel(); res.body_mut().delayed_eof(delayed_rx); executor.execute( future::poll_fn(move || { - pooled.tx.poll_ready() + pooled.poll_ready() }) .then(move |_| { // At this point, `pooled` is dropped, and had a chance @@ -291,6 +320,7 @@ impl Clone for Client { pool: self.pool.clone(), retry_canceled_requests: self.retry_canceled_requests, set_host: self.set_host, + ver: self.ver, } } } @@ -366,15 +396,74 @@ where struct PoolClient { is_proxied: bool, - tx: conn::SendRequest, + tx: PoolTx, } -impl self::pool::Closed for PoolClient +enum PoolTx { + Http1(conn::SendRequest), + Http2(conn::Http2SendRequest), +} + +impl PoolClient { + fn poll_ready(&mut self) -> Poll<(), ::Error> { + match self.tx { + PoolTx::Http1(ref mut tx) => tx.poll_ready(), + PoolTx::Http2(_) => Ok(Async::Ready(())), + } + } + + fn is_ready(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_ready(), + PoolTx::Http2(ref tx) => tx.is_ready(), + } + } +} + +impl PoolClient { + //TODO: replace with `impl Future` when stable + fn send_request_retryable(&mut self, req: Request) -> Box, Error=(::Error, Option>)> + Send> + where + B: Send, + { + match self.tx { + PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req), + PoolTx::Http2(ref mut tx) => tx.send_request_retryable(req), + } + } +} + +impl Poolable for PoolClient where B: 'static, { fn is_closed(&self) -> bool { - self.tx.is_closed() + match self.tx { + PoolTx::Http1(ref tx) => tx.is_closed(), + PoolTx::Http2(ref tx) => tx.is_closed(), + } + } + + fn reserve(self) -> Reservation { + match self.tx { + PoolTx::Http1(tx) => { + Reservation::Unique(PoolClient { + is_proxied: self.is_proxied, + tx: PoolTx::Http1(tx), + }) + }, + PoolTx::Http2(tx) => { + let b = PoolClient { + is_proxied: self.is_proxied, + tx: PoolTx::Http2(tx.clone()), 
+ }; + let a = PoolClient { + is_proxied: self.is_proxied, + tx: PoolTx::Http2(tx), + }; + Reservation::Shared(a, b) + } + } } } @@ -387,17 +476,24 @@ enum ClientError { } } +/// A marker to identify what version a pooled connection is. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +enum Ver { + Http1, + Http2, +} + fn set_relative_uri(uri: &mut Uri, is_proxied: bool) { if is_proxied && uri.scheme_part() != Some(&Scheme::HTTPS) { return; } let path = match uri.path_and_query() { - Some(path) => { + Some(path) if path.as_str() != "/" => { let mut parts = ::http::uri::Parts::default(); parts.path_and_query = Some(path.clone()); Uri::from_parts(parts).expect("path is valid uri") }, - None => { + _none_or_just_slash => { "/".parse().expect("/ is valid path") } }; @@ -416,6 +512,7 @@ pub struct Builder { max_idle: usize, retry_canceled_requests: bool, set_host: bool, + ver: Ver, } impl Default for Builder { @@ -428,6 +525,7 @@ impl Default for Builder { max_idle: 5, retry_canceled_requests: true, set_host: true, + ver: Ver::Http1, } } } @@ -467,6 +565,20 @@ impl Builder { self } + /// Set whether the connection **must** use HTTP/2. + /// + /// Note that setting this to true prevents HTTP/1 from being allowed. + /// + /// Default is false. + pub fn http2_only(&mut self, val: bool) -> &mut Self { + self.ver = if val { + Ver::Http2 + } else { + Ver::Http1 + }; + self + } + /// Set whether to retry requests that get disrupted before ever starting /// to write. /// @@ -534,6 +646,7 @@ impl Builder { pool: Pool::new(self.keep_alive, self.keep_alive_timeout), retry_canceled_requests: self.retry_canceled_requests, set_host: self.set_host, + ver: self.ver, } } } @@ -546,33 +659,20 @@ impl fmt::Debug for Builder { .field("http1_writev", &self.h1_writev) .field("max_idle", &self.max_idle) .field("set_host", &self.set_host) + .field("version", &self.ver) .finish() } } -// ===== impl Exec ===== - -#[derive(Clone)] -enum Exec { - Default, - Executor(Arc + Send>> + Send + Sync>), -} +#[cfg(test)] +mod unit_tests { + use super::*; + #[test] + fn set_relative_uri_with_implicit_path() { + let mut uri = "http://hyper.rs".parse().unwrap(); + set_relative_uri(&mut uri, false); -impl Exec { - fn execute(&self, fut: F) - where - F: Future + Send + 'static, - { - match *self { - Exec::Default => spawn(fut), - Exec::Executor(ref e) => { - let _ = e.execute(Box::new(fut)) - .map_err(|err| { - panic!("executor error: {:?}", err.kind()); - }); - }, - } + assert_eq!(uri.to_string(), "/"); } } - diff --git a/src/client/pool.rs b/src/client/pool.rs index 19679494df..a5730f8284 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, Weak}; @@ -8,10 +8,10 @@ use futures::{Future, Async, Poll, Stream}; use futures::sync::oneshot; use futures_timer::Interval; -use common::Never; -use super::Exec; +use common::{Exec, Never}; +use super::Ver; -pub struct Pool { +pub(super) struct Pool { inner: Arc>>, } @@ -20,15 +20,42 @@ pub struct Pool { // This is a trait to allow the `client::pool::tests` to work for `i32`. // // See https://github.com/hyperium/hyper/issues/1429 -pub trait Closed { +pub(super) trait Poolable: Sized { fn is_closed(&self) -> bool; + /// Reserve this connection. + /// + /// Allows for HTTP/2 to return a shared reservation. 
+ fn reserve(self) -> Reservation; } +/// When checking out a pooled connection, it might be that the connection +/// only supports a single reservation, or it might be usable for many. +/// +/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be +/// used for multiple requests. +pub(super) enum Reservation { + /// This connection could be used multiple times, the first one will be + /// reinserted into the `idle` pool, and the second will be given to + /// the `Checkout`. + #[allow(unused)] + Shared(T, T), + /// This connection requires unique access. It will be returned after + /// use is complete. + Unique(T), +} + +/// Simple type alias in case the key type needs to be adjusted. +type Key = (Arc, Ver); + struct PoolInner { + // A flag that a connection is being estabilished, and the connection + // should be shared. This prevents making multiple HTTP/2 connections + // to the same host. + connecting: HashSet, enabled: bool, // These are internal Conns sitting in the event loop in the KeepAlive // state, waiting to receive a new Request to send on the socket. - idle: HashMap, Vec>>, + idle: HashMap>>, // These are outstanding Checkouts that are waiting for a socket to be // able to send a Request one. This is used when "racing" for a new // connection. @@ -38,7 +65,7 @@ struct PoolInner { // this list is checked for any parked Checkouts, and tries to notify // them that the Conn could be used instead of waiting for a brand new // connection. - parked: HashMap, VecDeque>>, + parked: HashMap>>, timeout: Option, // A oneshot channel is used to allow the interval to be notified when // the Pool completely drops. That way, the interval can cancel immediately. @@ -49,6 +76,7 @@ impl Pool { pub fn new(enabled: bool, timeout: Option) -> Pool { Pool { inner: Arc::new(Mutex::new(PoolInner { + connecting: HashSet::new(), enabled: enabled, idle: HashMap::new(), idle_interval_ref: None, @@ -59,40 +87,69 @@ impl Pool { } } -impl Pool { - pub fn checkout(&self, key: &str) -> Checkout { +impl Pool { + /// Returns a `Checkout` which is a future that resolves if an idle + /// connection becomes available. + pub fn checkout(&self, key: Key) -> Checkout { Checkout { - key: Arc::new(key.to_owned()), + key, pool: self.clone(), parked: None, } } - fn take(&self, key: &Arc) -> Option> { + /// Ensure that there is only ever 1 connecting task for HTTP/2 + /// connections. This does nothing for HTTP/1. + pub(super) fn connecting(&self, key: &Key) -> Option> { + if key.1 == Ver::Http2 { + let mut inner = self.inner.lock().unwrap(); + if inner.connecting.insert(key.clone()) { + let connecting = Connecting { + key: key.clone(), + pool: Arc::downgrade(&self.inner), + }; + Some(connecting) + } else { + trace!("HTTP/2 connecting already in progress for {:?}", key.0); + None + } + } else { + Some(Connecting { + key: key.clone(), + // in HTTP/1's case, there is never a lock, so we don't + // need to do anything in Drop. 
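                // A fresh `Weak::new()` can never be upgraded, so the
                // `Drop` impl on `Connecting` below becomes a no-op for
                // this HTTP/1 path.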
+ pool: Weak::new(), + }) + } + } + + fn take(&self, key: &Key) -> Option> { let entry = { let mut inner = self.inner.lock().unwrap(); let expiration = Expiration::new(inner.timeout); - let mut should_remove = false; - let entry = inner.idle.get_mut(key).and_then(|list| { - trace!("take; url = {:?}, expiration = {:?}", key, expiration.0); - while let Some(entry) = list.pop() { - if !expiration.expires(entry.idle_at) { - if !entry.value.is_closed() { - should_remove = list.is_empty(); - return Some(entry); - } + let maybe_entry = inner.idle.get_mut(key) + .and_then(|list| { + trace!("take? {:?}: expiration = {:?}", key, expiration.0); + // A block to end the mutable borrow on list, + // so the map below can check is_empty() + { + let popper = IdlePopper { + key, + list, + }; + popper.pop(&expiration) } - trace!("removing unacceptable pooled {:?}", key); - // every other case the Idle should just be dropped - // 1. Idle but expired - // 2. Busy (something else somehow took it?) - // 3. Disabled don't reuse of course - } - should_remove = true; - None - }); + .map(|e| (e, list.is_empty())) + }); - if should_remove { + let (entry, empty) = if let Some((e, empty)) = maybe_entry { + (Some(e), empty) + } else { + // No entry found means nuke the list for sure. + (None, true) + }; + if empty { + //TODO: This could be done with the HashMap::entry API instead. inner.idle.remove(key); } entry @@ -101,17 +158,35 @@ impl Pool { entry.map(|e| self.reuse(key, e.value)) } - - pub fn pooled(&self, key: Arc, value: T) -> Pooled { + pub(super) fn pooled(&self, mut connecting: Connecting, value: T) -> Pooled { + let value = match value.reserve() { + Reservation::Shared(to_insert, to_return) => { + debug_assert_eq!( + connecting.key.1, + Ver::Http2, + "shared reservation without Http2" + ); + let mut inner = self.inner.lock().unwrap(); + inner.put(connecting.key.clone(), to_insert); + // Do this here instead of Drop for Connecting because we + // already have a lock, no need to lock the mutex twice. + inner.connected(&connecting.key); + // prevent the Drop of Connecting from repeating inner.connected() + connecting.pool = Weak::new(); + + to_return + }, + Reservation::Unique(value) => value, + }; Pooled { is_reused: false, - key: key, + key: connecting.key.clone(), pool: Arc::downgrade(&self.inner), value: Some(value) } } - fn reuse(&self, key: &Arc, value: T) -> Pooled { + fn reuse(&self, key: &Key, value: T) -> Pooled { debug!("reuse idle connection for {:?}", key); Pooled { is_reused: true, @@ -121,8 +196,8 @@ impl Pool { } } - fn park(&mut self, key: Arc, tx: oneshot::Sender) { - trace!("park; waiting for idle connection: {:?}", key); + fn park(&mut self, key: Key, tx: oneshot::Sender) { + trace!("checkout waiting for idle connection: {:?}", key); self.inner.lock().unwrap() .parked.entry(key) .or_insert(VecDeque::new()) @@ -130,19 +205,83 @@ impl Pool { } } -impl PoolInner { - fn put(&mut self, key: Arc, value: T) { +/// Pop off this list, looking for a usable connection that hasn't expired. +struct IdlePopper<'a, T: 'a> { + key: &'a Key, + list: &'a mut Vec>, +} + +impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { + fn pop(self, expiration: &Expiration) -> Option> { + while let Some(entry) = self.list.pop() { + // If the connection has been closed, or is older than our idle + // timeout, simply drop it and keep looking... 
+ // + // TODO: Actually, since the `idle` list is pushed to the end always, + // that would imply that if *this* entry is expired, then anything + // "earlier" in the list would *have* to be expired also... Right? + // + // In that case, we could just break out of the loop and drop the + // whole list... + if entry.value.is_closed() || expiration.expires(entry.idle_at) { + trace!("remove unacceptable pooled connection for {:?}", self.key); + continue; + } + + let value = match entry.value.reserve() { + Reservation::Shared(to_reinsert, to_checkout) => { + self.list.push(Idle { + idle_at: Instant::now(), + value: to_reinsert, + }); + to_checkout + }, + Reservation::Unique(unique) => { + unique + } + }; + + return Some(Idle { + idle_at: entry.idle_at, + value, + }); + } + + None + } +} + +impl PoolInner { + fn put(&mut self, key: Key, value: T) { if !self.enabled { return; } + if key.1 == Ver::Http2 && self.idle.contains_key(&key) { + trace!("Pool::put; existing idle HTTP/2 connection for {:?}", key); + return; + } trace!("Pool::put {:?}", key); let mut remove_parked = false; let mut value = Some(value); if let Some(parked) = self.parked.get_mut(&key) { while let Some(tx) = parked.pop_front() { if !tx.is_canceled() { - match tx.send(value.take().unwrap()) { - Ok(()) => break, + let reserved = value.take().expect("value already sent"); + let reserved = match reserved.reserve() { + Reservation::Shared(to_keep, to_send) => { + value = Some(to_keep); + to_send + }, + Reservation::Unique(uniq) => uniq, + }; + match tx.send(reserved) { + Ok(()) => { + if value.is_none() { + break; + } else { + continue; + } + }, Err(e) => { value = Some(e); } @@ -170,6 +309,20 @@ impl PoolInner { None => trace!("Pool::put found parked {:?}", key), } } + + /// A `Connecting` task is complete. Not necessarily successfully, + /// but the lock is going away, so clean up. + fn connected(&mut self, key: &Key) { + let existed = self.connecting.remove(key); + debug_assert!( + existed, + "Connecting dropped, key not in pool.connecting" + ); + // cancel any waiters. if there are any, it's because + // this Connecting task didn't complete successfully. + // those waiters would never receive a connection. + self.parked.remove(key); + } } impl PoolInner { @@ -177,7 +330,7 @@ impl PoolInner { /// and possibly inserted into the pool that it is waiting for an idle /// connection. If a user ever dropped that future, we need to clean out /// those parked senders. - fn clean_parked(&mut self, key: &Arc) { + fn clean_parked(&mut self, key: &Key) { let mut remove_parked = false; if let Some(parked) = self.parked.get_mut(key) { parked.retain(|tx| { @@ -191,7 +344,7 @@ impl PoolInner { } } -impl PoolInner { +impl PoolInner { fn clear_expired(&mut self) { let dur = if let Some(dur) = self.timeout { dur @@ -218,7 +371,7 @@ impl PoolInner { } -impl Pool { +impl Pool { pub(super) fn spawn_expired_interval(&self, exec: &Exec) { let (dur, rx) = { let mut inner = self.inner.lock().unwrap(); @@ -257,14 +410,16 @@ impl Clone for Pool { } } -pub struct Pooled { +/// A wrapped poolable value that tries to reinsert to the Pool on Drop. +// Note: The bounds `T: Poolable` is needed for the Drop impl. 
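// On drop, a `Pooled` that still holds its value tries to upgrade the `Weak`
// handle and reinsert the value as idle, unless `is_closed()` already reports
// the connection as dead (see `impl Drop for Pooled` further down).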
+pub(super) struct Pooled { value: Option, is_reused: bool, - key: Arc, + key: Key, pool: Weak>>, } -impl Pooled { +impl Pooled { pub fn is_reused(&self) -> bool { self.is_reused } @@ -278,22 +433,28 @@ impl Pooled { } } -impl Deref for Pooled { +impl Deref for Pooled { type Target = T; fn deref(&self) -> &T { self.as_ref() } } -impl DerefMut for Pooled { +impl DerefMut for Pooled { fn deref_mut(&mut self) -> &mut T { self.as_mut() } } -impl Drop for Pooled { +impl Drop for Pooled { fn drop(&mut self) { if let Some(value) = self.value.take() { + if value.is_closed() { + // If we *already* know the connection is done here, + // it shouldn't be re-inserted back into the pool. + return; + } + if let Some(inner) = self.pool.upgrade() { if let Ok(mut inner) = inner.lock() { inner.put(self.key.clone(), value); @@ -305,7 +466,7 @@ impl Drop for Pooled { } } -impl fmt::Debug for Pooled { +impl fmt::Debug for Pooled { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Pooled") .field("key", &self.key) @@ -318,33 +479,30 @@ struct Idle { value: T, } -pub struct Checkout { - key: Arc, +pub(super) struct Checkout { + key: Key, pool: Pool, parked: Option>, } -struct NotParked; - -impl Checkout { - fn poll_parked(&mut self) -> Poll, NotParked> { - let mut drop_parked = false; +impl Checkout { + fn poll_parked(&mut self) -> Poll>, ::Error> { + static CANCELED: &str = "pool checkout failed"; if let Some(ref mut rx) = self.parked { match rx.poll() { Ok(Async::Ready(value)) => { if !value.is_closed() { - return Ok(Async::Ready(self.pool.reuse(&self.key, value))); + Ok(Async::Ready(Some(self.pool.reuse(&self.key, value)))) + } else { + Err(::Error::new_canceled(Some(CANCELED))) } - drop_parked = true; }, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_canceled) => drop_parked = true, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_canceled) => Err(::Error::new_canceled(Some(CANCELED))), } + } else { + Ok(Async::Ready(None)) } - if drop_parked { - self.parked.take(); - } - Err(NotParked) } fn park(&mut self) { @@ -357,14 +515,13 @@ impl Checkout { } } -impl Future for Checkout { +impl Future for Checkout { type Item = Pooled; type Error = ::Error; fn poll(&mut self) -> Poll { - match self.poll_parked() { - Ok(async) => return Ok(async), - Err(_not_parked) => (), + if let Some(pooled) = try_ready!(self.poll_parked()) { + return Ok(Async::Ready(pooled)); } let entry = self.pool.take(&self.key); @@ -387,6 +544,27 @@ impl Drop for Checkout { } } +pub(super) struct Connecting { + key: Key, + pool: Weak>>, +} + +impl Drop for Connecting { + fn drop(&mut self) { + if let Some(pool) = self.pool.upgrade() { + // No need to panic on drop, that could abort! + if let Ok(mut inner) = pool.lock() { + debug_assert_eq!( + self.key.1, + Ver::Http2, + "Connecting constructed without Http2" + ); + inner.connected(&self.key); + } + } + } +} + struct Expiration(Option); impl Expiration { @@ -411,7 +589,7 @@ struct IdleInterval { pool_drop_notifier: oneshot::Receiver, } -impl Future for IdleInterval { +impl Future for IdleInterval { type Item = (); type Error = (); @@ -441,28 +619,58 @@ impl Future for IdleInterval { #[cfg(test)] mod tests { - use std::sync::Arc; + use std::sync::{Arc, Weak}; use std::time::Duration; use futures::{Async, Future}; use futures::future; - use super::{Closed, Pool, Exec}; + use super::{Connecting, Key, Poolable, Pool, Reservation, Exec, Ver}; + + /// Test unique reservations. 
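    // `Uniq` exercises `Reservation::Unique`. For contrast, a shared
    // (HTTP/2-style) poolable would hand back a clone of itself, roughly:
    //
    //     fn reserve(self) -> Reservation<Self> {
    //         Reservation::Shared(self.clone(), self)
    //     }
    //
    // which is what the commented-out `Share` type below sketches.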
+ #[derive(Debug, PartialEq, Eq)] + struct Uniq(T); + + impl Poolable for Uniq { + fn is_closed(&self) -> bool { + false + } + + fn reserve(self) -> Reservation { + Reservation::Unique(self) + } + } - impl Closed for i32 { + /* + #[derive(Debug, PartialEq, Eq, Clone, Copy)] + struct Share(T); + + impl Poolable for Share { fn is_closed(&self) -> bool { false } + + fn reserve(self) -> Reservation { + Reservation::Shared(self.clone(), self) + } + } + */ + + fn c(key: Key) -> Connecting { + Connecting { + key, + pool: Weak::new(), + } } #[test] fn test_pool_checkout_smoke() { let pool = Pool::new(true, Some(Duration::from_secs(5))); - let key = Arc::new("foo".to_string()); - let pooled = pool.pooled(key.clone(), 41); + let key = (Arc::new("foo".to_string()), Ver::Http1); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); - match pool.checkout(&key).poll().unwrap() { - Async::Ready(pooled) => assert_eq!(*pooled, 41), + match pool.checkout(key).poll().unwrap() { + Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)), _ => panic!("not ready"), } } @@ -471,11 +679,11 @@ mod tests { fn test_pool_checkout_returns_none_if_expired() { future::lazy(|| { let pool = Pool::new(true, Some(Duration::from_millis(100))); - let key = Arc::new("foo".to_string()); - let pooled = pool.pooled(key.clone(), 41); + let key = (Arc::new("foo".to_string()), Ver::Http1); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); ::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap()); - assert!(pool.checkout(&key).poll().unwrap().is_not_ready()); + assert!(pool.checkout(key).poll().unwrap().is_not_ready()); ::futures::future::ok::<(), ()>(()) }).wait().unwrap(); } @@ -484,17 +692,17 @@ mod tests { fn test_pool_checkout_removes_expired() { future::lazy(|| { let pool = Pool::new(true, Some(Duration::from_millis(100))); - let key = Arc::new("foo".to_string()); + let key = (Arc::new("foo".to_string()), Ver::Http1); - pool.pooled(key.clone(), 41); - pool.pooled(key.clone(), 5); - pool.pooled(key.clone(), 99); + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); ::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap()); // checkout.poll() should clean out the expired - pool.checkout(&key).poll().unwrap(); + pool.checkout(key.clone()).poll().unwrap(); assert!(pool.inner.lock().unwrap().idle.get(&key).is_none()); Ok::<(), ()>(()) @@ -509,11 +717,11 @@ mod tests { let executor = runtime.executor(); pool.spawn_expired_interval(&Exec::Executor(Arc::new(executor))); - let key = Arc::new("foo".to_string()); + let key = (Arc::new("foo".to_string()), Ver::Http1); - pool.pooled(key.clone(), 41); - pool.pooled(key.clone(), 5); - pool.pooled(key.clone(), 99); + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); @@ -527,10 +735,10 @@ mod tests { #[test] fn test_pool_checkout_task_unparked() { let pool = Pool::new(true, Some(Duration::from_secs(10))); - let key = Arc::new("foo".to_string()); - let pooled = pool.pooled(key.clone(), 41); + let key = (Arc::new("foo".to_string()), Ver::Http1); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); - let checkout = pool.checkout(&key).join(future::lazy(move || { + let checkout = pool.checkout(key).join(future::lazy(move || { // the 
checkout future will park first, // and then this lazy future will be polled, which will insert // the pooled back into the pool @@ -539,17 +747,17 @@ mod tests { drop(pooled); Ok(()) })).map(|(entry, _)| entry); - assert_eq!(*checkout.wait().unwrap(), 41); + assert_eq!(*checkout.wait().unwrap(), Uniq(41)); } #[test] fn test_pool_checkout_drop_cleans_up_parked() { future::lazy(|| { - let pool = Pool::::new(true, Some(Duration::from_secs(10))); - let key = Arc::new("localhost:12345".to_string()); + let pool = Pool::>::new(true, Some(Duration::from_secs(10))); + let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); - let mut checkout1 = pool.checkout(&key); - let mut checkout2 = pool.checkout(&key); + let mut checkout1 = pool.checkout(key.clone()); + let mut checkout2 = pool.checkout(key.clone()); // first poll needed to get into Pool's parked checkout1.poll().unwrap(); @@ -567,4 +775,32 @@ mod tests { ::futures::future::ok::<(), ()>(()) }).wait().unwrap(); } + + #[derive(Debug)] + struct CanClose { + val: i32, + closed: bool, + } + + impl Poolable for CanClose { + fn is_closed(&self) -> bool { + self.closed + } + + fn reserve(self) -> Reservation { + Reservation::Unique(self) + } + } + + #[test] + fn pooled_drop_if_closed_doesnt_reinsert() { + let pool = Pool::new(true, Some(Duration::from_secs(10))); + let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); + pool.pooled(c(key.clone()), CanClose { + val: 57, + closed: true, + }); + + assert!(!pool.inner.lock().unwrap().idle.contains_key(&key)); + } } diff --git a/src/common/exec.rs b/src/common/exec.rs new file mode 100644 index 0000000000..2227448c2a --- /dev/null +++ b/src/common/exec.rs @@ -0,0 +1,38 @@ +use std::fmt; +use std::sync::Arc; + +use futures::future::{Executor, Future}; +use tokio_executor::spawn; + +/// Either the user provides an executor for background tasks, or we use +/// `tokio::spawn`. +#[derive(Clone)] +pub(crate) enum Exec { + Default, + Executor(Arc + Send>> + Send + Sync>), +} + + +impl Exec { + pub(crate) fn execute(&self, fut: F) + where + F: Future + Send + 'static, + { + match *self { + Exec::Default => spawn(fut), + Exec::Executor(ref e) => { + let _ = e.execute(Box::new(fut)) + .map_err(|err| { + panic!("executor error: {:?}", err.kind()); + }); + }, + } + } +} + +impl fmt::Debug for Exec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Exec") + .finish() + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index a627be7d3f..e47aacafa6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,2 +1,6 @@ +mod exec; + +pub(crate) use self::exec::Exec; + #[derive(Debug)] pub enum Never {} diff --git a/src/error.rs b/src/error.rs index 4d2f41a7b7..f1491ad0fb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -55,6 +55,9 @@ pub(crate) enum Kind { /// Error calling AsyncWrite::shutdown() Shutdown, + /// A general error from h2. + Http2, + /// User tried to create a Request with bad version. UnsupportedVersion, /// User tried to create a CONNECT Request with the Client. 
@@ -215,6 +218,10 @@ impl Error { pub(crate) fn new_shutdown(cause: io::Error) -> Error { Error::new(Kind::Shutdown, Some(Box::new(cause))) } + + pub(crate) fn new_h2(cause: ::h2::Error) -> Error { + Error::new(Kind::Http2, Some(Box::new(cause))) + } } impl fmt::Debug for Error { @@ -259,6 +266,7 @@ impl StdError for Error { Kind::BodyWrite => "error write a body to connection", Kind::BodyUser => "error from user's Payload stream", Kind::Shutdown => "error shutting down connection", + Kind::Http2 => "http2 general error", Kind::UnsupportedVersion => "request has unsupported HTTP version", Kind::UnsupportedRequestMethod => "request has unsupported HTTP method", @@ -319,3 +327,6 @@ trait AssertSendSync: Send + Sync + 'static {} #[doc(hidden)] impl AssertSendSync for Error {} +#[cfg(test)] +mod tests { +} diff --git a/src/lib.rs b/src/lib.rs index 860a528a12..95c257d443 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ extern crate bytes; #[macro_use] extern crate futures; extern crate futures_cpupool; extern crate futures_timer; +extern crate h2; extern crate http; extern crate httparse; extern crate iovec; @@ -36,6 +37,7 @@ extern crate want; extern crate test; pub use http::{ + HeaderMap, Method, Request, Response, diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 3ec87968e1..767d4edc60 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -129,7 +129,7 @@ where I: AsyncRead + AsyncWrite, let must_error = self.should_error_on_eof(); self.state.close_read(); self.io.consume_leading_lines(); - let was_mid_parse = !self.io.read_buf().is_empty(); + let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); return if was_mid_parse || must_error { debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len()); self.on_parse_error(e) @@ -566,7 +566,7 @@ where I: AsyncRead + AsyncWrite, match self.io.io_mut().shutdown() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(())) => { - trace!("shut down IO"); + trace!("shut down IO complete"); Ok(Async::Ready(())) } Err(e) => { @@ -599,6 +599,12 @@ where I: AsyncRead + AsyncWrite, Ok(()) } } + + // Used in h1::dispatch tests + #[cfg(test)] + pub(super) fn io_mut(&mut self) -> &mut I { + self.io.io_mut() + } } impl, T> fmt::Debug for Conn { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 01926742ea..a504dce826 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -7,8 +7,8 @@ use tokio_service::Service; use body::{Body, Payload}; use proto::{BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; -pub(crate) struct Dispatcher { - conn: Conn, +pub(crate) struct Dispatcher { + conn: Conn, dispatch: D, body_tx: Option<::body::Sender>, body_rx: Option, @@ -31,23 +31,20 @@ pub struct Server { } pub struct Client { - callback: Option<::client::dispatch::Callback, Response>>, + callback: Option<::client::dispatch::Callback, Response>>, rx: ClientRx, } -pub type ClientMsg = Request; +type ClientRx = ::client::dispatch::Receiver, Response>; -type ClientRx = ::client::dispatch::Receiver, Response>; - -impl Dispatcher +impl Dispatcher where D: Dispatch, PollBody=Bs, RecvItem=MessageHead>, I: AsyncRead + AsyncWrite, - B: AsRef<[u8]>, T: Http1Transaction, - Bs: Payload, + Bs: Payload, { - pub fn new(dispatch: D, conn: Conn) -> Self { + pub fn new(dispatch: D, conn: Conn) -> Self { Dispatcher { conn: conn, dispatch: dispatch, @@ -286,13 +283,12 @@ where } -impl Future for Dispatcher +impl Future for Dispatcher where D: Dispatch, 
PollBody=Bs, RecvItem=MessageHead>, I: AsyncRead + AsyncWrite, - B: AsRef<[u8]>, T: Http1Transaction, - Bs: Payload, + Bs: Payload, { type Item = (); type Error = ::Error; @@ -493,11 +489,18 @@ mod tests { fn client_read_bytes_before_writing_request() { let _ = pretty_env_logger::try_init(); ::futures::lazy(|| { - let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 100); + // Block at 0 for now, but we will release this response before + // the request is ready to write later... + let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); let (mut tx, rx) = ::client::dispatch::channel(); let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io); let mut dispatcher = Dispatcher::new(Client::new(rx), conn); + // First poll is needed to allow tx to send... + assert!(dispatcher.poll().expect("nothing is ready").is_not_ready()); + // Unblock our IO, which has a response before we've sent request! + dispatcher.conn.io_mut().block_in(100); + let res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap(); let a1 = dispatcher.poll().expect("error should be sent on channel"); @@ -506,13 +509,6 @@ mod tests { .expect("callback poll") .expect_err("callback response"); - /* - let err = match async { - Async::Ready(result) => result.unwrap_err(), - Async::Pending => panic!("callback should be ready"), - }; - */ - match (err.0.kind(), err.1) { (&::error::Kind::Canceled, Some(_)) => (), other => panic!("expected Canceled, got {:?}", other), diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 42af891b14..0a9bff1527 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -1,6 +1,8 @@ pub(crate) use self::conn::Conn; +pub(crate) use self::dispatch::Dispatcher; pub use self::decode::Decoder; pub use self::encode::{EncodedBuf, Encoder}; +pub use self::io::Cursor; //TODO: move out of h1::io mod conn; mod date; diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs new file mode 100644 index 0000000000..deca0af828 --- /dev/null +++ b/src/proto/h2/client.rs @@ -0,0 +1,144 @@ +use bytes::IntoBuf; +use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, Either}; +use futures::sync::oneshot; +use h2::client::{Builder, Handshake, SendRequest}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use body::Payload; +use ::common::{Exec, Never}; +use super::{PipeToSendStream, SendBuf}; +use ::{Body, Request, Response}; + +type ClientRx = ::client::dispatch::Receiver, Response>; + +pub struct Client +where + B: Payload, +{ + executor: Exec, + rx: ClientRx, + state: State>, +} + +enum State where B: IntoBuf { + Handshaking(Handshake), + Ready(SendRequest, oneshot::Sender), +} + +impl Client +where + T: AsyncRead + AsyncWrite + Send + 'static, + B: Payload, +{ + pub(crate) fn new(io: T, rx: ClientRx, exec: Exec) -> Client { + let handshake = Builder::new() + // we don't expose PUSH promises yet + .enable_push(false) + .handshake(io); + + Client { + executor: exec, + rx: rx, + state: State::Handshaking(handshake), + } + } +} + +impl Future for Client +where + T: AsyncRead + AsyncWrite + Send + 'static, + B: Payload + 'static, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + State::Handshaking(ref mut h) => { + let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); + // A oneshot channel is used entirely to detect when the + // 'Client' has been dropped. This is to get around a bug + // in h2 where dropping all SendRequests won't notify a + // parked Connection. 
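                    // Concretely: `rx` is raced against the connection
                    // future via `select2` below. If this `Client` future is
                    // dropped first, the oneshot errors and the select keeps
                    // polling the h2 connection so it can shut down cleanly
                    // instead of being dropped mid-stream.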
+ let (tx, rx) = oneshot::channel(); + let fut = conn + .map_err(|e| debug!("client h2 connection error: {}", e)) + .select2(rx) + .then(|res| match res { + Ok(Either::A(((), _))) | + Err(Either::A(((), _))) => { + // conn has finished either way + Either::A(future::ok(())) + }, + Err(Either::B((_, conn))) => { + // oneshot has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + Either::B(conn) + } + Ok(Either::B((never, _))) => match never {}, + }); + self.executor.execute(fut); + State::Ready(request_tx, tx) + }, + State::Ready(ref mut tx, _) => { + try_ready!(tx.poll_ready().map_err(::Error::new_h2)); + match self.rx.poll() { + Ok(Async::Ready(Some((req, mut cb)))) => { + // check that future hasn't been canceled already + if let Async::Ready(()) = cb.poll_cancel().expect("poll_cancel cannot error") { + trace!("request canceled"); + continue; + } + let (head, body) = req.into_parts(); + let mut req = ::http::Request::from_parts(head, ()); + super::strip_connection_headers(req.headers_mut()); + let eos = body.is_end_stream(); + let (fut, body_tx) = match tx.send_request(req, eos) { + Ok(ok) => ok, + Err(err) => { + debug!("client send request error: {}", err); + let _ = cb.send(Err((::Error::new_h2(err), None))); + continue; + } + }; + if !eos { + let pipe = PipeToSendStream::new(body, body_tx); + self.executor.execute(pipe.map_err(|e| debug!("client request body error: {}", e))); + } + + let fut = fut + .then(move |result| { + match result { + Ok(res) => { + let res = res.map(::Body::h2); + let _ = cb.send(Ok(res)); + }, + Err(err) => { + debug!("client response error: {}", err); + let _ = cb.send(Err((::Error::new_h2(err), None))); + } + } + Ok(()) + }); + self.executor.execute(fut); + continue; + }, + + Ok(Async::NotReady) => return Ok(Async::NotReady), + + Ok(Async::Ready(None)) | + Err(_) => { + trace!("client tx dropped"); + return Ok(Async::Ready(())); + } + } + }, + }; + self.state = next; + } + } +} diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 8b13789179..06aab6a1c1 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -1 +1,121 @@ +use bytes::Buf; +use futures::{Async, Future, Poll}; +use h2::{Reason, SendStream}; +use http::HeaderMap; +use http::header::{CONNECTION, TRANSFER_ENCODING}; +use ::body::Payload; +use ::proto::h1::Cursor; + +mod client; +mod server; + +pub(crate) use self::client::Client; +pub(crate) use self::server::Server; + +fn strip_connection_headers(headers: &mut HeaderMap) { + if headers.remove(TRANSFER_ENCODING).is_some() { + trace!("removed illegal Transfer-Encoding header"); + } + if headers.contains_key(CONNECTION) { + warn!("Connection header illegal in HTTP/2"); + //TODO: actually remove it, after checking the value + //and removing all related headers + } +} + +// body adapters used by both Client and Server + +struct PipeToSendStream +where + S: Payload, +{ + body_tx: SendStream>, + stream: S, +} + +impl PipeToSendStream +where + S: Payload, +{ + fn new(stream: S, tx: SendStream>) -> PipeToSendStream { + PipeToSendStream { + body_tx: tx, + stream: stream, + } + } +} + +impl Future for PipeToSendStream +where + S: Payload, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + // TODO: make use of flow control on SendStream + // If you're looking at this and thinking of trying to fix this TODO, + // you may want to look at: + // https://docs.rs/h2/0.1.*/h2/struct.SendStream.html + // 
+ // With that doc open, we'd want to do these things: + // - check self.body_tx.capacity() to see if we can send *any* data + // - if > 0: + // - poll self.stream + // - reserve chunk.len() more capacity (because its about to be used)? + // - send the chunk + // - else: + // - try reserve a smallish amount of capacity + // - call self.body_tx.poll_capacity(), return if NotReady + match self.stream.poll_data() { + Ok(Async::Ready(Some(chunk))) => { + trace!("send body chunk: {}B", chunk.as_ref().len()); + self.body_tx.send_data(SendBuf(Some(Cursor::new(chunk))), false) + .map_err(::Error::new_h2)?; + }, + Ok(Async::Ready(None)) => { + trace!("send body eos"); + self.body_tx.send_data(SendBuf(None), true) + .map_err(::Error::new_h2)?; + return Ok(Async::Ready(())); + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(err) => { + let err = ::Error::new_user_body(err); + trace!("send body user stream error: {}", err); + self.body_tx.send_reset(Reason::INTERNAL_ERROR); + return Err(err); + } + } + } + } +} + +struct SendBuf(Option>); + +impl> Buf for SendBuf { + #[inline] + fn remaining(&self) -> usize { + self.0 + .as_ref() + .map(|b| b.remaining()) + .unwrap_or(0) + } + + #[inline] + fn bytes(&self) -> &[u8] { + self.0 + .as_ref() + .map(|b| b.bytes()) + .unwrap_or(&[]) + } + + #[inline] + fn advance(&mut self, cnt: usize) { + self.0 + .as_mut() + .map(|b| b.advance(cnt)); + } +} diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs new file mode 100644 index 0000000000..c0958d0361 --- /dev/null +++ b/src/proto/h2/server.rs @@ -0,0 +1,198 @@ +use futures::{Async, Future, Poll, Stream}; +use h2::Reason; +use h2::server::{Builder, Connection, Handshake, SendResponse}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use ::body::Payload; +use ::common::Exec; +use ::server::Service; +use super::{PipeToSendStream, SendBuf}; + +use ::{Body, Request, Response}; + +pub(crate) struct Server +where + S: Service, + B: Payload, +{ + exec: Exec, + service: S, + state: State, +} + +enum State +where + B: Payload, +{ + Handshaking(Handshake>), + Serving(Serving), +} + +struct Serving +where + B: Payload, +{ + conn: Connection>, +} + + +impl Server +where + T: AsyncRead + AsyncWrite, + S: Service, Response=Response>, + S::Error: Into>, + S::Future: Send + 'static, + B: Payload, +{ + pub(crate) fn new(io: T, service: S, exec: Exec) -> Server { + let handshake = Builder::new() + .handshake(io); + Server { + exec, + state: State::Handshaking(handshake), + service, + } + } + + pub fn graceful_shutdown(&mut self) { + unimplemented!("h2 server graceful shutdown"); + } +} + +impl Future for Server +where + T: AsyncRead + AsyncWrite, + S: Service, Response=Response>, + S::Error: Into>, + S::Future: Send + 'static, + B: Payload, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + State::Handshaking(ref mut h) => { + let conn = try_ready!(h.poll().map_err(::Error::new_h2)); + State::Serving(Serving { + conn: conn, + }) + }, + State::Serving(ref mut srv) => { + return srv.poll_server(&mut self.service, &self.exec); + } + }; + self.state = next; + } + } +} + +impl Serving +where + T: AsyncRead + AsyncWrite, + B: Payload, +{ + fn poll_server(&mut self, service: &mut S, exec: &Exec) -> Poll<(), ::Error> + where + S: Service< + Request=Request, + Response=Response, + >, + S::Error: Into>, + S::Future: Send + 'static, + { + while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { + trace!("incoming request"); + 
let req = req.map(::Body::h2); + let fut = H2Stream::new(service.call(req), respond); + exec.execute(fut); + } + + // no more incoming streams... + trace!("incoming connection complete"); + Ok(Async::Ready(())) + } +} + +struct H2Stream +where + B: Payload, +{ + reply: SendResponse>, + state: H2StreamState, +} + +enum H2StreamState +where + B: Payload, +{ + Service(F), + Body(PipeToSendStream), +} + +impl H2Stream +where + F: Future>, + F::Error: Into>, + B: Payload, +{ + fn new(fut: F, respond: SendResponse>) -> H2Stream { + H2Stream { + reply: respond, + state: H2StreamState::Service(fut), + } + } + + fn poll2(&mut self) -> Poll<(), ::Error> { + loop { + let next = match self.state { + H2StreamState::Service(ref mut h) => { + let res = try_ready!(h.poll().map_err(::Error::new_user_service)); + let (head, body) = res.into_parts(); + let mut res = ::http::Response::from_parts(head, ()); + super::strip_connection_headers(res.headers_mut()); + macro_rules! reply { + ($eos:expr) => ({ + match self.reply.send_response(res, $eos) { + Ok(tx) => tx, + Err(e) => { + trace!("send response error: {}", e); + self.reply.send_reset(Reason::INTERNAL_ERROR); + return Err(::Error::new_h2(e)); + } + } + }) + } + if !body.is_end_stream() { + let body_tx = reply!(false); + H2StreamState::Body(PipeToSendStream::new(body, body_tx)) + } else { + reply!(true); + return Ok(Async::Ready(())); + } + }, + H2StreamState::Body(ref mut pipe) => { + return pipe.poll(); + } + }; + self.state = next; + } + } +} + +impl Future for H2Stream +where + F: Future>, + F::Error: Into>, + B: Payload, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + self.poll2() + .map_err(|e| debug!("stream error: {}", e)) + } +} + diff --git a/src/proto/mod.rs b/src/proto/mod.rs index e03ccef14e..4dd38dc1b8 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,8 +6,8 @@ use headers; pub(crate) use self::h1::{dispatch, Conn}; -mod h1; -//mod h2; +pub(crate) mod h1; +pub(crate) mod h2; /// An Incoming Message head. Includes request/status line, and headers. diff --git a/src/server/conn.rs b/src/server/conn.rs index 0a987e757c..6e55da3eca 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -12,6 +12,7 @@ use std::fmt; use bytes::Bytes; use futures::{Future, Poll}; +use futures::future::{Either}; use tokio_io::{AsyncRead, AsyncWrite}; use proto; @@ -27,12 +28,18 @@ where S: HyperService, S::ResponseBody: Payload, { - pub(super) conn: proto::dispatch::Dispatcher< - proto::dispatch::Server, - S::ResponseBody, - I, - ::Data, - proto::ServerTransaction, + pub(super) conn: Either< + proto::h1::Dispatcher< + proto::h1::dispatch::Server, + S::ResponseBody, + I, + proto::ServerTransaction, + >, + proto::h2::Server< + I, + S, + S::ResponseBody, + >, >, } @@ -62,12 +69,23 @@ impl Connection where S: Service, Response=Response> + 'static, S::Error: Into>, + S::Future: Send, I: AsyncRead + AsyncWrite + 'static, B: Payload + 'static, { - /// Disables keep-alive for this connection. - pub fn disable_keep_alive(&mut self) { - self.conn.disable_keep_alive() + /// Start a graceful shutdown process for this connection. + /// + /// This `Connection` should continue to be polled until shutdown + /// can finish. + pub fn graceful_shutdown(&mut self) { + match self.conn { + Either::A(ref mut h1) => { + h1.disable_keep_alive(); + }, + Either::B(ref mut h2) => { + h2.graceful_shutdown(); + } + } } /// Return the inner IO object, and additional information. @@ -76,7 +94,14 @@ where /// that the connection is "done". 
Otherwise, it may not have finished /// flushing all necessary HTTP bytes. pub fn into_parts(self) -> Parts { - let (io, read_buf) = self.conn.into_inner(); + let (io, read_buf) = match self.conn { + Either::A(h1) => { + h1.into_inner() + }, + Either::B(_h2) => { + panic!("h2 cannot into_inner"); + } + }; Parts { io: io, read_buf: read_buf, @@ -92,8 +117,13 @@ where /// but it is not desired to actally shutdown the IO object. Instead you /// would take it back using `into_parts`. pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { - try_ready!(self.conn.poll_without_shutdown()); - Ok(().into()) + match self.conn { + Either::A(ref mut h1) => { + try_ready!(h1.poll_without_shutdown()); + Ok(().into()) + }, + Either::B(ref mut h2) => h2.poll(), + } } } @@ -101,6 +131,7 @@ impl Future for Connection where S: Service, Response=Response> + 'static, S::Error: Into>, + S::Future: Send, I: AsyncRead + AsyncWrite + 'static, B: Payload + 'static, { diff --git a/src/server/mod.rs b/src/server/mod.rs index 9070ea9d87..d6ba0909f7 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,13 +8,12 @@ mod service; use std::fmt; use std::io; -use std::marker::PhantomData; use std::net::{SocketAddr, TcpListener as StdTcpListener}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use futures::task::{self, Task}; -use futures::future::{self}; +use futures::future::{self, Either, Executor}; use futures::{Future, Stream, Poll, Async}; use futures_timer::Delay; use http::{Request, Response}; @@ -25,6 +24,7 @@ use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; use body::{Body, Payload}; +use common::Exec; use proto; use self::addr_stream::AddrStream; use self::hyper_service::HyperService; @@ -37,23 +37,22 @@ pub use self::service::{const_service, service_fn}; /// This structure is used to create instances of `Server` or to spawn off tasks /// which handle a connection to an HTTP server. Each instance of `Http` can be /// configured with various protocol-level options such as keepalive. -pub struct Http { - max_buf_size: Option, +#[derive(Clone, Debug)] +pub struct Http { + exec: Exec, + http2: bool, keep_alive: bool, + max_buf_size: Option, pipeline: bool, sleep_on_errors: bool, - _marker: PhantomData, } /// An instance of a server created through `Http::bind`. /// /// This server is intended as a convenience for creating a TCP listener on an /// address and then serving TCP connections accepted with the service provided. -pub struct Server -where - B: Payload, -{ - protocol: Http, +pub struct Server { + protocol: Http, new_service: S, handle: Handle, listener: TcpListener, @@ -105,19 +104,28 @@ impl fmt::Debug for AddrIncoming { // ===== impl Http ===== -impl + 'static> Http { +impl Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. - pub fn new() -> Http { + pub fn new() -> Http { Http { + exec: Exec::Default, + http2: false, keep_alive: true, max_buf_size: None, pipeline: false, sleep_on_errors: false, - _marker: PhantomData, } } + /// Sets whether HTTP2 is required. + /// + /// Default is false + pub fn http2_only(&mut self, val: bool) -> &mut Self { + self.http2 = val; + self + } + /// Enables or disables HTTP keep-alive. /// /// Default is true. @@ -142,6 +150,17 @@ impl + 'static> Http { self } + /// Set the executor used to spawn background tasks. + /// + /// Default uses implicit default (like `tokio::spawn`). 
+ pub fn executor(&mut self, exec: E) -> &mut Self + where + E: Executor + Send>> + Send + Sync + 'static + { + self.exec = Exec::Executor(Arc::new(exec)); + self + } + /// Swallow connection accept errors. Instead of passing up IO errors when /// the server is under heavy load the errors will be ignored. Some /// connection accept errors (like "connection reset") can be ignored, some @@ -164,11 +183,11 @@ impl + 'static> Http { /// /// The returned `Server` contains one method, `run`, which is used to /// actually run the server. - pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> + pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> where S: NewService, Response=Response> + 'static, S::Error: Into>, - Bd: Payload, + Bd: Payload, { let handle = Handle::current(); let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; @@ -193,7 +212,7 @@ impl + 'static> Http { where S: NewService, Response=Response>, S::Error: Into>, - Bd: Payload, + Bd: Payload, { let handle = Handle::current(); let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; @@ -217,7 +236,7 @@ impl + 'static> Http { where S: NewService, Response = Response>, S::Error: Into>, - Bd: Payload, + Bd: Payload, { let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; @@ -238,18 +257,12 @@ impl + 'static> Http { I::Item: AsyncRead + AsyncWrite, S: NewService, Response = Response>, S::Error: Into>, - Bd: Payload, + Bd: Payload, { Serve { incoming: incoming, new_service: new_service, - protocol: Http { - keep_alive: self.keep_alive, - max_buf_size: self.max_buf_size, - pipeline: self.pipeline, - sleep_on_errors: self.sleep_on_errors, - _marker: PhantomData, - }, + protocol: self.clone(), } } @@ -276,11 +289,10 @@ impl + 'static> Http { /// # S: Service, Response=Response, Error=hyper::Error> + Send + 'static, /// # S::Future: Send /// # { - /// let http = Http::::new(); + /// let http = Http::new(); /// let conn = http.serve_connection(some_io, some_service); /// /// let fut = conn - /// .map(|_| ()) /// .map_err(|e| eprintln!("server connection error: {}", e)); /// /// tokio::spawn(fut); @@ -291,43 +303,32 @@ impl + 'static> Http { where S: Service, Response = Response>, S::Error: Into>, + S::Future: Send + 'static, Bd: Payload, I: AsyncRead + AsyncWrite, { - let mut conn = proto::Conn::new(io); - if !self.keep_alive { - conn.disable_keep_alive(); - } - conn.set_flush_pipeline(self.pipeline); - if let Some(max) = self.max_buf_size { - conn.set_max_buf_size(max); - } - Connection { - conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn), - } - } -} - - + let either = if !self.http2 { + let mut conn = proto::Conn::new(io); + if !self.keep_alive { + conn.disable_keep_alive(); + } + conn.set_flush_pipeline(self.pipeline); + if let Some(max) = self.max_buf_size { + conn.set_max_buf_size(max); + } + let sd = proto::h1::dispatch::Server::new(service); + Either::A(proto::h1::Dispatcher::new(sd, conn)) + } else { + let h2 = proto::h2::Server::new(io, service, self.exec.clone()); + Either::B(h2) + }; -impl Clone for Http { - fn clone(&self) -> Http { - Http { - ..*self + Connection { + conn: either, } } } -impl fmt::Debug for Http { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Http") - .field("keep_alive", &self.keep_alive) - .field("pipeline", &self.pipeline) - .finish() - } -} - - // ===== impl Server 
===== @@ -351,12 +352,12 @@ impl Future for Run { } -impl Server +impl Server where S: NewService, Response = Response> + Send + 'static, S::Error: Into>, ::Instance: Send, - <::Instance as Service>::Future: Send, + <::Instance as Service>::Future: Send + 'static, B: Payload + Send + 'static, B::Data: Send, { @@ -479,7 +480,7 @@ where } } -impl fmt::Debug for Server +impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") @@ -516,6 +517,7 @@ where I::Item: AsyncRead + AsyncWrite, S: NewService, Response=Response>, S::Error: Into>, + ::Future: Send + 'static, B: Payload, { type Item = Connection; diff --git a/tests/integration.rs b/tests/integration.rs new file mode 100644 index 0000000000..ea65712cd9 --- /dev/null +++ b/tests/integration.rs @@ -0,0 +1,146 @@ +#[macro_use] +mod support; +use self::support::*; + +t! { + get_1, + client: + request: + uri: "/", + ; + response: + status: 200, + ; + server: + request: + uri: "/", + ; + response: + ; +} + +t! { + get_implicit_path, + client: + request: + uri: "", + ; + response: + status: 200, + ; + server: + request: + uri: "/", + ; + response: + ; +} + +t! { + get_body, + client: + request: + uri: "/", + ; + response: + status: 200, + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + server: + request: + uri: "/", + ; + response: + headers: { + "content-length" => 11, + }, + body: "hello world", + ; +} + +t! { + get_body_chunked, + client: + request: + uri: "/", + ; + response: + status: 200, + headers: { + // h2 doesn't actually receive the transfer-encoding header + }, + body: "hello world", + ; + server: + request: + uri: "/", + ; + response: + headers: { + // http2 should strip this header + "transfer-encoding" => "chunked", + }, + body: "hello world", + ; +} + +t! { + post_chunked, + client: + request: + method: "POST", + uri: "/post_chunked", + headers: { + // http2 should strip this header + "transfer-encoding" => "chunked", + }, + body: "hello world", + ; + response: + ; + server: + request: + method: "POST", + uri: "/post_chunked", + body: "hello world", + ; + response: + ; +} + +t! { + get_2, + client: + request: + uri: "/1", + ; + response: + status: 200, + ; + request: + uri: "/2", + ; + response: + status: 200, + ; + server: + request: + uri: "/1", + ; + response: + ; + request: + uri: "/2", + ; + response: + ; +} + +t! 
{ + get_parallel_http2, + parallel: 0..10 +} + diff --git a/tests/server.rs b/tests/server.rs index d5209d5a59..0cc22aa2d4 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -95,7 +95,7 @@ fn get_implicitly_empty() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new().serve_connection(socket, GetImplicitlyEmpty) + Http::new().serve_connection(socket, GetImplicitlyEmpty) }); fut.wait().unwrap(); @@ -110,7 +110,6 @@ fn get_implicitly_empty() { fn call(&self, req: Request) -> Self::Future { Box::new(req.into_body() - .concat2() .map(|buf| { assert!(buf.is_empty()); @@ -776,13 +775,13 @@ fn disable_keep_alive_mid_request() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new().serve_connection(socket, HelloWorld) + Http::new().serve_connection(socket, HelloWorld) .select2(rx1) .then(|r| { match r { Ok(Either::A(_)) => panic!("expected rx first"), Ok(Either::B(((), mut conn))) => { - conn.disable_keep_alive(); + conn.graceful_shutdown(); tx2.send(()).unwrap(); conn } @@ -841,13 +840,13 @@ fn disable_keep_alive_post_request() { stream: socket, _debug: dropped2, }; - Http::::new().serve_connection(transport, HelloWorld) + Http::new().serve_connection(transport, HelloWorld) .select2(rx1) .then(|r| { match r { Ok(Either::A(_)) => panic!("expected rx first"), Ok(Either::B(((), mut conn))) => { - conn.disable_keep_alive(); + conn.graceful_shutdown(); conn } Err(Either::A((e, _))) => panic!("unexpected error {}", e), @@ -883,7 +882,7 @@ fn empty_parse_eof_does_not_return_error() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new().serve_connection(socket, HelloWorld) + Http::new().serve_connection(socket, HelloWorld) }); fut.wait().unwrap(); @@ -905,8 +904,7 @@ fn nonempty_parse_eof_returns_error() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new().serve_connection(socket, HelloWorld) - .map(|_| ()) + Http::new().serve_connection(socket, HelloWorld) }); fut.wait().unwrap_err(); @@ -933,14 +931,13 @@ fn returning_1xx_response_is_error() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new() + Http::new() .serve_connection(socket, service_fn(|_| { Ok::<_, hyper::Error>(Response::builder() .status(StatusCode::CONTINUE) .body(Body::empty()) .unwrap()) })) - .map(|_| ()) }); fut.wait().unwrap_err(); @@ -981,7 +978,7 @@ fn upgrades() { .map_err(|_| -> hyper::Error { unreachable!() }) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - let conn = Http::::new() + let conn = Http::new() .serve_connection(socket, service_fn(|_| { let res = Response::builder() .status(101) @@ -1034,9 +1031,8 @@ fn parse_errors_send_4xx_response() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new() + Http::new() .serve_connection(socket, HelloWorld) - .map(|_| ()) }); fut.wait().unwrap_err(); @@ -1063,9 +1059,8 @@ fn illegal_request_length_returns_400_response() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new() + Http::new() .serve_connection(socket, HelloWorld) - .map(|_| ()) }); fut.wait().unwrap_err(); @@ -1096,10 +1091,9 @@ fn max_buf_size() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new() + Http::new() .max_buf_size(MAX) .serve_connection(socket, HelloWorld) - .map(|_| ()) }); 
fut.wait().unwrap_err(); @@ -1140,7 +1134,7 @@ fn streaming_body() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let socket = item.unwrap(); - Http::::new() + Http::new() .keep_alive(false) .serve_connection(socket, service_fn(|_| { static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; @@ -1149,7 +1143,6 @@ fn streaming_body() { let b = hyper::Body::wrap_stream(b); Ok::<_, hyper::Error>(Response::new(b)) })) - .map(|_| ()) }); fut.join(rx).wait().unwrap(); diff --git a/tests/support/mod.rs b/tests/support/mod.rs new file mode 100644 index 0000000000..decfe43df2 --- /dev/null +++ b/tests/support/mod.rs @@ -0,0 +1,328 @@ +pub extern crate futures; +pub extern crate hyper; +pub extern crate tokio; + +pub use std::net::SocketAddr; +pub use self::futures::{future, Future, Stream}; +pub use self::futures::sync::oneshot; +pub use self::hyper::{HeaderMap, StatusCode}; +pub use self::tokio::runtime::Runtime; + +macro_rules! t { + ( + $name:ident, + parallel: $range:expr + ) => ( + #[test] + fn $name() { + + let mut c = vec![]; + let mut s = vec![]; + + for _i in $range { + c.push(( + __CReq { + uri: "/", + ..Default::default() + }, + __CRes { + ..Default::default() + }, + )); + s.push(( + __SReq { + uri: "/", + ..Default::default() + }, + __SRes { + ..Default::default() + }, + )); + } + + __run_test(__TestConfig { + client_version: 2, + client_msgs: c, + server_version: 2, + server_msgs: s, + parallel: true, + connections: 1, + }); + + } + ); + ( + $name:ident, + client: $( + request: $( + $c_req_prop:ident: $c_req_val:tt, + )*; + response: $( + $c_res_prop:ident: $c_res_val:tt, + )*; + )* + server: $( + request: $( + $s_req_prop:ident: $s_req_val:tt, + )*; + response: $( + $s_res_prop:ident: $s_res_val:tt, + )*; + )* + ) => ( + #[test] + fn $name() { + let c = vec![$(( + __CReq { + $($c_req_prop: __internal_req_res_prop!($c_req_prop: $c_req_val),)* + ..Default::default() + }, + __CRes { + $($c_res_prop: __internal_req_res_prop!($c_res_prop: $c_res_val),)* + ..Default::default() + } + ),)*]; + let s = vec![$(( + __SReq { + $($s_req_prop: __internal_req_res_prop!($s_req_prop: $s_req_val),)* + ..Default::default() + }, + __SRes { + $($s_res_prop: __internal_req_res_prop!($s_res_prop: $s_res_val),)* + ..Default::default() + } + ),)*]; + + __run_test(__TestConfig { + client_version: 1, + client_msgs: c.clone(), + server_version: 1, + server_msgs: s.clone(), + parallel: false, + connections: 1, + }); + + __run_test(__TestConfig { + client_version: 2, + client_msgs: c, + server_version: 2, + server_msgs: s, + parallel: false, + connections: 1, + }); + } + ); +} + +macro_rules! __internal_req_res_prop { + (method: $prop_val:expr) => ( + $prop_val + ); + (status: $prop_val:expr) => ( + StatusCode::from_u16($prop_val).expect("status code") + ); + (headers: $map:tt) => ({ + #[allow(unused_mut)] + let mut headers = HeaderMap::new(); + __internal_headers!(headers, $map); + headers + }); + ($prop_name:ident: $prop_val:expr) => ( + From::from($prop_val) + ) +} + +macro_rules! 
__internal_headers { + ($headers:ident, { $($name:expr => $val:expr,)* }) => { + $( + $headers.insert($name, $val.to_string().parse().expect("header value")); + )* + } +} + +#[derive(Clone, Debug, Default)] +pub struct __CReq { + pub method: &'static str, + pub uri: &'static str, + pub headers: HeaderMap, + pub body: Vec, +} + +#[derive(Clone, Debug, Default)] +pub struct __CRes { + pub status: hyper::StatusCode, + pub body: Vec, + pub headers: HeaderMap, +} + +#[derive(Clone, Debug, Default)] +pub struct __SReq { + pub method: &'static str, + pub uri: &'static str, + pub headers: HeaderMap, + pub body: Vec, +} + +#[derive(Clone, Debug, Default)] +pub struct __SRes { + pub status: hyper::StatusCode, + pub body: Vec, + pub headers: HeaderMap, +} + +pub struct __TestConfig { + pub client_version: usize, + pub client_msgs: Vec<(__CReq, __CRes)>, + + pub server_version: usize, + pub server_msgs: Vec<(__SReq, __SRes)>, + + pub parallel: bool, + pub connections: usize, +} + +pub fn __run_test(cfg: __TestConfig) { + extern crate pretty_env_logger; + use hyper::{Body, Client, Request, Response}; + use hyper::client::HttpConnector; + use std::sync::Arc; + let _ = pretty_env_logger::try_init(); + let rt = Runtime::new().expect("new rt"); + let handle = rt.reactor().clone(); + + let connector = HttpConnector::new_with_handle(1, handle.clone()); + let client = Client::builder() + .http2_only(cfg.client_version == 2) + .executor(rt.executor()) + .build::<_, Body>(connector); + + let serve_handles = ::std::sync::Mutex::new( + cfg.server_msgs + ); + let service = hyper::server::service_fn(move |req: Request| -> Box, Error=hyper::Error> + Send> { + let (sreq, sres) = serve_handles.lock() + .unwrap() + .remove(0); + + assert_eq!(req.uri().path(), sreq.uri); + assert_eq!(req.method(), &sreq.method); + for (name, value) in &sreq.headers { + assert_eq!( + req.headers()[name], + value + ); + } + let sbody = sreq.body; + Box::new(req.into_body() + .concat2() + .map(move |body| { + assert_eq!(body.as_ref(), sbody.as_slice()); + + let mut res = Response::builder() + .status(sres.status) + .body(sres.body.into()) + .expect("Response::build"); + *res.headers_mut() = sres.headers; + res + })) + }); + let new_service = hyper::server::const_service(service); + + let serve = hyper::server::Http::new() + .http2_only(cfg.server_version == 2) + .executor(rt.executor()) + .serve_addr_handle( + &SocketAddr::from(([127, 0, 0, 1], 0)), + &handle, + new_service, + ) + .expect("serve_addr_handle"); + + let addr = serve.incoming_ref().local_addr(); + let exe = rt.executor(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (success_tx, success_rx) = oneshot::channel(); + let expected_connections = cfg.connections; + let server = serve + .fold(0, move |cnt, conn: hyper::server::Connection<_, _>| { + exe.spawn(conn.map_err(|e| panic!("server connection error: {}", e))); + Ok::<_, hyper::Error>(cnt + 1) + }) + .map(move |cnt| { + assert_eq!(cnt, expected_connections); + }) + .map_err(|e| panic!("serve error: {}", e)) + .select2(shutdown_rx) + .map(move |_| { + let _ = success_tx.send(()); + }) + .map_err(|_| panic!("shutdown not ok")); + + rt.executor().spawn(server); + + let make_request = Arc::new(move |client: &Client, creq: __CReq, cres: __CRes| { + let uri = format!("http://{}{}", addr, creq.uri); + let mut req = Request::builder() + .method(creq.method) + .uri(uri) + //.headers(creq.headers) + .body(creq.body.into()) + .expect("Request::build"); + *req.headers_mut() = creq.headers; + let cstatus = cres.status; + 
let cheaders = cres.headers; + let cbody = cres.body; + + client.request(req) + .and_then(move |res| { + assert_eq!(res.status(), cstatus); + //assert_eq!(res.version(), c_version); + for (name, value) in &cheaders { + assert_eq!( + res.headers()[name], + value + ); + } + res.into_body().concat2() + }) + .map(move |body| { + assert_eq!(body.as_ref(), cbody.as_slice()); + }) + .map_err(|e| panic!("client error: {}", e)) + }); + + + let client_futures: Box + Send> = if cfg.parallel { + let mut client_futures = vec![]; + for (creq, cres) in cfg.client_msgs { + client_futures.push(make_request(&client, creq, cres)); + } + drop(client); + Box::new(future::join_all(client_futures).map(|_| ())) + } else { + let mut client_futures: Box, Error=()> + Send> = + Box::new(future::ok(client)); + for (creq, cres) in cfg.client_msgs { + let mk_request = make_request.clone(); + client_futures = Box::new( + client_futures + .and_then(move |client| { + let fut = mk_request(&client, creq, cres); + fut.map(move |()| client) + }) + ); + } + Box::new(client_futures.map(|_| ())) + }; + + let client_futures = client_futures.map(move |_| { + let _ = shutdown_tx.send(()); + }); + rt.executor().spawn(client_futures); + rt.shutdown_on_idle().wait().expect("rt"); + success_rx + .map_err(|_| "something panicked") + .wait() + .expect("shutdown succeeded"); +} +
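
For reference, the essential setup that the `__run_test` helper above performs with the new builder options can be pulled out into a short illustrative sketch. This is not part of the patch itself; it assumes the same `rt` (a tokio `Runtime`), `handle`, and `new_service` values that the helper constructs, plus the support module's imports.

    // Client side: request HTTP/2 for every connection and run the
    // connection's background tasks on the runtime's executor.
    let connector = HttpConnector::new_with_handle(1, handle.clone());
    let client = Client::builder()
        .http2_only(true)
        .executor(rt.executor())
        .build::<_, Body>(connector);

    // Server side: the same pair of options on the Http builder.
    let serve = hyper::server::Http::new()
        .http2_only(true)
        .executor(rt.executor())
        .serve_addr_handle(
            &SocketAddr::from(([127, 0, 0, 1], 0)),
            &handle,
            new_service,
        )
        .expect("serve_addr_handle");

    // The bound address to point the client at; accepted connections still
    // have to be polled (spawned) the same way the helper does.
    let addr = serve.incoming_ref().local_addr();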
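
The `strip_connection_headers` helper in `src/proto/h2/mod.rs` leaves a TODO: the `Connection` header should eventually be removed along with every header it names. One possible shape for that follow-up, sketched here rather than taken from the patch (same `HeaderMap`, `CONNECTION`, and `TRANSFER_ENCODING` imports as above), is:

    fn strip_connection_headers(headers: &mut HeaderMap) {
        // Connection-specific headers must not be forwarded on HTTP/2:
        // drop every header named by `Connection`, then `Connection` itself.
        if let Some(value) = headers.remove(CONNECTION) {
            if let Ok(value) = value.to_str() {
                for name in value.split(',') {
                    headers.remove(name.trim());
                }
            }
        }
        if headers.remove(TRANSFER_ENCODING).is_some() {
            trace!("removed illegal Transfer-Encoding header");
        }
    }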
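
Similarly, `PipeToSendStream::poll` carries a TODO describing how `SendStream` flow control could be used before pulling chunks off the user body. A rough, unverified sketch against the h2 0.1 API (`capacity`, `reserve_capacity`, `poll_capacity`), reusing the `SendBuf` and `Cursor` wrappers defined in this patch, might replace the loop body like so:

    fn poll(&mut self) -> Poll<(), ::Error> {
        loop {
            if self.body_tx.capacity() == 0 {
                // No send window available: ask for a small amount and wait
                // for h2 to grant it before polling the user body at all.
                self.body_tx.reserve_capacity(8192);
                if try_ready!(self.body_tx.poll_capacity().map_err(::Error::new_h2)).is_none() {
                    // The stream was reset; nothing left to send.
                    return Ok(Async::Ready(()));
                }
                continue;
            }
            match try_ready!(self.stream.poll_data().map_err(::Error::new_user_body)) {
                Some(chunk) => {
                    trace!("send body chunk: {}B", chunk.as_ref().len());
                    self.body_tx.send_data(SendBuf(Some(Cursor::new(chunk))), false)
                        .map_err(::Error::new_h2)?;
                },
                None => {
                    trace!("send body eos");
                    self.body_tx.send_data(SendBuf(None), true)
                        .map_err(::Error::new_h2)?;
                    return Ok(Async::Ready(()));
                },
            }
        }
    }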