Skip to content

Commit

Permalink
Add Body::from_stream (#1752)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn authored Feb 13, 2023
1 parent 8685796 commit b6cc689
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
4 changes: 3 additions & 1 deletion axum-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

# Unreleased

- None.
- **added:** Add `Body::from_stream` ([#1752])

[#1752]: https://github.com/tokio-rs/axum/pull/1752

# 0.3.2 (20. January, 2023)

Expand Down
56 changes: 47 additions & 9 deletions axum-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use crate::{BoxError, Error};
use bytes::Bytes;
use bytes::{Buf, BufMut};
use futures_util::stream::Stream;
use futures_util::stream::{BoxStream, Stream};
use futures_util::{StreamExt, TryStreamExt};
use http::HeaderMap;
use http_body::Body as _;
use std::pin::Pin;
Expand Down Expand Up @@ -91,7 +92,21 @@ where

/// The body type used in axum requests and responses.
#[derive(Debug)]
pub struct Body(BoxBody);
pub struct Body(Inner);

enum Inner {
Boxed(BoxBody),
Stream(BoxStream<'static, Result<Bytes, Error>>),
}

impl std::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Boxed(_) => f.debug_tuple("Boxed").finish(),
Self::Stream(_) => f.debug_tuple("Stream").finish(),
}
}
}

impl Body {
/// Create a new `Body` that wraps another [`http_body::Body`].
Expand All @@ -100,13 +115,24 @@ impl Body {
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
try_downcast(body).unwrap_or_else(|body| Self(Inner::Boxed(boxed(body))))
}

/// Create an empty body.
pub fn empty() -> Self {
Self::new(http_body::Empty::new())
}

/// Create a new `Body` from a [`Stream`].
pub fn from_stream<S, T, E>(stream: S) -> Self
where
S: Stream<Item = Result<T, E>> + Send + 'static,
T: Into<Bytes> + 'static,
E: Into<BoxError> + 'static,
{
let stream = stream.map_ok(Into::into).map_err(Error::new).boxed();
Self(Inner::Stream(stream))
}
}

impl Default for Body {
Expand Down Expand Up @@ -143,26 +169,38 @@ impl http_body::Body for Body {
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(&mut self.0).poll_data(cx)
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match &mut self.0 {
Inner::Boxed(boxed) => Pin::new(boxed).poll_data(cx),
Inner::Stream(stream) => Pin::new(stream).poll_next(cx),
}
}

#[inline]
fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> std::task::Poll<Result<Option<HeaderMap>, Self::Error>> {
Pin::new(&mut self.0).poll_trailers(cx)
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match &mut self.0 {
Inner::Boxed(boxed) => Pin::new(boxed).poll_trailers(cx),
Inner::Stream(_) => Poll::Ready(Ok(None)),
}
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.0.size_hint()
match &self.0 {
Inner::Boxed(boxed) => boxed.size_hint(),
Inner::Stream(_) => Default::default(),
}
}

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
match &self.0 {
Inner::Boxed(boxed) => boxed.is_end_stream(),
Inner::Stream(_) => false,
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion axum/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

# Unreleased

- None.
- **added:** Add `Body::from_stream` ([#1752])

[#1752]: https://github.com/tokio-rs/axum/pull/1752

# 0.6.6 (12. February, 2023)

Expand Down

0 comments on commit b6cc689

Please sign in to comment.