Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Body::poll_progress #90

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

sfackler
Copy link

As described in hyperium/hyper#3121, this allows server implementations to abort body writes even if the client is not reading data.

http-body/src/lib.rs Outdated Show resolved Hide resolved
@LucioFranco
Copy link
Member

@sfackler would it be possible to include a timeout example with this? Or maybe add that example in the doc comment I am curious to see how this would be used.

@sfackler
Copy link
Author

We could add a TimeoutBody wrapper to http-body-util, though that wold require making tokio an optional dependency. I can update the PR tonight.

@LucioFranco
Copy link
Member

Maybe even at least an example so tokio doesn't need to be a public dep.

@sfackler
Copy link
Author

sfackler commented Mar 14, 2023

Here's a TimeoutBody implementation (untested):

#[pin_project]
pub struct TimeoutBody<B> {
    #[pin]
    inner: B,
    #[pin]
    timer: Sleep,
    timeout: Duration,
    waiting: bool,
}

impl<B> Body for TimeoutBody<B>
where
    B: Body,
{
    type Data = B::Data;
    type Error = TimeoutErro<B::Error>;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        if let Poll::Ready(o) = self.as_mut().project().inner.poll_frame(cx) {
            *this.waiting = false;
            return Poll::Ready(o.map(|r| r.map_err(TimeoutError::Inner)));
        }

        self.is_healthy(cx)?;
        Poll::Pending
    }

    fn is_healthy(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), Self::Error> {
        let this = self.project();

        if !*this.waiting {
            this.timer.reset(Instant::now() + *this.timeout);
            *this.waiting = true;
        }

        if this.timer.poll(cx).is_ready() {
            return Err(TimeoutError::TimedOut);
        }

        Ok(())
    }

    fn is_end_stream(&self) -> bool {
        self.inner.is_end_stream()
    }

    fn size_hint(&self) -> SizeHint {
        self.inner.size_hint()
    }
}

pub enum TimeoutError<E> {
    Inner(E),
    TimedOut,
}

@sfackler
Copy link
Author

@seanmonstar thoughts on this?

/// `poll_frame` calls and report an error from `poll_healthy` when time expires.
///
/// The default implementation returns `Ok(())`.
fn poll_healthy(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), Self::Error> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we wanted this to be more poll-like we could have a return value of Poll<Result<Void, Self::Error>>>.

sfackler and others added 3 commits January 22, 2024 19:48
@sfackler sfackler changed the title Add Body::poll_healthy Add Body::poll_progress Jan 23, 2024
@sfackler
Copy link
Author

I've updated this to move from poll_healthy to a more general poll_progress, like what boats described for the Stream trait. In addition to the error reporting case I opened this to handle, the same motivations apply here as well.

olix0r added a commit to olix0r/http-body that referenced this pull request Mar 8, 2024
olix0r added a commit to linkerd/linkerd2-proxy that referenced this pull request Apr 11, 2024
hyperium/http-body#90 proposes adding a `Body::poll_progress` method to the
`Body` trait. This PR uses a fork of hyper that uses this proposed API when
awaiting stream send capacity. This supports implementing timeouts on streams
and connections in an unhealthy state to defend servers against resource
exhaustion.
olix0r added a commit to linkerd/linkerd2-proxy that referenced this pull request Apr 12, 2024
hyperium/http-body#90 proposes adding a `Body::poll_progress` method to the
`Body` trait. This PR uses a fork of hyper that uses this proposed API when
awaiting stream send capacity. This supports implementing timeouts on streams
and connections in an unhealthy state to defend servers against resource
exhaustion.
@olix0r
Copy link

olix0r commented Apr 18, 2024

We (Linkerd) are interested in moving this proposal forward.

I've been testing this with patched versions of http-body 0.3 and Hyper 0.14 and the results are promising.

I'm using this to implement a middleware that enforces a progress timeout to cancel stuck streams.

What does the process look like for finalizing this proposal? Is there anything I can do to help?

@seanmonstar
Copy link
Member

Thanks for trying it out, @olix0r! I'm glad to have at least 2 use cases for something so fundamental. We can move this forward.

In my prep for doing so, I went back and read the previous conversations, and also the poll_progress post. I think the problem and solution that withoutboats describes is similar, but different enough that perhaps we shouldn't use the same name/mechanism. That's because, this feature isn't describing making progress on the body separate from producing a frame. It's rather to propagate cancelation while waiting on backpressure to clear up.

It feels closer to oneshot::Sender::poll_closed(). Should we change the name here to poll_closed() or poll_canceled()?

At the same time, I'm writing up a longer blog post about this feature, I'll share a draft with you soon.

@sfackler
Copy link
Author

sfackler commented May 6, 2024

That seems like a reasonable-enough name to me, though it might be a bit strange to have poll_closed return Result<(), Error>? That's why the original name in the PR was poll_healthy.

@seanmonstar
Copy link
Member

Hm, thanks for pointing that out. It made me think through the problem a bit more I at first figured we could just make it poll_closed() -> Poll<()>, like the oneshot sender. But that has some things to work out:

  • A default implementation of the method needs to work, and can't check any state.
    • Well, actually, could it check Self::is_end_stream()? Maybe that could work like Send::poll_closed() and is_closed().
    • But even the default implementation of is_end_stream() will always be false, so something waiting on poll_closed() then would wait forever.
  • Should it indicate a bad condition? Or just closure?

@sfackler
Copy link
Author

sfackler commented May 6, 2024

A default implementation that just returns Poll::Ready(Ok(())) is correct, and equivalent to the current state of the world.

I don't think it would indicate closure, just drive any background IO (like poll_progress does). For convenience, it allows errors to be returned directly, but an error out of poll_healthy or whatever we call it would be equivalent to that error being returned from poll_frame.

@sfackler
Copy link
Author

sfackler commented May 6, 2024

In that sense, it is really pretty similar to Boats's poll_progress, just fallible.

We could make it infallible and force the error to come out of poll_frame but that just seems like it'd make the implementation more annoying for no real benefit.

@seanmonstar
Copy link
Member

So you find that it was needed to make background progress, too? I didn't think that was a goal of the method. Just to detect closure while a frame isn't needed, such as by polling a Sleep.

@sfackler
Copy link
Author

sfackler commented May 6, 2024

Polling a sleep is background progress IMO! :)

My initial use case was purely around detecting disconnects/timeouts and things like that, but Boats's post on poll_progress made me feel like there's no reason you couldn't have some other body implementation that wanted to do some background IO. For example, if you're proxying you may want to internally pull data from the upstream connection while the downstream connection is idle.

@seanmonstar
Copy link
Member

That's a fair point. Perhaps the naming is fine then. Or at least, worth considering if poll_progress is better than the alternatives.

Now, I think one more remaining question is about return type. I do think a poll method should return Poll<T>. But what does each value mean? Specifically, what's the difference between Ready(Ok(())) and Pending? It does feel like their different. The default will just return Ok(()). And if it were checking a Sleep, I assume it'd return Pending. What should the caller (such as inside hyper) do with that information? Whatever we determine for that should end up documented on the method.

@sfackler
Copy link
Author

sfackler commented May 6, 2024

I think that Ready(Ok(()) would mean "I'm done making progress in the background". If we don't require poll_progress to be fused, the caller would need to remember that and not call it again which seems pretty annoying TBH. Pending means that there's more background progress work to be done later, the same as any other future.

In practice, the only thing that callers would actually look for is the Ready(Err) case (see hyperium/hyper#3169).

EDIT: Actually, I think we have to require that it's fused to be able to use it properly.

@sfackler
Copy link
Author

sfackler commented May 7, 2024

Added a few bits of docs on return values, and poll_progress implementations to http-body-util combinators.

@@ -13,6 +13,7 @@ impl<'a, T: Body + Unpin + ?Sized> Future for Frame<'a, T> {
type Output = Option<Result<http_body::Frame<T::Data>, T::Error>>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
let _ = Pin::new(&mut self.0).poll_progress(ctx)?;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example of the slightly silly way this ends up being used to preserve a Poll<Result<_, _>> return type.

@olix0r
Copy link

olix0r commented Jul 29, 2024

@seanmonstar Are you comfortable with this PR? Is there anything else to consider?

@seanmonstar
Copy link
Member

I believe generally yes. I got part way through a write-up to publish as a blog post, to explore the area more and get more feedback, since I think it's a sufficiently interesting change to some fundamental crates. I still plan to finish that up, I just had to pause as I dealt with some other contracting work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants