diff --git a/http/src/body.rs b/http/src/body.rs index 981970482..db2c7afb5 100644 --- a/http/src/body.rs +++ b/http/src/body.rs @@ -47,6 +47,8 @@ mod private { use heph::net::tcp::stream::FileSend; use heph::net::TcpStream; + const LAST_CHUNK: &[u8] = b"0\r\n\r\n"; + /// Private extention of [`Body`]. /// /// [`Body`]: super::Body @@ -187,6 +189,114 @@ mod private { } } + /// See [`super::ChunkedBody`]. + #[derive(Debug)] + pub struct SendChunkedBody<'s, 'h, 'b, B> { + pub(super) stream: &'s mut TcpStream, + pub(super) head: &'h [u8], + pub(super) body: B, + /// Slice of bytes from `body`. + pub(super) body_bytes: Option<&'b [u8]>, + pub(super) written_chunk_size: bool, + } + + impl<'s, 'h, 'b, B> Future for SendChunkedBody<'s, 'h, 'b, B> + where + B: Stream>, + { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll { + // SAFETY: not moving `body: B`, ensuring it's still pinned. + #[rustfmt::skip] + let SendChunkedBody { stream, head, body, body_bytes, written_chunk_size } = unsafe { Pin::into_inner_unchecked(self) }; + let mut body = unsafe { Pin::new_unchecked(body) }; + + // Send the HTTP head first. + // TODO: try to use vectored I/O on first call. + while !head.is_empty() { + match stream.try_send(*head) { + Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())), + Ok(n) => *head = &head[n..], + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + return Poll::Pending + } + Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, + Err(err) => return Poll::Ready(Err(err)), + } + } + + loop { + // We have bytes we need to send. + if let Some(bytes) = body_bytes.as_mut() { + let mut size_buf = itoa::Buffer::new(); + let (b1, b2) = if *written_chunk_size { + // Already written the chunk size. + ("", "") + } else { + (size_buf.format(bytes.len()), "\r\n") + }; + + let mut bufs = [ + // Chunk size. + IoSlice::new(b1.as_bytes()), + IoSlice::new(b2.as_bytes()), + IoSlice::new(bytes), // User's bytes. + IoSlice::new(b"\r\n"), // End of chunk. + ]; + loop { + match stream.try_send_vectored(&bufs) { + Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())), + Ok(mut n) => { + // FIXME: deal with `n` < `b1.len() + b2.len()`. + n -= b1.len() + b2.len(); + if n >= bytes.len() { + *body_bytes = None; + break; + } else { + *bytes = &bytes[n..]; + bufs[2] = IoSlice::new(bytes); + continue; + } + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + return Poll::Pending + } + Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, + Err(err) => return Poll::Ready(Err(err)), + } + } + } + + // Read some bytes from the `body` stream. + match body.as_mut().poll_next(ctx) { + Poll::Ready(Some(Ok(bytes))) => { + *body_bytes = Some(bytes); + *written_chunk_size = false; + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(None) => loop { + match stream.try_send(LAST_CHUNK) { + // FIXME: properly deal with small write here. + Ok(n) if n < LAST_CHUNK.len() => { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + } + Ok(_) => return Poll::Ready(Ok(())), + // FIXME: properly deal with this error; can't poll + // anymore. + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + return Poll::Pending + } + Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, + Err(err) => return Poll::Ready(Err(err)), + } + }, + Poll::Pending => return Poll::Pending, + } + } + } + } + /// See [`super::FileBody`]. #[derive(Debug)] pub struct SendFileBody<'s, 'h, 'f, F> { @@ -239,7 +349,7 @@ mod private { } } -pub(crate) use private::{PrivateBody, SendStreamingBody}; +pub(crate) use private::{PrivateBody, SendChunkedBody, SendStreamingBody}; use private::{SendFileBody, SendOneshotBody}; /// An empty body. @@ -408,12 +518,58 @@ where /// Streaming body with an unknown length. Send in multiple chunks. #[derive(Debug)] pub struct ChunkedBody<'b, B> { - #[allow(dead_code)] // Currently unused, but need for the `Body` impl. - stream: B, + body: B, _body_lifetime: PhantomData<&'b [u8]>, } -// TODO: implement `Body` for `ChunkedBody`. +impl<'b, B> ChunkedBody<'b, B> +where + B: Stream>, +{ + /// Use a [`Stream`] as HTTP body with a unknown length. + /// + /// If the total length of `stream` is known prefer to use + /// [`StreamingBody`]. + pub const fn new(stream: B) -> ChunkedBody<'b, B> { + ChunkedBody { + body: stream, + _body_lifetime: PhantomData, + } + } +} + +impl<'b, B> Body<'b> for ChunkedBody<'b, B> +where + B: Stream>, +{ + fn length(&self) -> BodyLength { + BodyLength::Chunked + } +} + +impl<'b, B> PrivateBody<'b> for ChunkedBody<'b, B> +where + B: Stream>, +{ + type WriteBody<'s, 'h> = SendChunkedBody<'s, 'h, 'b, B>; + + fn write_message<'s, 'h>( + self, + stream: &'s mut TcpStream, + head: &'h [u8], + ) -> Self::WriteBody<'s, 'h> + where + 'b: 'h, + { + SendChunkedBody { + stream, + body: self.body, + head, + body_bytes: None, + written_chunk_size: false, + } + } +} /// Body that sends the entire file `F`. #[derive(Debug)]