Skip to content

Commit

Permalink
less errors
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 20, 2023
1 parent 96881e7 commit 0dca943
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
30 changes: 30 additions & 0 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,36 @@ impl fmt::Debug for Body {
}
}

impl hyper::body::Body for Body {
type Data = Bytes;
type Error = crate::Error;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
match self.inner {
Inner::Reusable(ref mut bytes) => {
let out = bytes.split_off(0);
if out.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
}
},
Inner::Streaming { ref mut body, ref mut timeout } => {
if let Some(timeout) = timeout {
if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
}
Poll::Ready(futures_core::ready!(Pin::new(body).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)))
}
}
}
}

// ===== impl TotalTimeoutBody =====

pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
Expand Down
12 changes: 7 additions & 5 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ use quinn::TransportConfig;
#[cfg(feature = "http3")]
use quinn::VarInt;

type HyperResponseFuture = hyper_util::client::legacy::ResponseFuture;

/// An asynchronous `Client` to make Requests with.
///
/// The Client has various configuration values to tweak, but the defaults
Expand Down Expand Up @@ -612,7 +614,7 @@ impl ClientBuilder {
connector.set_timeout(config.connect_timeout);
connector.set_verbose(config.connection_verbose);

let mut builder = HyperClient::builder();
let mut builder = HyperClient::builder(hyper_util::rt::TokioExecutor::new());
if matches!(config.http_version_pref, HttpVersionPref::Http2) {
builder.http2_only(true);
}
Expand Down Expand Up @@ -1651,7 +1653,7 @@ impl ClientBuilder {
}
}

type HyperClient = hyper_util::client::legacy::Client<Connector, super::body::ImplStream>;
type HyperClient = hyper_util::client::legacy::Client<Connector, super::Body>;

impl Default for Client {
fn default() -> Self {
Expand Down Expand Up @@ -1829,7 +1831,7 @@ impl Client {
}
_ => {
let mut req = builder
.body(body.into_stream())
.body(body)
.expect("valid request parts");
*req.headers_mut() = headers.clone();
ResponseFuture::Default(self.inner.hyper.request(req))
Expand Down Expand Up @@ -2194,7 +2196,7 @@ impl PendingRequest {
let mut req = hyper::Request::builder()
.method(self.method.clone())
.uri(uri)
.body(body.into_stream())
.body(body)
.expect("valid request parts");
*req.headers_mut() = self.headers.clone();
ResponseFuture::Default(self.client.hyper.request(req))
Expand Down Expand Up @@ -2423,7 +2425,7 @@ impl Future for PendingRequest {
let mut req = hyper::Request::builder()
.method(self.method.clone())
.uri(uri.clone())
.body(body.into_stream())
.body(body)
.expect("valid request parts");
*req.headers_mut() = headers.clone();
std::mem::swap(self.as_mut().headers(), &mut headers);
Expand Down
42 changes: 22 additions & 20 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,31 @@ impl Decoder {
}
}

impl Stream for Decoder {
type Item = Result<Bytes, error::Error>;
impl HttpBody for Decoder {
type Data = Bytes;
type Error = crate::Error;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Do a read or poll for a pending decoder value.
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.inner {
#[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
Poll::Ready(Ok(inner)) => {
self.inner = inner;
self.poll_next(cx)
self.poll_frame(cx)
}
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))),
Poll::Pending => Poll::Pending,
},
Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx),
Inner::PlainText(ref mut body) => {
match futures_core::ready!(Pin::new(body).poll_frame(cx)) {
Some(Ok(frame)) => Poll::Ready(Some(Ok(frame))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))),
None => Poll::Ready(None),
}
},
#[cfg(feature = "gzip")]
Inner::Gzip(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Expand All @@ -256,18 +265,6 @@ impl Stream for Decoder {
}
}
}
}

impl HttpBody for Decoder {
type Data = Bytes;
type Error = crate::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.poll_next(cx)
}

fn size_hint(&self) -> http_body::SizeHint {
match self.inner {
Expand All @@ -279,6 +276,11 @@ impl HttpBody for Decoder {
}
}

fn empty() -> ResponseBody {
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
BoxBody::new(Empty::new().map_err(|never| match never {}))
}

impl Future for Pending {
type Output = Result<Inner, std::io::Error>;

Expand All @@ -297,12 +299,12 @@ impl Future for Pending {
.expect("just peeked Some")
.unwrap_err()));
}
None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))),
None => return Poll::Ready(Ok(Inner::PlainText(empty()))),
};

let _body = std::mem::replace(
&mut self.0,
IoStream(Body::empty().into_stream()).peekable(),
IoStream(empty()).peekable(),
);

match self.1 {
Expand Down

0 comments on commit 0dca943

Please sign in to comment.