Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.7.0 #1583

Merged
merged 12 commits into from
Apr 21, 2023
5 changes: 4 additions & 1 deletion axum-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ futures-util = { version = "0.3", default-features = false, features = ["alloc"]
http = "0.2.7"
http-body = "0.4.5"
mime = "0.3.16"
pin-project-lite = "0.2.7"
sync_wrapper = "0.1.1"
tower-layer = "0.3"
tower-service = "0.3"

Expand All @@ -35,7 +37,8 @@ tracing = { version = "0.1.37", default-features = false, optional = true }
rustversion = "1.0.9"

[dev-dependencies]
axum = { path = "../axum", version = "0.6.0", features = ["headers"] }
axum = { path = "../axum", version = "0.6.0" }
axum-extra = { path = "../axum-extra", features = ["typed-header"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14.24"
tokio = { version = "1.25.0", features = ["macros"] }
Expand Down
157 changes: 148 additions & 9 deletions axum-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
use crate::{BoxError, Error};
use bytes::Bytes;
use bytes::{Buf, BufMut};
use http_body::Body;
use futures_util::stream::Stream;
use futures_util::TryStream;
use http::HeaderMap;
use http_body::Body as _;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use sync_wrapper::SyncWrapper;

/// A boxed [`Body`] trait object.
///
/// This is used in axum as the response body type for applications. It's
/// necessary to unify multiple response bodies types into one.
pub type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>;
type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, Error>;

/// Convert a [`http_body::Body`] into a [`BoxBody`].
pub fn boxed<B>(body: B) -> BoxBody
fn boxed<B>(body: B) -> BoxBody
where
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
Expand Down Expand Up @@ -55,7 +57,7 @@ where
// THE SOFTWARE.
pub(crate) async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error>
where
T: Body,
T: http_body::Body,
{
futures_util::pin_mut!(body);

Expand Down Expand Up @@ -85,6 +87,143 @@ where
Ok(vec.into())
}

/// The body type used in axum requests and responses.
#[derive(Debug)]
pub struct Body(BoxBody);

impl Body {
/// Create a new `Body` that wraps another [`http_body::Body`].
pub fn new<B>(body: B) -> Self
where
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
}

/// Create an empty body.
pub fn empty() -> Self {
Self::new(http_body::Empty::new())
}

/// Create a new `Body` from a [`Stream`].
///
/// [`Stream`]: futures_util::stream::Stream
pub fn from_stream<S>(stream: S) -> Self
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
Self::new(StreamBody {
stream: SyncWrapper::new(stream),
})
}
}

impl Default for Body {
fn default() -> Self {
Self::empty()
}
}

macro_rules! body_from_impl {
($ty:ty) => {
impl From<$ty> for Body {
fn from(buf: $ty) -> Self {
Self::new(http_body::Full::from(buf))
}
}
};
}

body_from_impl!(&'static [u8]);
body_from_impl!(std::borrow::Cow<'static, [u8]>);
body_from_impl!(Vec<u8>);

body_from_impl!(&'static str);
body_from_impl!(std::borrow::Cow<'static, str>);
body_from_impl!(String);

body_from_impl!(Bytes);

impl http_body::Body for Body {
type Data = Bytes;
type Error = Error;

#[inline]
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(&mut self.0).poll_data(cx)
}

#[inline]
fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Result<Option<HeaderMap>, Self::Error>> {
Pin::new(&mut self.0).poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.0.size_hint()
}

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
}

impl Stream for Body {
type Item = Result<Bytes, Error>;

#[inline]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_data(cx)
}
}

pin_project! {
struct StreamBody<S> {
#[pin]
stream: SyncWrapper<S>,
}
}

impl<S> http_body::Body for StreamBody<S>
where
S: TryStream,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let stream = self.project().stream.get_pin_mut();
match futures_util::ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
}
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

#[test]
fn test_try_downcast() {
assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
Expand Down
Loading