From 40df9b8b282f80e5ce02247bee4a99cbe69d7707 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Thu, 9 Mar 2023 19:54:11 -0500 Subject: [PATCH 1/5] Add Body::poll_healthy --- http-body/src/lib.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/http-body/src/lib.rs b/http-body/src/lib.rs index 12077a2..e34c0db 100644 --- a/http-body/src/lib.rs +++ b/http-body/src/lib.rs @@ -50,6 +50,21 @@ pub trait Body { cx: &mut Context<'_>, ) -> Poll, Self::Error>>>; + /// Determine if the body is still in a healthy state without polling for the next frame. + /// + /// `Body` consumers can use this method to check if the body has entered an error state even + /// when the consumer is not yet ready to try to read the next frame. Since healthiness is not + /// an operation that completes, this method returns just a `Result` rather than a `Poll`. + /// + /// For example, a `Body` implementation could maintain a timer counting down betwen + /// `poll_frame` calls and report an error from `poll_healthy` when time expires. + /// + /// The default implementation returns `Ok(())`. + fn poll_healthy(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), Self::Error> { + let _ = cx; + Ok(()) + } + /// Returns `true` when the end of stream has been reached. /// /// An end of stream means that `poll_frame` will return `None`. From 385a612c5a40a947a2aeb6f94a08203449972d7e Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Sun, 12 Mar 2023 11:23:13 -0400 Subject: [PATCH 2/5] Update http-body/src/lib.rs Co-authored-by: nickelc --- http-body/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-body/src/lib.rs b/http-body/src/lib.rs index e34c0db..f66cf86 100644 --- a/http-body/src/lib.rs +++ b/http-body/src/lib.rs @@ -56,7 +56,7 @@ pub trait Body { /// when the consumer is not yet ready to try to read the next frame. Since healthiness is not /// an operation that completes, this method returns just a `Result` rather than a `Poll`. /// - /// For example, a `Body` implementation could maintain a timer counting down betwen + /// For example, a `Body` implementation could maintain a timer counting down between /// `poll_frame` calls and report an error from `poll_healthy` when time expires. /// /// The default implementation returns `Ok(())`. From 395b18e36584ae5280cf0205ce7e0b4ced219597 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Mon, 22 Jan 2024 19:56:39 -0500 Subject: [PATCH 3/5] Rename to poll_progress --- http-body/src/lib.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/http-body/src/lib.rs b/http-body/src/lib.rs index f66cf86..31b70fb 100644 --- a/http-body/src/lib.rs +++ b/http-body/src/lib.rs @@ -50,19 +50,18 @@ pub trait Body { cx: &mut Context<'_>, ) -> Poll, Self::Error>>>; - /// Determine if the body is still in a healthy state without polling for the next frame. + /// Attempt to progress the body's state without pulling a new frame. /// - /// `Body` consumers can use this method to check if the body has entered an error state even - /// when the consumer is not yet ready to try to read the next frame. Since healthiness is not - /// an operation that completes, this method returns just a `Result` rather than a `Poll`. + /// `Body` consumers can use this method to allow the `Body` implementation to continue to + /// perform work even when the consumer is not yet ready to read the next frame. For example, + /// a `Body` implementation could maintain a timer counting down between `poll_frame` calls and + /// report an error from `poll_progress` when time expires. /// - /// For example, a `Body` implementation could maintain a timer counting down between - /// `poll_frame` calls and report an error from `poll_healthy` when time expires. - /// - /// The default implementation returns `Ok(())`. - fn poll_healthy(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), Self::Error> { + /// Consumers are *not* required to call this method. A `Body` implementation should not depend + /// on calls to `poll_progress` to occur. + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let _ = cx; - Ok(()) + Poll::Ready(Ok(())) } /// Returns `true` when the end of stream has been reached. From 79b98c2bbfb371a2a311945e3b1eac37b49cb735 Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Tue, 7 May 2024 09:12:27 -0400 Subject: [PATCH 4/5] document return values --- http-body/src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/http-body/src/lib.rs b/http-body/src/lib.rs index 31b70fb..0973143 100644 --- a/http-body/src/lib.rs +++ b/http-body/src/lib.rs @@ -53,12 +53,18 @@ pub trait Body { /// Attempt to progress the body's state without pulling a new frame. /// /// `Body` consumers can use this method to allow the `Body` implementation to continue to - /// perform work even when the consumer is not yet ready to read the next frame. For example, + /// perform work even when the consumer is not yet ready to read the next frame. For example, /// a `Body` implementation could maintain a timer counting down between `poll_frame` calls and /// report an error from `poll_progress` when time expires. /// /// Consumers are *not* required to call this method. A `Body` implementation should not depend /// on calls to `poll_progress` to occur. + /// + /// An error returned from this method is considered to be equivalent to an error returned from + /// `poll_frame`. + /// + /// Implementations must allow additional calls to this method after it returns + /// `Poll::Ready(Ok(())`. fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let _ = cx; Poll::Ready(Ok(())) From d02855c200a2bc313fc445da7c5f19609849903c Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Tue, 7 May 2024 09:19:22 -0400 Subject: [PATCH 5/5] Add poll_progress to combinators --- http-body-util/src/combinators/frame.rs | 1 + http-body-util/src/combinators/map_err.rs | 9 +++++++++ http-body-util/src/combinators/map_frame.rs | 4 ++++ http-body-util/src/combinators/with_trailers.rs | 7 +++++++ http-body-util/src/either.rs | 9 +++++++++ http-body-util/src/limited.rs | 7 +++++++ 6 files changed, 37 insertions(+) diff --git a/http-body-util/src/combinators/frame.rs b/http-body-util/src/combinators/frame.rs index 211fa08..bf8e77a 100644 --- a/http-body-util/src/combinators/frame.rs +++ b/http-body-util/src/combinators/frame.rs @@ -13,6 +13,7 @@ impl<'a, T: Body + Unpin + ?Sized> Future for Frame<'a, T> { type Output = Option, T::Error>>; fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll { + let _ = Pin::new(&mut self.0).poll_progress(ctx)?; Pin::new(&mut self.0).poll_frame(ctx) } } diff --git a/http-body-util/src/combinators/map_err.rs b/http-body-util/src/combinators/map_err.rs index 384cfc5..6edaf74 100644 --- a/http-body-util/src/combinators/map_err.rs +++ b/http-body-util/src/combinators/map_err.rs @@ -67,6 +67,15 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.inner.poll_progress(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(err)) => Poll::Ready(Err((this.f)(err))), + } + } + fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } diff --git a/http-body-util/src/combinators/map_frame.rs b/http-body-util/src/combinators/map_frame.rs index 44886bd..cc55afd 100644 --- a/http-body-util/src/combinators/map_frame.rs +++ b/http-body-util/src/combinators/map_frame.rs @@ -69,6 +69,10 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs index 383e1ec..dd74623 100644 --- a/http-body-util/src/combinators/with_trailers.rs +++ b/http-body-util/src/combinators/with_trailers.rs @@ -109,6 +109,13 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project().state.project() { + StateProj::PollBody { body, .. } => body.poll_progress(cx), + _ => Poll::Ready(Ok(())), + } + } + #[inline] fn size_hint(&self) -> http_body::SizeHint { match &self.state { diff --git a/http-body-util/src/either.rs b/http-body-util/src/either.rs index 9e0cc43..ec8bccb 100644 --- a/http-body-util/src/either.rs +++ b/http-body-util/src/either.rs @@ -70,6 +70,15 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + EitherProj::Left(left) => left.poll_progress(cx).map(|poll| poll.map_err(Into::into)), + EitherProj::Right(right) => { + right.poll_progress(cx).map(|poll| poll.map_err(Into::into)) + } + } + } + fn is_end_stream(&self) -> bool { match self { Either::Left(left) => left.is_end_stream(), diff --git a/http-body-util/src/limited.rs b/http-body-util/src/limited.rs index c4c5c8b..f4b61a9 100644 --- a/http-body-util/src/limited.rs +++ b/http-body-util/src/limited.rs @@ -64,6 +64,13 @@ where Poll::Ready(res) } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project() + .inner + .poll_progress(cx) + .map(|poll| poll.map_err(Into::into)) + } + fn is_end_stream(&self) -> bool { self.inner.is_end_stream() }