Skip to content

Commit

Permalink
feat(body): add Body::wrap_body
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn committed Jul 11, 2022
1 parent 0ff6213 commit 355b6fc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 11 deletions.
29 changes: 27 additions & 2 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
Expand Down Expand Up @@ -62,6 +61,7 @@ enum Kind {
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
WrappedBody(SyncWrapper<http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>>),
}

struct Extra {
Expand Down Expand Up @@ -139,6 +139,24 @@ impl Body {
Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
}

/// Create a `Body` by wrapping another body.
#[inline]
#[cfg(any(feature = "http1", feature = "http2"))]
pub fn wrap_body<B>(body: B) -> Self
where
B: HttpBody<Data = Bytes> + Send + 'static,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
// if `B` is already a `Body` just return that and avoid additional boxing
match super::try_downcast(body) {
Ok(body) => body,
Err(body) => {
let body = body.map_err(crate::Error::new_user_body).boxed_unsync();
Self::new(Kind::WrappedBody(SyncWrapper::new(body)))
}
}
}

pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
let (data_tx, data_rx) = mpsc::channel(0);
let (trailers_tx, trailers_rx) = oneshot::channel();
Expand Down Expand Up @@ -329,12 +347,12 @@ impl Body {

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
Kind::WrappedBody(ref mut body) => Pin::new(body.get_mut()).poll_data(cx),
}
}

Expand Down Expand Up @@ -393,6 +411,7 @@ impl HttpBody for Body {
},
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_trailers(cx),
Kind::WrappedBody(ref mut body) => Pin::new(body.get_mut()).poll_trailers(cx),
_ => Poll::Ready(Ok(None)),
}
}
Expand All @@ -407,6 +426,9 @@ impl HttpBody for Body {
Kind::Ffi(..) => false,
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
// we cannot get a `&UnsyncBoxBody` through a `SyncWrapper<UnsyncBoxBody>`
// so we have no way of calling the method on the inner body
Kind::WrappedBody(..) => false,
}
}

Expand All @@ -433,6 +455,9 @@ impl HttpBody for Body {
Kind::H2 { content_length, .. } => opt_len!(content_length),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => SizeHint::default(),
// we cannot get a `&UnsyncBoxBody` through a `SyncWrapper<UnsyncBoxBody>`
// so we have no way of calling the method on the inner body
Kind::WrappedBody(..) => SizeHint::default(),
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ pub(crate) fn take_full_data<T: HttpBody + 'static>(body: &mut T) -> Option<T::D
}
}

pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
T: 'static,
K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}

fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
Expand Down
4 changes: 0 additions & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ pub(crate) mod io;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(any(
feature = "stream",
all(feature = "client", any(feature = "http1", feature = "http2"))
))]
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;
Expand Down
5 changes: 0 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub(super) enum Kind {
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
HeaderTimeout,
/// Error while reading a body from connection.
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
Body,
/// Error while writing a body to connection.
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down Expand Up @@ -92,7 +91,6 @@ pub(super) enum Header {
#[derive(Debug)]
pub(super) enum User {
/// Error calling user's HttpBody::poll_data().
#[cfg(any(feature = "http1", feature = "http2"))]
Body,
/// The user aborted writing of the outgoing body.
BodyWriteAborted,
Expand Down Expand Up @@ -367,7 +365,6 @@ impl Error {
Error::new_user(User::Service).with(cause)
}

#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) fn new_user_body<E: Into<Cause>>(cause: E) -> Error {
Error::new_user(User::Body).with(cause)
}
Expand Down Expand Up @@ -440,7 +437,6 @@ impl Error {
Kind::Accept => "error accepting connection",
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
Kind::HeaderTimeout => "read header from client timeout",
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
Kind::Body => "error reading a body from connection",
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::BodyWrite => "error writing a body to connection",
Expand All @@ -451,7 +447,6 @@ impl Error {
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::Io => "connection error",

#[cfg(any(feature = "http1", feature = "http2"))]
Kind::User(User::Body) => "error from user's HttpBody stream",
Kind::User(User::BodyWriteAborted) => "user body write aborted",
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down

0 comments on commit 355b6fc

Please sign in to comment.