Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial implementation of ChunkedBody #502

Merged
merged 1 commit into from
Sep 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 160 additions & 4 deletions http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ mod private {
use heph::net::tcp::stream::FileSend;
use heph::net::TcpStream;

const LAST_CHUNK: &[u8] = b"0\r\n\r\n";

/// Private extention of [`Body`].
///
/// [`Body`]: super::Body
Expand Down Expand Up @@ -187,6 +189,114 @@ mod private {
}
}

/// See [`super::ChunkedBody`].
#[derive(Debug)]
pub struct SendChunkedBody<'s, 'h, 'b, B> {
pub(super) stream: &'s mut TcpStream,
pub(super) head: &'h [u8],
pub(super) body: B,
/// Slice of bytes from `body`.
pub(super) body_bytes: Option<&'b [u8]>,
pub(super) written_chunk_size: bool,
}

impl<'s, 'h, 'b, B> Future for SendChunkedBody<'s, 'h, 'b, B>
where
B: Stream<Item = io::Result<&'b [u8]>>,
{
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
// SAFETY: not moving `body: B`, ensuring it's still pinned.
#[rustfmt::skip]
let SendChunkedBody { stream, head, body, body_bytes, written_chunk_size } = unsafe { Pin::into_inner_unchecked(self) };
let mut body = unsafe { Pin::new_unchecked(body) };

// Send the HTTP head first.
// TODO: try to use vectored I/O on first call.
while !head.is_empty() {
match stream.try_send(*head) {
Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
Ok(n) => *head = &head[n..],
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
return Poll::Pending
}
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Poll::Ready(Err(err)),
}
}

loop {
// We have bytes we need to send.
if let Some(bytes) = body_bytes.as_mut() {
let mut size_buf = itoa::Buffer::new();
let (b1, b2) = if *written_chunk_size {
// Already written the chunk size.
("", "")
} else {
(size_buf.format(bytes.len()), "\r\n")
};

let mut bufs = [
// Chunk size.
IoSlice::new(b1.as_bytes()),
IoSlice::new(b2.as_bytes()),
IoSlice::new(bytes), // User's bytes.
IoSlice::new(b"\r\n"), // End of chunk.
];
loop {
match stream.try_send_vectored(&bufs) {
Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
Ok(mut n) => {
// FIXME: deal with `n` < `b1.len() + b2.len()`.
n -= b1.len() + b2.len();
if n >= bytes.len() {
*body_bytes = None;
break;
} else {
*bytes = &bytes[n..];
bufs[2] = IoSlice::new(bytes);
continue;
}
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
return Poll::Pending
}
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Poll::Ready(Err(err)),
}
}
}

// Read some bytes from the `body` stream.
match body.as_mut().poll_next(ctx) {
Poll::Ready(Some(Ok(bytes))) => {
*body_bytes = Some(bytes);
*written_chunk_size = false;
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
Poll::Ready(None) => loop {
match stream.try_send(LAST_CHUNK) {
// FIXME: properly deal with small write here.
Ok(n) if n < LAST_CHUNK.len() => {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
Ok(_) => return Poll::Ready(Ok(())),
// FIXME: properly deal with this error; can't poll
// anymore.
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
return Poll::Pending
}
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Poll::Ready(Err(err)),
}
},
Poll::Pending => return Poll::Pending,
}
}
}
}

/// See [`super::FileBody`].
#[derive(Debug)]
pub struct SendFileBody<'s, 'h, 'f, F> {
Expand Down Expand Up @@ -239,7 +349,7 @@ mod private {
}
}

pub(crate) use private::{PrivateBody, SendStreamingBody};
pub(crate) use private::{PrivateBody, SendChunkedBody, SendStreamingBody};
use private::{SendFileBody, SendOneshotBody};

/// An empty body.
Expand Down Expand Up @@ -408,12 +518,58 @@ where
/// Streaming body with an unknown length. Send in multiple chunks.
#[derive(Debug)]
pub struct ChunkedBody<'b, B> {
#[allow(dead_code)] // Currently unused, but need for the `Body` impl.
stream: B,
body: B,
_body_lifetime: PhantomData<&'b [u8]>,
}

// TODO: implement `Body` for `ChunkedBody`.
impl<'b, B> ChunkedBody<'b, B>
where
B: Stream<Item = io::Result<&'b [u8]>>,
{
/// Use a [`Stream`] as HTTP body with a unknown length.
///
/// If the total length of `stream` is known prefer to use
/// [`StreamingBody`].
pub const fn new(stream: B) -> ChunkedBody<'b, B> {
ChunkedBody {
body: stream,
_body_lifetime: PhantomData,
}
}
}

impl<'b, B> Body<'b> for ChunkedBody<'b, B>
where
B: Stream<Item = io::Result<&'b [u8]>>,
{
fn length(&self) -> BodyLength {
BodyLength::Chunked
}
}

impl<'b, B> PrivateBody<'b> for ChunkedBody<'b, B>
where
B: Stream<Item = io::Result<&'b [u8]>>,
{
type WriteBody<'s, 'h> = SendChunkedBody<'s, 'h, 'b, B>;

fn write_message<'s, 'h>(
self,
stream: &'s mut TcpStream,
head: &'h [u8],
) -> Self::WriteBody<'s, 'h>
where
'b: 'h,
{
SendChunkedBody {
stream,
body: self.body,
head,
body_bytes: None,
written_chunk_size: false,
}
}
}

/// Body that sends the entire file `F`.
#[derive(Debug)]
Expand Down