diff --git a/examples/client.rs b/examples/client.rs index 126d4395eb..9c411254b6 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -41,7 +41,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> { let addr = format!("{}:{}", host, port); let stream = TcpStream::connect(addr).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); diff --git a/examples/client_json.rs b/examples/client_json.rs index 3c1a7ffd39..2084e071fe 100644 --- a/examples/client_json.rs +++ b/examples/client_json.rs @@ -30,7 +30,7 @@ async fn fetch_json(url: hyper::Uri) -> Result> { let stream = TcpStream::connect(addr).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); diff --git a/examples/gateway.rs b/examples/gateway.rs index 373b36ecf8..22de700eea 100644 --- a/examples/gateway.rs +++ b/examples/gateway.rs @@ -43,7 +43,8 @@ async fn main() -> Result<(), Box> { async move { let client_stream = TcpStream::connect(addr).await.unwrap(); - let (mut sender, conn) = hyper::client::conn::handshake(client_stream).await?; + let (mut sender, conn) = + hyper::client::conn::http1::handshake(client_stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { println!("Connection failed: {:?}", err); diff --git a/examples/http_proxy.rs b/examples/http_proxy.rs index 8d262f9002..6bb0a8d24e 100644 --- a/examples/http_proxy.rs +++ b/examples/http_proxy.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; -use hyper::client::conn::Builder; +use hyper::client::conn::http1::Builder; use hyper::server::conn::Http; use hyper::service::service_fn; use hyper::upgrade::Upgraded; diff --git a/examples/upgrades.rs b/examples/upgrades.rs index 31980cb9b8..e00d86f2d9 100644 --- a/examples/upgrades.rs +++ b/examples/upgrades.rs @@ -97,7 +97,7 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> { .unwrap(); let stream = TcpStream::connect(addr).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { diff --git a/examples/web_api.rs b/examples/web_api.rs index 220c5d822e..9c4f5c12e1 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -31,7 +31,7 @@ async fn client_request_response() -> Result> { let port = req.uri().port_u16().expect("uri has no port"); let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; - let (mut sender, conn) = hyper::client::conn::handshake(stream).await?; + let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?; tokio::task::spawn(async move { if let Err(err) = conn.await { diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 7878031656..c4776fdae8 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -4,6 +4,7 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; +use bytes::Bytes; use http::{Request, Response}; use httparse::ParserConfig; use tokio::io::{AsyncRead, AsyncWrite}; @@ -27,6 +28,27 @@ pub struct SendRequest { dispatch: dispatch::Sender, Response>, } +/// Deconstructed parts of a `Connection`. +/// +/// This allows taking apart a `Connection` at a later time, in order to +/// reclaim the IO object, and additional related pieces. +#[derive(Debug)] +pub struct Parts { + /// The original IO object used in the handshake. + pub io: T, + /// A buffer of bytes that have been read but not processed as HTTP. + /// + /// For instance, if the `Connection` is used for an HTTP upgrade request, + /// it is possible the server sent back the first bytes of the new protocol + /// along with the response upgrade. + /// + /// You will want to check for any existing bytes if you plan to continue + /// communicating on the IO object. + pub read_buf: Bytes, + _inner: (), +} + + /// A future that processes all HTTP state for the IO object. /// /// In most cases, this should just be spawned into an executor, so that it @@ -40,6 +62,40 @@ where inner: Option>, } +impl Connection +where + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, + B: Body + 'static, + B::Error: Into>, +{ + /// Return the inner IO object, and additional information. + /// + /// Only works for HTTP/1 connections. HTTP/2 connections will panic. + pub fn into_parts(self) -> Parts { + let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner(); + Parts { + io, + read_buf, + _inner: (), + } + } + + /// Poll the connection for completion, but without calling `shutdown` + /// on the underlying IO. + /// + /// This is useful to allow running a connection while doing an HTTP + /// upgrade. Once the upgrade is completed, the connection would be "done", + /// but it is not desired to actually shutdown the IO object. Instead you + /// would take it back using `into_parts`. + /// + /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) + /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) + /// to work with this function; or use the `without_shutdown` wrapper. + pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.inner.as_mut().expect("algready upgraded").poll_without_shutdown(cx) + } +} + /// A builder to configure an HTTP connection. /// /// After setting options, the builder is used to create a handshake future. @@ -52,6 +108,8 @@ pub struct Builder { h1_title_case_headers: bool, h1_preserve_header_case: bool, #[cfg(feature = "ffi")] + h1_headers_raw: bool, + #[cfg(feature = "ffi")] h1_preserve_header_order: bool, h1_read_buf_exact_size: Option, h1_max_buf_size: Option, @@ -61,11 +119,14 @@ pub struct Builder { /// /// This is a shortcut for `Builder::new().handshake(io)`. /// See [`client::conn`](crate::client::conn) for more. -pub async fn handshake( +pub async fn handshake( io: T, -) -> crate::Result<(SendRequest, Connection)> +) -> crate::Result<(SendRequest, Connection)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: Body + 'static, + B::Data: Send, + B::Error: Into>, { Builder::new().handshake(io).await } @@ -80,6 +141,13 @@ impl SendRequest { self.dispatch.poll_ready(cx) } + /// Waits until the dispatcher is ready + /// + /// If the associated connection is closed, this returns an Error. + pub async fn ready(&mut self) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + } + /* pub(super) async fn when_ready(self) -> crate::Result { let mut me = Some(self); @@ -232,6 +300,8 @@ impl Builder { h1_title_case_headers: false, h1_preserve_header_case: false, #[cfg(feature = "ffi")] + h1_headers_raw: false, + #[cfg(feature = "ffi")] h1_preserve_header_order: false, h1_max_buf_size: None, } @@ -386,6 +456,12 @@ impl Builder { self } + #[cfg(feature = "ffi")] + pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Builder { + self.h1_headers_raw = enabled; + self + } + /// Sets the exact size of the read buffer to *always* use. /// /// Note that setting this option unsets the `http1_max_buf_size` option. @@ -459,6 +535,10 @@ impl Builder { if opts.h1_preserve_header_order { conn.set_preserve_header_order(); } + #[cfg(feature = "ffi")] + if opts.h1_headers_raw { + conn.set_raw_headers(true); + } if opts.h09_responses { conn.set_h09_responses(); } diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index a8d53bafcf..ee7325d385 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -52,11 +52,14 @@ pub struct Builder { /// /// This is a shortcut for `Builder::new().handshake(io)`. /// See [`client::conn`](crate::client::conn) for more. -pub async fn handshake( +pub async fn handshake( io: T, -) -> crate::Result<(SendRequest, Connection)> +) -> crate::Result<(SendRequest, Connection)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: Body + 'static, + B::Data: Send, + B::Error: Into>, { Builder::new().handshake(io).await } @@ -75,6 +78,13 @@ impl SendRequest { } } + /// Waits until the dispatcher is ready + /// + /// If the associated connection is closed, this returns an Error. + pub async fn ready(&mut self) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + } + /* pub(super) async fn when_ready(self) -> crate::Result { let mut me = Some(self); @@ -175,6 +185,27 @@ impl fmt::Debug for SendRequest { // ===== impl Connection +impl Connection +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: Body + Unpin + Send + 'static, + B::Data: Send, + B::Error: Into>, +{ + /// Returns whether the [extended CONNECT protocol][1] is enabled or not. + /// + /// This setting is configured by the server peer by sending the + /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. + /// This method returns the currently acknowledged value received from the + /// remote. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 + pub fn is_extended_connect_protocol_enabled(&self) -> bool { + self.inner.1.is_extended_connect_protocol_enabled() + } +} + impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 0abe5542df..f60bce4080 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -23,7 +23,7 @@ //! async fn main() -> Result<(), Box> { //! let target_stream = TcpStream::connect("example.com:80").await?; //! -//! let (mut request_sender, connection) = conn::handshake(target_stream).await?; +//! let (mut request_sender, connection) = conn::http1::handshake(target_stream).await?; //! //! // spawn a task to poll the connection and drive the HTTP state //! tokio::spawn(async move { @@ -52,886 +52,8 @@ //! # } //! ``` -use std::error::Error as StdError; -use std::fmt; -#[cfg(not(all(feature = "http1", feature = "http2")))] -use std::marker::PhantomData; -use std::sync::Arc; -#[cfg(feature = "http2")] -use std::time::Duration; - -use bytes::Bytes; -use futures_util::future; -use httparse::ParserConfig; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::{debug, trace}; - -use super::dispatch; -use crate::body::Body; -#[cfg(not(all(feature = "http1", feature = "http2")))] -use crate::common::Never; -use crate::common::{ - exec::{BoxSendFuture, Exec}, - task, Future, Pin, Poll, -}; -use crate::proto; -use crate::rt::Executor; -#[cfg(feature = "http1")] -use crate::upgrade::Upgraded; -use crate::{common::time::Time, rt::Timer}; -use crate::{Recv, Request, Response}; - #[cfg(feature = "http1")] pub mod http1; #[cfg(feature = "http2")] pub mod http2; -#[cfg(feature = "http1")] -type Http1Dispatcher = - proto::dispatch::Dispatcher, B, T, proto::h1::ClientTransaction>; - -#[cfg(not(feature = "http1"))] -type Http1Dispatcher = (Never, PhantomData<(T, Pin>)>); - -#[cfg(feature = "http2")] -type Http2ClientTask = proto::h2::ClientTask; - -#[cfg(not(feature = "http2"))] -type Http2ClientTask = (Never, PhantomData>>); - -pin_project! { - #[project = ProtoClientProj] - enum ProtoClient - where - B: Body, - { - H1 { - #[pin] - h1: Http1Dispatcher, - }, - H2 { - #[pin] - h2: Http2ClientTask, - }, - } -} - -/// Returns a handshake future over some IO. -/// -/// This is a shortcut for `Builder::new().handshake(io)`. -/// See [`client::conn`](crate::client::conn) for more. -pub async fn handshake(io: T) -> crate::Result<(SendRequest, Connection)> -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Body + 'static, - B::Data: Send, - B::Error: Into>, -{ - Builder::new().handshake(io).await -} - -/// The sender side of an established connection. -pub struct SendRequest { - dispatch: dispatch::Sender, Response>, -} - -/// A future that processes all HTTP state for the IO object. -/// -/// In most cases, this should just be spawned into an executor, so that it -/// can process incoming and outgoing messages, notice hangups, and the like. -#[must_use = "futures do nothing unless polled"] -pub struct Connection -where - T: AsyncRead + AsyncWrite + Send + 'static, - B: Body + 'static, -{ - inner: Option>, -} - -/// A builder to configure an HTTP connection. -/// -/// After setting options, the builder is used to create a handshake future. -#[derive(Clone, Debug)] -pub struct Builder { - pub(super) exec: Exec, - pub(super) timer: Time, - h09_responses: bool, - h1_parser_config: ParserConfig, - h1_writev: Option, - h1_title_case_headers: bool, - h1_preserve_header_case: bool, - #[cfg(feature = "ffi")] - h1_preserve_header_order: bool, - h1_read_buf_exact_size: Option, - h1_max_buf_size: Option, - #[cfg(feature = "ffi")] - h1_headers_raw: bool, - #[cfg(feature = "http2")] - h2_builder: proto::h2::client::Config, - version: Proto, -} - -#[derive(Clone, Debug)] -enum Proto { - #[cfg(feature = "http1")] - Http1, - #[cfg(feature = "http2")] - Http2, -} - -/// A future returned by `SendRequest::send_request`. -/// -/// Yields a `Response` if successful. -#[must_use = "futures do nothing unless polled"] -pub struct ResponseFuture { - inner: ResponseFutureState, -} - -enum ResponseFutureState { - Waiting(dispatch::Promise>), - // Option is to be able to `take()` it in `poll` - Error(Option), -} - -/// Deconstructed parts of a `Connection`. -/// -/// This allows taking apart a `Connection` at a later time, in order to -/// reclaim the IO object, and additional related pieces. -#[derive(Debug)] -pub struct Parts { - /// The original IO object used in the handshake. - pub io: T, - /// A buffer of bytes that have been read but not processed as HTTP. - /// - /// For instance, if the `Connection` is used for an HTTP upgrade request, - /// it is possible the server sent back the first bytes of the new protocol - /// along with the response upgrade. - /// - /// You will want to check for any existing bytes if you plan to continue - /// communicating on the IO object. - pub read_buf: Bytes, - _inner: (), -} - -// ===== impl SendRequest - -impl SendRequest { - /// Polls to determine whether this sender can be used yet for a request. - /// - /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.dispatch.poll_ready(cx) - } -} - -impl SendRequest -where - B: Body + 'static, -{ - /// Sends a `Request` on the associated connection. - /// - /// Returns a future that if successful, yields the `Response`. - /// - /// # Note - /// - /// There are some key differences in what automatic things the `Client` - /// does for you that will not be done here: - /// - /// - `Client` requires absolute-form `Uri`s, since the scheme and - /// authority are needed to connect. They aren't required here. - /// - Since the `Client` requires absolute-form `Uri`s, it can add - /// the `Host` header based on it. You must add a `Host` header yourself - /// before calling this method. - /// - Since absolute-form `Uri`s are not required, if received, they will - /// be serialized as-is. - pub fn send_request(&mut self, req: Request) -> ResponseFuture { - let inner = match self.dispatch.send(req) { - Ok(rx) => ResponseFutureState::Waiting(rx), - Err(_req) => { - debug!("connection was not ready"); - let err = crate::Error::new_canceled().with("connection was not ready"); - ResponseFutureState::Error(Some(err)) - } - }; - - ResponseFuture { inner } - } -} - -impl fmt::Debug for SendRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SendRequest").finish() - } -} - -// ===== impl Connection - -impl Connection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Body + Unpin + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - /// Return the inner IO object, and additional information. - /// - /// Only works for HTTP/1 connections. HTTP/2 connections will panic. - pub fn into_parts(self) -> Parts { - match self.inner.expect("already upgraded") { - #[cfg(feature = "http1")] - ProtoClient::H1 { h1 } => { - let (io, read_buf, _) = h1.into_inner(); - Parts { - io, - read_buf, - _inner: (), - } - } - ProtoClient::H2 { .. } => { - panic!("http2 cannot into_inner"); - } - - #[cfg(not(feature = "http1"))] - ProtoClient::H1 { h1 } => match h1.0 {}, - } - } - - /// Poll the connection for completion, but without calling `shutdown` - /// on the underlying IO. - /// - /// This is useful to allow running a connection while doing an HTTP - /// upgrade. Once the upgrade is completed, the connection would be "done", - /// but it is not desired to actually shutdown the IO object. Instead you - /// would take it back using `into_parts`. - /// - /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) - /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) - /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { - match *self.inner.as_mut().expect("already upgraded") { - #[cfg(feature = "http1")] - ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), - #[cfg(feature = "http2")] - ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()), - - #[cfg(not(feature = "http1"))] - ProtoClient::H1 { ref mut h1 } => match h1.0 {}, - #[cfg(not(feature = "http2"))] - ProtoClient::H2 { ref mut h2, .. } => match h2.0 {}, - } - } - - /// Prevent shutdown of the underlying IO object at the end of service the request, - /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. - pub fn without_shutdown(self) -> impl Future>> { - let mut conn = Some(self); - future::poll_fn(move |cx| -> Poll>> { - ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; - Poll::Ready(Ok(conn.take().unwrap().into_parts())) - }) - } - - /// Returns whether the [extended CONNECT protocol][1] is enabled or not. - /// - /// This setting is configured by the server peer by sending the - /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. - /// This method returns the currently acknowledged value received from the - /// remote. - /// - /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 - #[cfg(feature = "http2")] - pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool { - match self.inner.as_ref().unwrap() { - ProtoClient::H1 { .. } => false, - ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(), - } - } -} - -impl Future for Connection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - type Output = crate::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? { - proto::Dispatched::Shutdown => Poll::Ready(Ok(())), - #[cfg(feature = "http1")] - proto::Dispatched::Upgrade(pending) => match self.inner.take() { - Some(ProtoClient::H1 { h1 }) => { - let (io, buf, _) = h1.into_inner(); - pending.fulfill(Upgraded::new(io, buf)); - Poll::Ready(Ok(())) - } - _ => { - drop(pending); - unreachable!("Upgrade expects h1"); - } - }, - } - } -} - -impl fmt::Debug for Connection -where - T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, - B: Body + 'static, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Connection").finish() - } -} - -// ===== impl Builder - -impl Builder { - /// Creates a new connection builder. - #[inline] - pub fn new() -> Builder { - Builder { - exec: Exec::Default, - timer: Time::Empty, - h09_responses: false, - h1_writev: None, - h1_read_buf_exact_size: None, - h1_parser_config: Default::default(), - h1_title_case_headers: false, - h1_preserve_header_case: false, - #[cfg(feature = "ffi")] - h1_preserve_header_order: false, - h1_max_buf_size: None, - #[cfg(feature = "ffi")] - h1_headers_raw: false, - #[cfg(feature = "http2")] - h2_builder: Default::default(), - #[cfg(feature = "http1")] - version: Proto::Http1, - #[cfg(not(feature = "http1"))] - version: Proto::Http2, - } - } - - /// Provide an executor to execute background HTTP2 tasks. - pub fn executor(&mut self, exec: E) -> &mut Builder - where - E: Executor + Send + Sync + 'static, - { - self.exec = Exec::Executor(Arc::new(exec)); - self - } - - /// Provide a timer to execute background HTTP2 tasks. - pub fn timer(&mut self, timer: M) -> &mut Builder - where - M: Timer + Send + Sync + 'static, - { - self.timer = Time::Timer(Arc::new(timer)); - self - } - - /// Set whether HTTP/0.9 responses should be tolerated. - /// - /// Default is false. - pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder { - self.h09_responses = enabled; - self - } - - /// Set whether HTTP/1 connections will accept spaces between header names - /// and the colon that follow them in responses. - /// - /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has - /// to say about it: - /// - /// > No whitespace is allowed between the header field-name and colon. In - /// > the past, differences in the handling of such whitespace have led to - /// > security vulnerabilities in request routing and response handling. A - /// > server MUST reject any received request message that contains - /// > whitespace between a header field-name and colon with a response code - /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a - /// > response message before forwarding the message downstream. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - /// - /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 - pub fn http1_allow_spaces_after_header_name_in_responses( - &mut self, - enabled: bool, - ) -> &mut Builder { - self.h1_parser_config - .allow_spaces_after_header_name_in_responses(enabled); - self - } - - /// Set whether HTTP/1 connections will accept obsolete line folding for - /// header values. - /// - /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when - /// parsing. - /// - /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has - /// to say about it: - /// - /// > A server that receives an obs-fold in a request message that is not - /// > within a message/http container MUST either reject the message by - /// > sending a 400 (Bad Request), preferably with a representation - /// > explaining that obsolete line folding is unacceptable, or replace - /// > each received obs-fold with one or more SP octets prior to - /// > interpreting the field value or forwarding the message downstream. - /// - /// > A proxy or gateway that receives an obs-fold in a response message - /// > that is not within a message/http container MUST either discard the - /// > message and replace it with a 502 (Bad Gateway) response, preferably - /// > with a representation explaining that unacceptable line folding was - /// > received, or replace each received obs-fold with one or more SP - /// > octets prior to interpreting the field value or forwarding the - /// > message downstream. - /// - /// > A user agent that receives an obs-fold in a response message that is - /// > not within a message/http container MUST replace each received - /// > obs-fold with one or more SP octets prior to interpreting the field - /// > value. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - /// - /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 - pub fn http1_allow_obsolete_multiline_headers_in_responses( - &mut self, - enabled: bool, - ) -> &mut Builder { - self.h1_parser_config - .allow_obsolete_multiline_headers_in_responses(enabled); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder { - self.h1_writev = Some(enabled); - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder { - self.h1_title_case_headers = enabled; - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Response`. It will also look for and use - /// such an extension in any provided `Request`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder { - self.h1_preserve_header_case = enabled; - self - } - - /// Set whether to support preserving original header order. - /// - /// Currently, this will record the order in which headers are received, and store this - /// ordering in a private extension on the `Response`. It will also look for and use - /// such an extension in any provided `Request`. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - #[cfg(feature = "ffi")] - pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder { - self.h1_preserve_header_order = enabled; - self - } - - /// Sets the exact size of the read buffer to *always* use. - /// - /// Note that setting this option unsets the `http1_max_buf_size` option. - /// - /// Default is an adaptive read buffer. - pub fn http1_read_buf_exact_size(&mut self, sz: Option) -> &mut Builder { - self.h1_read_buf_exact_size = sz; - self.h1_max_buf_size = None; - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// Note that setting this option unsets the `http1_read_exact_buf_size` option. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { - assert!( - max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, - "the max_buf_size cannot be smaller than the minimum that h1 specifies." - ); - - self.h1_max_buf_size = Some(max); - self.h1_read_buf_exact_size = None; - self - } - - #[cfg(feature = "ffi")] - pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self { - self.h1_headers_raw = enabled; - self - } - - /// Sets whether HTTP2 is required. - /// - /// Default is false. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_only(&mut self, enabled: bool) -> &mut Builder { - if enabled { - self.version = Proto::Http2 - } - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_stream_window_size = sz; - } - self - } - - /// Sets the max connection-level flow control for HTTP2 - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.adaptive_window = false; - self.h2_builder.initial_conn_window_size = sz; - } - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { - use proto::h2::SPEC_WINDOW_SIZE; - - self.h2_builder.adaptive_window = enabled; - if enabled { - self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; - self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; - } - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - if let Some(sz) = sz.into() { - self.h2_builder.max_frame_size = sz; - } - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { - self.h2_builder.keep_alive_interval = interval.into(); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.h2_builder.keep_alive_timeout = timeout; - self - } - - /// Sets whether HTTP2 keep-alive should apply while the connection is idle. - /// - /// If disabled, keep-alive pings are only sent while there are open - /// request/responses streams. If enabled, pings are also sent when no - /// streams are active. Does nothing if `http2_keep_alive_interval` is - /// disabled. - /// - /// Default is `false`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { - self.h2_builder.keep_alive_while_idle = enabled; - self - } - - /// Sets the maximum number of HTTP2 concurrent locally reset streams. - /// - /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more - /// details. - /// - /// The default value is determined by the `h2` crate. - /// - /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { - self.h2_builder.max_concurrent_reset_streams = Some(max); - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently 1MB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { - assert!(max <= std::u32::MAX as usize); - self.h2_builder.max_send_buffer_size = max; - self - } - - /// Constructs a connection with the configured options and IO. - /// See [`client::conn`](crate::client::conn) for more. - /// - /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will - /// do nothing. - pub fn handshake( - &self, - io: T, - ) -> impl Future, Connection)>> - where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Body + 'static, - B::Data: Send, - B::Error: Into>, - { - let opts = self.clone(); - - async move { - trace!("client handshake {:?}", opts.version); - - let (tx, rx) = dispatch::channel(); - let proto = match opts.version { - #[cfg(feature = "http1")] - Proto::Http1 => { - let mut conn = proto::Conn::new(io); - conn.set_h1_parser_config(opts.h1_parser_config); - if let Some(writev) = opts.h1_writev { - if writev { - conn.set_write_strategy_queue(); - } else { - conn.set_write_strategy_flatten(); - } - } - if opts.h1_title_case_headers { - conn.set_title_case_headers(); - } - if opts.h1_preserve_header_case { - conn.set_preserve_header_case(); - } - #[cfg(feature = "ffi")] - if opts.h1_preserve_header_order { - conn.set_preserve_header_order(); - } - if opts.h09_responses { - conn.set_h09_responses(); - } - - #[cfg(feature = "ffi")] - conn.set_raw_headers(opts.h1_headers_raw); - - if let Some(sz) = opts.h1_read_buf_exact_size { - conn.set_read_buf_exact_size(sz); - } - if let Some(max) = opts.h1_max_buf_size { - conn.set_max_buf_size(max); - } - let cd = proto::h1::dispatch::Client::new(rx); - let dispatch = proto::h1::Dispatcher::new(cd, conn); - ProtoClient::H1 { h1: dispatch } - } - #[cfg(feature = "http2")] - Proto::Http2 => { - let h2 = proto::h2::client::handshake( - io, - rx, - &opts.h2_builder, - opts.exec.clone(), - opts.timer.clone(), - ) - .await?; - ProtoClient::H2 { h2 } - } - }; - - Ok(( - SendRequest { dispatch: tx }, - Connection { inner: Some(proto) }, - )) - } - } -} - -// ===== impl ResponseFuture - -impl Future for ResponseFuture { - type Output = crate::Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match self.inner { - ResponseFutureState::Waiting(ref mut rx) => { - Pin::new(rx).poll(cx).map(|res| match res { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_canceled) => panic!("dispatch dropped without returning error"), - }) - } - ResponseFutureState::Error(ref mut err) => { - Poll::Ready(Err(err.take().expect("polled after ready"))) - } - } - } -} - -impl fmt::Debug for ResponseFuture { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ResponseFuture").finish() - } -} - -// ===== impl ProtoClient - -impl Future for ProtoClient -where - T: AsyncRead + AsyncWrite + Send + Unpin + 'static, - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, -{ - type Output = crate::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match self.project() { - #[cfg(feature = "http1")] - ProtoClientProj::H1 { h1 } => h1.poll(cx), - #[cfg(feature = "http2")] - ProtoClientProj::H2 { h2, .. } => h2.poll(cx), - - #[cfg(not(feature = "http1"))] - ProtoClientProj::H1 { h1 } => match h1.0 {}, - #[cfg(not(feature = "http2"))] - ProtoClientProj::H2 { h2, .. } => match h2.0 {}, - } - } -} - -// assert trait markers - -trait AssertSend: Send {} -trait AssertSendSync: Send + Sync {} - -#[doc(hidden)] -impl AssertSendSync for SendRequest {} - -#[doc(hidden)] -impl AssertSend for Connection -where - T: AsyncRead + AsyncWrite + Send + 'static, - B: Body + 'static, - B::Data: Send, -{ -} - -#[doc(hidden)] -impl AssertSendSync for Connection -where - T: AsyncRead + AsyncWrite + Send + 'static, - B: Body + 'static, - B::Data: Send + Sync + 'static, -{ -} - -#[doc(hidden)] -impl AssertSendSync for Builder {} - -#[doc(hidden)] -impl AssertSend for ResponseFuture {} diff --git a/src/ffi/client.rs b/src/ffi/client.rs index 22cae7fbd4..637b490e0d 100644 --- a/src/ffi/client.rs +++ b/src/ffi/client.rs @@ -13,7 +13,11 @@ use super::task::{hyper_executor, hyper_task, hyper_task_return_type, AsTaskType /// An options builder to configure an HTTP client connection. pub struct hyper_clientconn_options { - builder: conn::Builder, + http1_allow_obsolete_multiline_headers_in_responses: bool, + http1_headers_raw: bool, + http1_preserve_header_case: bool, + http1_preserve_header_order: bool, + http2: bool, /// Use a `Weak` to prevent cycles. exec: WeakExec, } @@ -24,7 +28,14 @@ pub struct hyper_clientconn_options { /// send multiple requests on a single connection, such as when HTTP/1 /// keep-alive or HTTP/2 is used. pub struct hyper_clientconn { - tx: conn::SendRequest, + tx: Tx, +} + +enum Tx { + #[cfg(feature = "http1")] + Http1(conn::http1::SendRequest), + #[cfg(feature = "http2")] + Http2(conn::http2::SendRequest), } // ===== impl hyper_clientconn ===== @@ -42,13 +53,35 @@ ffi_fn! { let io = non_null! { Box::from_raw(io) ?= ptr::null_mut() }; Box::into_raw(hyper_task::boxed(async move { - options.builder.handshake::<_, crate::Recv>(io) + #[cfg(feature = "http2")] + { + if options.http2 { + return conn::http2::Builder::new() + .executor(options.exec.clone()) + .handshake::<_, crate::Recv>(io) + .await + .map(|(tx, conn)| { + options.exec.execute(Box::pin(async move { + let _ = conn.await; + })); + hyper_clientconn { tx: Tx::Http2(tx) } + }); + } + } + + conn::http1::Builder::new() + .executor(options.exec.clone()) + .http1_allow_obsolete_multiline_headers_in_responses(options.http1_allow_obsolete_multiline_headers_in_responses) + .http1_headers_raw(options.http1_headers_raw) + .http1_preserve_header_case(options.http1_preserve_header_case) + .http1_preserve_header_order(options.http1_preserve_header_order) + .handshake::<_, crate::Recv>(io) .await .map(|(tx, conn)| { options.exec.execute(Box::pin(async move { let _ = conn.await; })); - hyper_clientconn { tx } + hyper_clientconn { tx: Tx::Http1(tx) } }) })) } ?= std::ptr::null_mut() @@ -65,7 +98,10 @@ ffi_fn! { // Update request with original-case map of headers req.finalize_request(); - let fut = non_null! { &mut *conn ?= ptr::null_mut() }.tx.send_request(req.0); + let fut = match non_null! { &mut *conn ?= ptr::null_mut() }.tx { + Tx::Http1(ref mut tx) => futures_util::future::Either::Left(tx.send_request(req.0)), + Tx::Http2(ref mut tx) => futures_util::future::Either::Right(tx.send_request(req.0)), + }; let fut = async move { fut.await.map(hyper_response::wrap) @@ -93,10 +129,12 @@ unsafe impl AsTaskType for hyper_clientconn { ffi_fn! { /// Creates a new set of HTTP clientconn options to be used in a handshake. fn hyper_clientconn_options_new() -> *mut hyper_clientconn_options { - let builder = conn::Builder::new(); - Box::into_raw(Box::new(hyper_clientconn_options { - builder, + http1_allow_obsolete_multiline_headers_in_responses: false, + http1_headers_raw: false, + http1_preserve_header_case: false, + http1_preserve_header_order: false, + http2: false, exec: WeakExec::new(), })) } ?= std::ptr::null_mut() @@ -108,7 +146,7 @@ ffi_fn! { /// Pass `0` to allow lowercase normalization (default), `1` to retain original case. fn hyper_clientconn_options_set_preserve_header_case(opts: *mut hyper_clientconn_options, enabled: c_int) { let opts = non_null! { &mut *opts ?= () }; - opts.builder.http1_preserve_header_case(enabled != 0); + opts.http1_preserve_header_case = enabled != 0; } } @@ -118,7 +156,7 @@ ffi_fn! { /// Pass `0` to allow reordering (default), `1` to retain original ordering. fn hyper_clientconn_options_set_preserve_header_order(opts: *mut hyper_clientconn_options, enabled: c_int) { let opts = non_null! { &mut *opts ?= () }; - opts.builder.http1_preserve_header_order(enabled != 0); + opts.http1_preserve_header_order = enabled != 0; } } @@ -140,7 +178,6 @@ ffi_fn! { let weak_exec = hyper_executor::downgrade(&exec); std::mem::forget(exec); - opts.builder.executor(weak_exec.clone()); opts.exec = weak_exec; } } @@ -153,7 +190,7 @@ ffi_fn! { #[cfg(feature = "http2")] { let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; - opts.builder.http2_only(enabled != 0); + opts.http2 = enabled != 0; hyper_code::HYPERE_OK } @@ -175,7 +212,7 @@ ffi_fn! { /// If enabled, see `hyper_response_headers_raw()` for usage. fn hyper_clientconn_options_headers_raw(opts: *mut hyper_clientconn_options, enabled: c_int) -> hyper_code { let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; - opts.builder.http1_headers_raw(enabled != 0); + opts.http1_headers_raw = enabled != 0; hyper_code::HYPERE_OK } } @@ -188,7 +225,7 @@ ffi_fn! { /// fn hyper_clientconn_options_http1_allow_multiline_headers(opts: *mut hyper_clientconn_options, enabled: c_int) -> hyper_code { let opts = non_null! { &mut *opts ?= hyper_code::HYPERE_INVALID_ARG }; - opts.builder.http1_allow_obsolete_multiline_headers_in_responses(enabled != 0); + opts.http1_allow_obsolete_multiline_headers_in_responses = enabled != 0; hyper_code::HYPERE_OK } } diff --git a/tests/client.rs b/tests/client.rs index e1ff61ea88..f9ee93e3cc 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -232,19 +232,17 @@ macro_rules! test { // Wrapper around hyper::client::conn::Builder with set_host field to mimic // hyper::client::Builder. struct Builder { - inner: hyper::client::conn::Builder, + inner: hyper::client::conn::http1::Builder, set_host: bool, http09_responses: bool, - http2_only: bool, } impl Builder { fn new() -> Self { Self { - inner: hyper::client::conn::Builder::new(), + inner: hyper::client::conn::http1::Builder::new(), set_host: true, http09_responses: false, - http2_only: false, } } @@ -260,17 +258,10 @@ macro_rules! test { self.inner.http09_responses(val); self } - - #[allow(unused)] - fn http2_only(&mut self, val: bool) -> &mut Self { - self.http2_only = val; - self.inner.http2_only(val); - self - } } impl std::ops::Deref for Builder { - type Target = hyper::client::conn::Builder; + type Target = hyper::client::conn::http1::Builder; fn deref(&self) -> &Self::Target { &self.inner @@ -292,7 +283,7 @@ macro_rules! test { return Err(Error::UnsupportedVersion); } - if req.version() == Version::HTTP_2 && !builder.http2_only { + if req.version() == Version::HTTP_2 { return Err(Error::UnsupportedVersion); } @@ -1376,7 +1367,7 @@ mod conn { let client = async move { let tcp = tcp_connect(&addr).await.expect("connect"); - let (mut client, conn) = conn::handshake(tcp).await.expect("handshake"); + let (mut client, conn) = conn::http1::handshake(tcp).await.expect("handshake"); tokio::task::spawn(async move { conn.await.expect("http conn"); @@ -1420,7 +1411,7 @@ mod conn { let client = async move { let tcp = tcp_connect(&addr).await.expect("connect"); - let (mut client, conn) = conn::handshake(tcp).await.expect("handshake"); + let (mut client, conn) = conn::http1::handshake(tcp).await.expect("handshake"); tokio::task::spawn(async move { conn.await.expect("http conn"); @@ -1478,7 +1469,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1524,7 +1515,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1581,7 +1572,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1626,7 +1617,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1668,7 +1659,7 @@ mod conn { let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = rt.block_on(conn::http1::handshake(tcp)).unwrap(); rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); @@ -1738,7 +1729,7 @@ mod conn { shutdown_called: false, }; - let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); + let (mut client, mut conn) = rt.block_on(conn::http1::handshake(io)).unwrap(); { let until_upgrade = poll_fn(|ctx| conn.poll_without_shutdown(ctx)); @@ -1824,7 +1815,7 @@ mod conn { shutdown_called: false, }; - let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); + let (mut client, mut conn) = rt.block_on(conn::http1::handshake(io)).unwrap(); { let until_tunneled = poll_fn(|ctx| conn.poll_without_shutdown(ctx)); @@ -1922,9 +1913,8 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() + let (mut client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) - .http2_only(true) .handshake(io) .await .expect("http handshake"); @@ -1985,10 +1975,9 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (_client, conn) = conn::Builder::new() + let (_client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) .timer(TokioTimer) - .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) // enable while idle since we aren't sending requests @@ -2020,10 +2009,9 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() + let (mut client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) .timer(TokioTimer) - .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .handshake::<_, Recv>(io) @@ -2058,10 +2046,9 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() + let (mut client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) .timer(TokioTimer) - .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .handshake(io) @@ -2126,10 +2113,9 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() + let (mut client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) .timer(TokioTimer) - .http2_only(true) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) .handshake(io) @@ -2188,9 +2174,8 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() + let (mut client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) - .http2_only(true) .handshake(io) .await .expect("http handshake"); @@ -2245,9 +2230,8 @@ mod conn { }); let io = tcp_connect(&addr).await.expect("tcp connect"); - let (mut client, conn) = conn::Builder::new() + let (mut client, conn) = conn::http2::Builder::new() .executor(TokioExecutor) - .http2_only(true) .handshake::<_, Empty>(io) .await .expect("http handshake"); diff --git a/tests/server.rs b/tests/server.rs index 27d40c0af6..da13f2b1cf 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -2457,9 +2457,8 @@ async fn http2_keep_alive_with_responsive_client() { }); let tcp = connect_async(addr).await; - let (mut client, conn) = hyper::client::conn::Builder::new() + let (mut client, conn) = hyper::client::conn::http2::Builder::new() .executor(TokioExecutor) - .http2_only(true) .handshake(tcp) .await .expect("http handshake"); @@ -3077,20 +3076,32 @@ impl TestClient { let host = req.uri().host().expect("uri has no host"); let port = req.uri().port_u16().expect("uri has no port"); - let mut builder = hyper::client::conn::Builder::new(); - builder.http2_only(self.http2_only); - builder.executor(TokioExecutor); - let stream = TkTcpStream::connect(format!("{}:{}", host, port)) .await .unwrap(); - let (mut sender, conn) = builder.handshake(stream).await.unwrap(); + if self.http2_only { + let (mut sender, conn) = hyper::client::conn::http2::Builder::new() + .executor(TokioExecutor) + .handshake(stream) + .await + .unwrap(); + tokio::task::spawn(async move { + conn.await.unwrap(); + }); - tokio::task::spawn(async move { - conn.await.unwrap(); - }); + sender.send_request(req).await + } else { + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .executor(TokioExecutor) + .handshake(stream) + .await + .unwrap(); + tokio::task::spawn(async move { + conn.await.unwrap(); + }); - sender.send_request(req).await + sender.send_request(req).await + } } } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index c6489271aa..89ef72e4b7 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -8,7 +8,6 @@ use std::sync::{ use bytes::Bytes; use http_body_util::Full; -use hyper::client::conn::Builder; use hyper::server::conn::Http; use tokio::net::{TcpListener, TcpStream}; @@ -421,20 +420,32 @@ async fn async_test(cfg: __TestConfig) { async move { let stream = TcpStream::connect(addr).await.unwrap(); - let (mut sender, conn) = hyper::client::conn::Builder::new() - .executor(TokioExecutor) - .http2_only(http2_only) - .handshake(stream) - .await - .unwrap(); + let res = if http2_only { + let (mut sender, conn) = hyper::client::conn::http2::Builder::new() + .executor(TokioExecutor) + .handshake(stream) + .await + .unwrap(); - tokio::task::spawn(async move { - if let Err(err) = conn.await { - panic!("{:?}", err); - } - }); + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + sender.send_request(req).await.unwrap() + } else { + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .handshake(stream) + .await + .unwrap(); - let res = sender.send_request(req).await.unwrap(); + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + sender.send_request(req).await.unwrap() + }; assert_eq!(res.status(), cstatus, "server status"); assert_eq!(res.version(), version, "server version"); @@ -508,19 +519,32 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) .await .unwrap(); - let mut builder = Builder::new(); - builder.http2_only(http2_only); - builder.executor(TokioExecutor); - - let (mut sender, conn) = builder.handshake(stream).await.unwrap(); - - tokio::task::spawn(async move { - if let Err(err) = conn.await { - panic!("{:?}", err); - } - }); - - let resp = sender.send_request(req).await?; + let resp = if http2_only { + let (mut sender, conn) = hyper::client::conn::http2::Builder::new() + .executor(TokioExecutor) + .handshake(stream) + .await + .unwrap(); + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + + sender.send_request(req).await? + } else { + let builder = hyper::client::conn::http1::Builder::new(); + let (mut sender, conn) = builder.handshake(stream).await.unwrap(); + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + panic!("{:?}", err); + } + }); + + sender.send_request(req).await? + }; let (mut parts, body) = resp.into_parts();