From 214344a8cbdf0150d37f242f538fe1ac6d82a2e9 Mon Sep 17 00:00:00 2001
From: aeryz <abdullaheryz@protonmail.com>
Date: Wed, 6 Jan 2021 18:53:08 +0300
Subject: [PATCH 1/4] feat(body): add trailers to Body channel (#2260)

---
 benches/end_to_end.rs |  5 +++-
 src/body/body.rs      | 60 +++++++++++++++++++++++++++++++------------
 src/client/conn.rs    |  5 +++-
 3 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs
index 6376697afc..fde3c670cc 100644
--- a/benches/end_to_end.rs
+++ b/benches/end_to_end.rs
@@ -8,7 +8,7 @@ use std::net::SocketAddr;
 use futures_util::future::join_all;
 
 use hyper::client::HttpConnector;
-use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server};
+use hyper::{body::HttpBody as _, Body, HeaderMap, Method, Request, Response, Server};
 
 // HTTP1
 
@@ -313,6 +313,9 @@ impl Opts {
                     for _ in 0..chunk_cnt {
                         tx.send_data(chunk.into()).await.expect("send_data");
                     }
+                    tx.send_trailers(HeaderMap::new())
+                        .await
+                        .expect("send_trailers");
                 });
                 body
             } else {
diff --git a/src/body/body.rs b/src/body/body.rs
index 4a1d6210bc..c853b24ce9 100644
--- a/src/body/body.rs
+++ b/src/body/body.rs
@@ -5,8 +5,6 @@ use std::fmt;
 
 use bytes::Bytes;
 use futures_channel::mpsc;
-#[cfg(any(feature = "http1", feature = "http2"))]
-#[cfg(feature = "client")]
 use futures_channel::oneshot;
 use futures_core::Stream; // for mpsc::Receiver
 #[cfg(feature = "stream")]
@@ -17,14 +15,16 @@ use http_body::{Body as HttpBody, SizeHint};
 use super::DecodedLength;
 #[cfg(feature = "stream")]
 use crate::common::sync_wrapper::SyncWrapper;
-use crate::common::{task, watch, Pin, Poll};
+use crate::common::Future;
 #[cfg(any(feature = "http1", feature = "http2"))]
 #[cfg(feature = "client")]
-use crate::common::{Future, Never};
+use crate::common::Never;
+use crate::common::{task, watch, Pin, Poll};
 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
 use crate::proto::h2::ping;
 
 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
+type TrailersSender = oneshot::Sender<HeaderMap>;
 
 /// A stream of `Bytes`, used when receiving bodies.
 ///
@@ -43,7 +43,8 @@ enum Kind {
     Chan {
         content_length: DecodedLength,
         want_tx: watch::Sender,
-        rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
+        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
+        trailers_rx: oneshot::Receiver<HeaderMap>,
     },
     #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
     H2 {
@@ -104,7 +105,8 @@ enum DelayEof {
 #[must_use = "Sender does nothing unless sent on"]
 pub struct Sender {
     want_rx: watch::Receiver,
-    tx: BodySender,
+    data_tx: BodySender,
+    trailers_tx: Option<TrailersSender>,
 }
 
 const WANT_PENDING: usize = 1;
@@ -135,7 +137,8 @@ impl Body {
     }
 
     pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
-        let (tx, rx) = mpsc::channel(0);
+        let (data_tx, data_rx) = mpsc::channel(0);
+        let (trailers_tx, trailers_rx) = oneshot::channel();
 
         // If wanter is true, `Sender::poll_ready()` won't becoming ready
         // until the `Body` has been polled for data once.
@@ -143,11 +146,16 @@ impl Body {
 
         let (want_tx, want_rx) = watch::channel(want);
 
-        let tx = Sender { want_rx, tx };
+        let tx = Sender {
+            want_rx,
+            data_tx,
+            trailers_tx: Some(trailers_tx),
+        };
         let rx = Body::new(Kind::Chan {
             content_length,
             want_tx,
-            rx,
+            data_rx,
+            trailers_rx,
         });
 
         (tx, rx)
@@ -265,12 +273,13 @@ impl Body {
             Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
             Kind::Chan {
                 content_length: ref mut len,
-                ref mut rx,
+                ref mut data_rx,
                 ref mut want_tx,
+                ..
             } => {
                 want_tx.send(WANT_READY);
 
-                match ready!(Pin::new(rx).poll_next(cx)?) {
+                match ready!(Pin::new(data_rx).poll_next(cx)?) {
                     Some(chunk) => {
                         len.sub_if(chunk.len() as u64);
                         Poll::Ready(Some(Ok(chunk)))
@@ -348,6 +357,13 @@ impl HttpBody for Body {
                 }
                 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
             },
+            Kind::Chan {
+                ref mut trailers_rx,
+                ..
+            } => match ready!(Pin::new(trailers_rx).poll(cx)) {
+                Ok(t) => Poll::Ready(Ok(Some(t))),
+                Err(_) => Poll::Ready(Err(crate::Error::new_closed())),
+            },
             _ => Poll::Ready(Ok(None)),
         }
     }
@@ -499,7 +515,7 @@ impl Sender {
     pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
         // Check if the receiver end has tried polling for the body yet
         ready!(self.poll_want(cx)?);
-        self.tx
+        self.data_tx
             .poll_ready(cx)
             .map_err(|_| crate::Error::new_closed())
     }
@@ -520,11 +536,21 @@ impl Sender {
     /// Send data on this channel when it is ready.
     pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
         self.ready().await?;
-        self.tx
+        self.data_tx
             .try_send(Ok(chunk))
             .map_err(|_| crate::Error::new_closed())
     }
 
+    /// Send trailers on this channel when it is ready.
+    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
+        self.ready().await?;
+        let tx = match self.trailers_tx.take() {
+            Some(tx) => tx,
+            None => return Err(crate::Error::new_closed()),
+        };
+        tx.send(trailers).map_err(|_| crate::Error::new_closed())
+    }
+
     /// Try to send data on this channel.
     ///
     /// # Errors
@@ -538,7 +564,7 @@ impl Sender {
     /// that doesn't have an async context. If in an async context, prefer
     /// `send_data()` instead.
     pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
-        self.tx
+        self.data_tx
             .try_send(Ok(chunk))
             .map_err(|err| err.into_inner().expect("just sent Ok"))
     }
@@ -546,7 +572,7 @@ impl Sender {
     /// Aborts the body in an abnormal fashion.
     pub fn abort(self) {
         let _ = self
-            .tx
+            .data_tx
             // clone so the send works even if buffer is full
             .clone()
             .try_send(Err(crate::Error::new_body_write_aborted()));
@@ -554,7 +580,7 @@ impl Sender {
 
     #[cfg(feature = "http1")]
     pub(crate) fn send_error(&mut self, err: crate::Error) {
-        let _ = self.tx.try_send(Err(err));
+        let _ = self.data_tx.try_send(Err(err));
     }
 }
 
@@ -600,7 +626,7 @@ mod tests {
 
         assert_eq!(
             mem::size_of::<Sender>(),
-            mem::size_of::<usize>() * 4,
+            mem::size_of::<usize>() * 5,
             "Sender"
         );
 
diff --git a/src/client/conn.rs b/src/client/conn.rs
index 23b548dacd..64f7f4803a 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -63,7 +63,10 @@ use tower_service::Service;
 
 use super::dispatch;
 use crate::body::HttpBody;
-use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll};
+use crate::common::{
+    exec::{BoxSendFuture, Exec},
+    task, Future, Pin, Poll,
+};
 use crate::proto;
 use crate::rt::Executor;
 #[cfg(feature = "http1")]

From 30afdf8e4f44e6800e74ee1e9e6010ba4463208a Mon Sep 17 00:00:00 2001
From: aeryz <abdullaheryz@protonmail.com>
Date: Sun, 10 Jan 2021 16:23:01 +0300
Subject: [PATCH 2/4] feat(body): make 'send_trailers' sync

---
 src/body/body.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/body/body.rs b/src/body/body.rs
index c9ad52bc51..c8019a5806 100644
--- a/src/body/body.rs
+++ b/src/body/body.rs
@@ -559,7 +559,7 @@ impl Sender {
         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
     }
 
-    /// Send data on this channel when it is ready.
+    /// Send data on data channel when it is ready.
     pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
         self.ready().await?;
         self.data_tx
@@ -567,9 +567,8 @@ impl Sender {
             .map_err(|_| crate::Error::new_closed())
     }
 
-    /// Send trailers on this channel when it is ready.
-    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
-        self.ready().await?;
+    /// Send trailers on trailers channel.
+    pub fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
         let tx = match self.trailers_tx.take() {
             Some(tx) => tx,
             None => return Err(crate::Error::new_closed()),

From 30c71328f933c68de9106795c9201417b1d68014 Mon Sep 17 00:00:00 2001
From: aeryz <abdullaheryz@protonmail.com>
Date: Sun, 10 Jan 2021 16:36:02 +0300
Subject: [PATCH 3/4] fix(benches): remove await

---
 benches/end_to_end.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs
index fde3c670cc..53c6188dfb 100644
--- a/benches/end_to_end.rs
+++ b/benches/end_to_end.rs
@@ -313,9 +313,7 @@ impl Opts {
                     for _ in 0..chunk_cnt {
                         tx.send_data(chunk.into()).await.expect("send_data");
                     }
-                    tx.send_trailers(HeaderMap::new())
-                        .await
-                        .expect("send_trailers");
+                    tx.send_trailers(HeaderMap::new()).expect("send_trailers");
                 });
                 body
             } else {

From dda8b239e242d0289cf375bd2c51853ab083b834 Mon Sep 17 00:00:00 2001
From: aeryz <abdullaheryz@protonmail.com>
Date: Fri, 15 Jan 2021 08:58:07 +0300
Subject: [PATCH 4/4] feat(body): return None if poll on trailers' receiver
 returns 'Canceled'

---
 benches/end_to_end.rs | 3 +--
 src/body/body.rs      | 4 ++--
 src/client/conn.rs    | 5 +----
 3 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs
index 53c6188dfb..6376697afc 100644
--- a/benches/end_to_end.rs
+++ b/benches/end_to_end.rs
@@ -8,7 +8,7 @@ use std::net::SocketAddr;
 use futures_util::future::join_all;
 
 use hyper::client::HttpConnector;
-use hyper::{body::HttpBody as _, Body, HeaderMap, Method, Request, Response, Server};
+use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server};
 
 // HTTP1
 
@@ -313,7 +313,6 @@ impl Opts {
                     for _ in 0..chunk_cnt {
                         tx.send_data(chunk.into()).await.expect("send_data");
                     }
-                    tx.send_trailers(HeaderMap::new()).expect("send_trailers");
                 });
                 body
             } else {
diff --git a/src/body/body.rs b/src/body/body.rs
index c8019a5806..9c199fd2c8 100644
--- a/src/body/body.rs
+++ b/src/body/body.rs
@@ -382,7 +382,7 @@ impl HttpBody for Body {
                 ..
             } => match ready!(Pin::new(trailers_rx).poll(cx)) {
                 Ok(t) => Poll::Ready(Ok(Some(t))),
-                Err(_) => Poll::Ready(Err(crate::Error::new_closed())),
+                Err(_) => Poll::Ready(Ok(None)),
             },
             #[cfg(feature = "ffi")]
             Kind::Ffi(ref mut body) => body.poll_trailers(cx),
@@ -568,7 +568,7 @@ impl Sender {
     }
 
     /// Send trailers on trailers channel.
-    pub fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
+    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
         let tx = match self.trailers_tx.take() {
             Some(tx) => tx,
             None => return Err(crate::Error::new_closed()),
diff --git a/src/client/conn.rs b/src/client/conn.rs
index 64f7f4803a..23b548dacd 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -63,10 +63,7 @@ use tower_service::Service;
 
 use super::dispatch;
 use crate::body::HttpBody;
-use crate::common::{
-    exec::{BoxSendFuture, Exec},
-    task, Future, Pin, Poll,
-};
+use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll};
 use crate::proto;
 use crate::rt::Executor;
 #[cfg(feature = "http1")]