diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 56a6dd882..999bfa561 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -237,6 +237,36 @@ impl fmt::Debug for Body { } } +impl hyper::body::Body for Body { + type Data = Bytes; + type Error = crate::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll, Self::Error>>> { + match self.inner { + Inner::Reusable(ref mut bytes) => { + let out = bytes.split_off(0); + if out.is_empty() { + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(hyper::body::Frame::data(out)))) + } + }, + Inner::Streaming { ref mut body, ref mut timeout } => { + if let Some(timeout) = timeout { + if let Poll::Ready(()) = timeout.as_mut().poll(cx) { + return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); + } + } + Poll::Ready(futures_core::ready!(Pin::new(body).poll_frame(cx)) + .map(|opt_chunk| opt_chunk.map_err(crate::error::body))) + } + } + } +} + // ===== impl TotalTimeoutBody ===== pub(crate) fn total_timeout(body: B, timeout: Pin>) -> TotalTimeoutBody { diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index de10bab24..b757c1738 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -52,6 +52,8 @@ use quinn::TransportConfig; #[cfg(feature = "http3")] use quinn::VarInt; +type HyperResponseFuture = hyper_util::client::legacy::ResponseFuture; + /// An asynchronous `Client` to make Requests with. /// /// The Client has various configuration values to tweak, but the defaults @@ -612,7 +614,7 @@ impl ClientBuilder { connector.set_timeout(config.connect_timeout); connector.set_verbose(config.connection_verbose); - let mut builder = HyperClient::builder(); + let mut builder = HyperClient::builder(hyper_util::rt::TokioExecutor::new()); if matches!(config.http_version_pref, HttpVersionPref::Http2) { builder.http2_only(true); } @@ -1651,7 +1653,7 @@ impl ClientBuilder { } } -type HyperClient = hyper_util::client::legacy::Client; +type HyperClient = hyper_util::client::legacy::Client; impl Default for Client { fn default() -> Self { @@ -1829,7 +1831,7 @@ impl Client { } _ => { let mut req = builder - .body(body.into_stream()) + .body(body) .expect("valid request parts"); *req.headers_mut() = headers.clone(); ResponseFuture::Default(self.inner.hyper.request(req)) @@ -2194,7 +2196,7 @@ impl PendingRequest { let mut req = hyper::Request::builder() .method(self.method.clone()) .uri(uri) - .body(body.into_stream()) + .body(body) .expect("valid request parts"); *req.headers_mut() = self.headers.clone(); ResponseFuture::Default(self.client.hyper.request(req)) @@ -2423,7 +2425,7 @@ impl Future for PendingRequest { let mut req = hyper::Request::builder() .method(self.method.clone()) .uri(uri.clone()) - .body(body.into_stream()) + .body(body) .expect("valid request parts"); *req.headers_mut() = headers.clone(); std::mem::swap(self.as_mut().headers(), &mut headers); diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 8d54c0d1d..dcff434cb 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -214,22 +214,31 @@ impl Decoder { } } -impl Stream for Decoder { - type Item = Result; +impl HttpBody for Decoder { + type Data = Bytes; + type Error = crate::Error; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // Do a read or poll for a pending decoder value. + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll, Self::Error>>> { match self.inner { #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { Poll::Ready(Ok(inner)) => { self.inner = inner; - self.poll_next(cx) + self.poll_frame(cx) } Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))), Poll::Pending => Poll::Pending, }, - Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx), + Inner::PlainText(ref mut body) => { + match futures_core::ready!(Pin::new(body).poll_frame(cx)) { + Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))), + Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))), + None => Poll::Ready(None), + } + }, #[cfg(feature = "gzip")] Inner::Gzip(ref mut decoder) => { match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { @@ -256,18 +265,6 @@ impl Stream for Decoder { } } } -} - -impl HttpBody for Decoder { - type Data = Bytes; - type Error = crate::Error; - - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll, Self::Error>>> { - self.poll_next(cx) - } fn size_hint(&self) -> http_body::SizeHint { match self.inner { @@ -279,6 +276,11 @@ impl HttpBody for Decoder { } } +fn empty() -> ResponseBody { + use http_body_util::{combinators::BoxBody, BodyExt, Empty}; + BoxBody::new(Empty::new().map_err(|never| match never {})) +} + impl Future for Pending { type Output = Result; @@ -297,12 +299,12 @@ impl Future for Pending { .expect("just peeked Some") .unwrap_err())); } - None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), + None => return Poll::Ready(Ok(Inner::PlainText(empty()))), }; let _body = std::mem::replace( &mut self.0, - IoStream(Body::empty().into_stream()).peekable(), + IoStream(empty()).peekable(), ); match self.1 {