Skip to content

Commit

Permalink
chore(deps): Add various potentially relevant changes on top of seanm…
Browse files Browse the repository at this point in the history
…onstar/reqwest#hyper-v1 (#2115)

* Implement `size_hint()` for `async_impl::body::Body`.

* Add the `Sync` bound to `S` in `async_impl::body::Body::wrap_stream()`.

* Install a `TokioTimer` for all targets other than wasm32.

* Implement `Accepts::none()`.

* Uncomment and fix `impl<T: Into<Body>> From<http::Response<T>> for Response`.

* Other fixes.

* Patch `hyper-util` until hyperium/hyper-util#95 is merged.

* A few fixes to the tests.
  • Loading branch information
jakubadamw authored Jan 31, 2024
1 parent 74dc685 commit 1112452
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 23 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ features = [
"multipart",
]

[patch.crates-io]
hyper-util = { git = "https://github.com/grafbase/hyper-util", rev = "c7acf8968d96a4408e952a097d93602d2e8ed01a" }

[features]
default = ["default-tls"]

Expand Down
13 changes: 12 additions & 1 deletion src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Body {
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S>(stream: S) -> Body
where
S: futures_core::stream::TryStream + Send + 'static,
S: futures_core::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
Expand Down Expand Up @@ -264,6 +264,17 @@ impl HttpBody for Body {
),
}
}

fn size_hint(&self) -> http_body::SizeHint {
match self.inner {
Inner::Reusable(ref bytes) => {
let mut hint = http_body::SizeHint::default();
hint.set_exact(bytes.len() as u64);
hint
}
Inner::Streaming(ref body) => body.size_hint(),
}
}
}

// ===== impl TotalTimeoutBody =====
Expand Down
2 changes: 2 additions & 0 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ impl ClientBuilder {
builder.http2_keep_alive_while_idle(true);
}

#[cfg(not(target_arch = "wasm32"))]
builder.timer(hyper_util::rt::TokioTimer::new());
builder.pool_idle_timeout(config.pool_idle_timeout);
builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
connector.set_keepalive(config.tcp_keepalive);
Expand Down
17 changes: 15 additions & 2 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ pub(super) struct Accepts {
pub(super) deflate: bool,
}

impl Accepts {
pub fn none() -> Self {
Self {
#[cfg(feature = "gzip")]
gzip: false,
#[cfg(feature = "brotli")]
brotli: false,
#[cfg(feature = "deflate")]
deflate: false,
}
}
}

/// A response decompressor over a non-blocking stream of chunks.
///
/// The inner decoder may be constructed asynchronously.
Expand Down Expand Up @@ -126,7 +139,7 @@ impl Decoder {
///
/// This decoder will buffer and decompress chunks that are brotlied.
#[cfg(feature = "brotli")]
fn brotli(body: Body) -> Decoder {
fn brotli(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
Expand All @@ -141,7 +154,7 @@ impl Decoder {
///
/// This decoder will buffer and decompress chunks that are deflated.
#[cfg(feature = "deflate")]
fn deflate(body: Body) -> Decoder {
fn deflate(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
Expand Down
13 changes: 8 additions & 5 deletions src/async_impl/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::pin::Pin;

use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
use http_body_util::BodyExt;
use hyper::{HeaderMap, StatusCode, Version};
use hyper_util::client::legacy::connect::HttpInfo;
use mime::Mime;
Expand All @@ -16,6 +17,7 @@ use url::Url;

use super::body::Body;
use super::decoder::{Accepts, Decoder};
use crate::async_impl::body::ResponseBody;
#[cfg(feature = "cookies")]
use crate::cookie;

Expand Down Expand Up @@ -418,16 +420,19 @@ impl From<Response> for Body {
}
}

/*
// I'm not sure this conversion is that useful... People should be encouraged
// to use `http::Resposne`, not `reqwest::Response`.
impl<T: Into<Body>> From<http::Response<T>> for Response {
fn from(r: http::Response<T>) -> Response {
use crate::response::ResponseUrl;

let (mut parts, body) = r.into_parts();
let body = body.into();
let decoder = Decoder::detect(&mut parts.headers, body, Accepts::none());
let body: crate::async_impl::body::Body = body.into();
let decoder = Decoder::detect(
&mut parts.headers,
ResponseBody::new(body.map_err(Into::into)),
Accepts::none(),
);
let url = parts
.extensions
.remove::<ResponseUrl>()
Expand All @@ -441,7 +446,6 @@ impl<T: Into<Body>> From<http::Response<T>> for Response {
}
}

#[cfg(test)]
mod tests {
use super::Response;
Expand All @@ -463,4 +467,3 @@ mod tests {
assert_eq!(*response.url(), url);
}
}
*/
4 changes: 1 addition & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,13 @@ impl Error {
matches!(self.inner.kind, Kind::Request)
}

/*
#[cfg(not(target_arch = "wasm32"))]
/// Returns true if the error is related to connect
pub fn is_connect(&self) -> bool {
let mut source = self.source();

while let Some(err) = source {
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if let Some(hyper_err) = err.downcast_ref::<hyper_util::client::legacy::Error>() {
if hyper_err.is_connect() {
return true;
}
Expand All @@ -139,7 +138,6 @@ impl Error {

false
}
*/

/// Returns true if the error is related to the request or response body
pub fn is_body(&self) -> bool {
Expand Down
26 changes: 15 additions & 11 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ async fn response_json() {

#[tokio::test]
async fn body_pipe_response() {
use http_body_util::BodyExt;
let _ = env_logger::try_init();

let server = server::http(move |mut req| async move {
Expand All @@ -141,10 +142,13 @@ async fn body_pipe_response() {
assert_eq!(req.uri(), "/pipe");
assert_eq!(req.headers()["transfer-encoding"], "chunked");

let mut full: Vec<u8> = Vec::new();
while let Some(item) = req.body_mut().next().await {
full.extend(&*item.unwrap());
}
let full: Vec<u8> = req
.into_body()
.collect()
.await
.expect("must succeed")
.to_bytes()
.to_vec();

assert_eq!(full, b"pipe me");

Expand Down Expand Up @@ -325,7 +329,6 @@ fn use_preconfigured_rustls_default() {

let root_cert_store = rustls::RootCertStore::empty();
let tls = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_cert_store)
.with_no_client_auth();

Expand Down Expand Up @@ -454,9 +457,11 @@ async fn highly_concurrent_requests_to_http2_server_with_low_max_concurrent_stre
let server = server::http_with_config(
move |req| async move {
assert_eq!(req.version(), http::Version::HTTP_2);
http::Response::default()
http::Response::<String>::default()
},
|builder| {
builder.http2().max_concurrent_streams(1);
},
|builder| builder.http2_only(true).http2_max_concurrent_streams(1),
);

let url = format!("http://{}", server.addr());
Expand All @@ -482,11 +487,10 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent
let server = delay_server::Server::new(
move |req| async move {
assert_eq!(req.version(), http::Version::HTTP_2);
http::Response::default()
http::Response::<String>::default()
},
|mut http| {
http.http2_only(true).http2_max_concurrent_streams(1);
http
|http| {
http.http2().max_concurrent_streams(1);
},
std::time::Duration::from_secs(2),
)
Expand Down
3 changes: 2 additions & 1 deletion tests/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg(not(target_arch = "wasm32"))]
mod support;
use http_body_util::Empty;
use support::server;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand All @@ -25,7 +26,7 @@ async fn http_upgrade() {
.status(http::StatusCode::SWITCHING_PROTOCOLS)
.header(http::header::CONNECTION, "upgrade")
.header(http::header::UPGRADE, "foobar")
.body(hyper::Body::empty())
.body(Empty::<Vec<u8>>::new())
.unwrap()
}
});
Expand Down

0 comments on commit 1112452

Please sign in to comment.