From 1112452cb0daa6369265a826188fd75c0bd265ff Mon Sep 17 00:00:00 2001 From: Jakub Wieczorek Date: Wed, 31 Jan 2024 21:20:25 +0100 Subject: [PATCH] chore(deps): Add various potentially relevant changes on top of seanmonstar/reqwest#hyper-v1 (#2115) * Implement `size_hint()` for `async_impl::body::Body`. * Add the `Sync` bound to `S` in `async_impl::body::Body::wrap_stream()`. * Install a `TokioTimer` for all targets other than wasm32. * Implement `Accepts::none()`. * Uncomment and fix `impl> From> for Response`. * Other fixes. * Patch `hyper-util` until https://github.com/hyperium/hyper-util/pull/95 is merged. * A few fixes to the tests. --- Cargo.toml | 3 +++ src/async_impl/body.rs | 13 ++++++++++++- src/async_impl/client.rs | 2 ++ src/async_impl/decoder.rs | 17 +++++++++++++++-- src/async_impl/response.rs | 13 ++++++++----- src/error.rs | 4 +--- tests/client.rs | 26 +++++++++++++++----------- tests/upgrade.rs | 3 ++- 8 files changed, 58 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52342e45f..7e2474440 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,9 @@ features = [ "multipart", ] +[patch.crates-io] +hyper-util = { git = "https://github.com/grafbase/hyper-util", rev = "c7acf8968d96a4408e952a097d93602d2e8ed01a" } + [features] default = ["default-tls"] diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 81cc47c47..d240edcda 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -73,7 +73,7 @@ impl Body { #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] pub fn wrap_stream(stream: S) -> Body where - S: futures_core::stream::TryStream + Send + 'static, + S: futures_core::stream::TryStream + Send + Sync + 'static, S::Error: Into>, Bytes: From, { @@ -264,6 +264,17 @@ impl HttpBody for Body { ), } } + + fn size_hint(&self) -> http_body::SizeHint { + match self.inner { + Inner::Reusable(ref bytes) => { + let mut hint = http_body::SizeHint::default(); + hint.set_exact(bytes.len() as u64); + hint + } + Inner::Streaming(ref body) => body.size_hint(), + } + } } // ===== impl TotalTimeoutBody ===== diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index a2716c0c8..8389011f5 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -628,6 +628,8 @@ impl ClientBuilder { builder.http2_keep_alive_while_idle(true); } + #[cfg(not(target_arch = "wasm32"))] + builder.timer(hyper_util::rt::TokioTimer::new()); builder.pool_idle_timeout(config.pool_idle_timeout); builder.pool_max_idle_per_host(config.pool_max_idle_per_host); connector.set_keepalive(config.tcp_keepalive); diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 92b6181cf..ca417fc84 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -37,6 +37,19 @@ pub(super) struct Accepts { pub(super) deflate: bool, } +impl Accepts { + pub fn none() -> Self { + Self { + #[cfg(feature = "gzip")] + gzip: false, + #[cfg(feature = "brotli")] + brotli: false, + #[cfg(feature = "deflate")] + deflate: false, + } + } +} + /// A response decompressor over a non-blocking stream of chunks. /// /// The inner decoder may be constructed asynchronously. @@ -126,7 +139,7 @@ impl Decoder { /// /// This decoder will buffer and decompress chunks that are brotlied. #[cfg(feature = "brotli")] - fn brotli(body: Body) -> Decoder { + fn brotli(body: ResponseBody) -> Decoder { use futures_util::StreamExt; Decoder { @@ -141,7 +154,7 @@ impl Decoder { /// /// This decoder will buffer and decompress chunks that are deflated. #[cfg(feature = "deflate")] - fn deflate(body: Body) -> Decoder { + fn deflate(body: ResponseBody) -> Decoder { use futures_util::StreamExt; Decoder { diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index e8e1bde7a..fe9ce3c1f 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use bytes::Bytes; use encoding_rs::{Encoding, UTF_8}; +use http_body_util::BodyExt; use hyper::{HeaderMap, StatusCode, Version}; use hyper_util::client::legacy::connect::HttpInfo; use mime::Mime; @@ -16,6 +17,7 @@ use url::Url; use super::body::Body; use super::decoder::{Accepts, Decoder}; +use crate::async_impl::body::ResponseBody; #[cfg(feature = "cookies")] use crate::cookie; @@ -418,7 +420,6 @@ impl From for Body { } } -/* // I'm not sure this conversion is that useful... People should be encouraged // to use `http::Resposne`, not `reqwest::Response`. impl> From> for Response { @@ -426,8 +427,12 @@ impl> From> for Response { use crate::response::ResponseUrl; let (mut parts, body) = r.into_parts(); - let body = body.into(); - let decoder = Decoder::detect(&mut parts.headers, body, Accepts::none()); + let body: crate::async_impl::body::Body = body.into(); + let decoder = Decoder::detect( + &mut parts.headers, + ResponseBody::new(body.map_err(Into::into)), + Accepts::none(), + ); let url = parts .extensions .remove::() @@ -441,7 +446,6 @@ impl> From> for Response { } } - #[cfg(test)] mod tests { use super::Response; @@ -463,4 +467,3 @@ mod tests { assert_eq!(*response.url(), url); } } -*/ diff --git a/src/error.rs b/src/error.rs index 711b03c96..53e1c1f57 100644 --- a/src/error.rs +++ b/src/error.rs @@ -121,14 +121,13 @@ impl Error { matches!(self.inner.kind, Kind::Request) } - /* #[cfg(not(target_arch = "wasm32"))] /// Returns true if the error is related to connect pub fn is_connect(&self) -> bool { let mut source = self.source(); while let Some(err) = source { - if let Some(hyper_err) = err.downcast_ref::() { + if let Some(hyper_err) = err.downcast_ref::() { if hyper_err.is_connect() { return true; } @@ -139,7 +138,6 @@ impl Error { false } - */ /// Returns true if the error is related to the request or response body pub fn is_body(&self) -> bool { diff --git a/tests/client.rs b/tests/client.rs index bf77f3a47..3dc2db7d8 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -132,6 +132,7 @@ async fn response_json() { #[tokio::test] async fn body_pipe_response() { + use http_body_util::BodyExt; let _ = env_logger::try_init(); let server = server::http(move |mut req| async move { @@ -141,10 +142,13 @@ async fn body_pipe_response() { assert_eq!(req.uri(), "/pipe"); assert_eq!(req.headers()["transfer-encoding"], "chunked"); - let mut full: Vec = Vec::new(); - while let Some(item) = req.body_mut().next().await { - full.extend(&*item.unwrap()); - } + let full: Vec = req + .into_body() + .collect() + .await + .expect("must succeed") + .to_bytes() + .to_vec(); assert_eq!(full, b"pipe me"); @@ -325,7 +329,6 @@ fn use_preconfigured_rustls_default() { let root_cert_store = rustls::RootCertStore::empty(); let tls = rustls::ClientConfig::builder() - .with_safe_defaults() .with_root_certificates(root_cert_store) .with_no_client_auth(); @@ -454,9 +457,11 @@ async fn highly_concurrent_requests_to_http2_server_with_low_max_concurrent_stre let server = server::http_with_config( move |req| async move { assert_eq!(req.version(), http::Version::HTTP_2); - http::Response::default() + http::Response::::default() + }, + |builder| { + builder.http2().max_concurrent_streams(1); }, - |builder| builder.http2_only(true).http2_max_concurrent_streams(1), ); let url = format!("http://{}", server.addr()); @@ -482,11 +487,10 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent let server = delay_server::Server::new( move |req| async move { assert_eq!(req.version(), http::Version::HTTP_2); - http::Response::default() + http::Response::::default() }, - |mut http| { - http.http2_only(true).http2_max_concurrent_streams(1); - http + |http| { + http.http2().max_concurrent_streams(1); }, std::time::Duration::from_secs(2), ) diff --git a/tests/upgrade.rs b/tests/upgrade.rs index de5c2904d..2ebf446ce 100644 --- a/tests/upgrade.rs +++ b/tests/upgrade.rs @@ -1,5 +1,6 @@ #![cfg(not(target_arch = "wasm32"))] mod support; +use http_body_util::Empty; use support::server; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -25,7 +26,7 @@ async fn http_upgrade() { .status(http::StatusCode::SWITCHING_PROTOCOLS) .header(http::header::CONNECTION, "upgrade") .header(http::header::UPGRADE, "foobar") - .body(hyper::Body::empty()) + .body(Empty::>::new()) .unwrap() } });