
(async) Add a Write-like interface for responses #1066

Closed

jebrosen opened this issue Aug 2, 2019 · 9 comments

Labels
feedback wanted User feedback is needed

Comments

@jebrosen
Collaborator

jebrosen commented Aug 2, 2019

We should have an AsyncWrite responder type and/or change Response to be based on AsyncWrite instead of or alongside AsyncRead. This will make it easier to implement SSE among other things.

For most cases, routes should be able to use a channel to work around this until it is implemented, but it will be less efficient and the code might be more difficult to understand.
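For illustration, the channel workaround might look roughly like this. This is a sketch, not Rocket API: it assumes tokio, tokio-stream, tokio-util, and bytes as dependencies, and simply exposes the receiving half of a bounded channel as a pull-based AsyncRead body while a spawned task does the "pushing".

use bytes::Bytes;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::StreamReader;

// Hypothetical workaround: the producer task "pushes" events into a bounded
// channel, and the receiver is adapted back into a pull-based AsyncRead body.
fn channel_backed_body() -> impl tokio::io::AsyncRead {
    let (tx, rx) = mpsc::channel::<Result<Bytes, std::io::Error>>(8);

    tokio::spawn(async move {
        for _ in 0..5 {
            tokio::time::sleep(Duration::from_secs(5)).await;
            if tx.send(Ok(Bytes::from_static(b"data: hello\n"))).await.is_err() {
                return; // the client went away; stop producing
            }
        }
        let _ = tx.send(Ok(Bytes::from_static(b"\n"))).await;
    });

    // StreamReader turns a Stream of byte chunks back into an AsyncRead.
    StreamReader::new(ReceiverStream::new(rx))
}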

@lovasoa

lovasoa commented Aug 2, 2019

Related: #813

@SergioBenitez
Member

I spent some time with @carllerche chatting about bringing such an interface to Hyper in 0.13, and my understanding is that he's on board. If the main interface in Hyper 0.13 is a write-based one, then implementing this in Rocket should be relatively straightforward. Implementing a read-based interface on top of a write-based interface is itself straightforward, of course.

To clarify, and for posterity, a write-based interface would allow you to write a Responder as follows:

impl Responder for MyType {
    fn respond_to(self, request: &Request, out: &mut Out) -> impl Future<_> {
        for _ in 0..5 {
            sleep(5).await;
            write!(out, "data: hello\n").await;
        }

        write!(out, "\n").await;
    }
}

This responder writes out 5 server-sent events, each with data of "hello", one every 5 seconds.

A read-based implementation of the above, while not significantly more code, is, in my opinion, significantly more difficult to write and understand:

struct MyType {
    remaining: usize,
    next_write: Time,
}

impl Read for MyType {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let now = Time::now();
        if now > self.next_write && self.remaining > 0 {
            write!(buf, "data: hello\n")?;
            self.next_write = now + 5.seconds();
            self.remaining -= 1;

            if self.remaining == 0 {
                write!(buf, "\n")?;
            }
        }
        // (pseudocode: returning the number of bytes written, and blocking --
        // or signaling WouldBlock -- until the next event is due, are both
        // elided here, which is part of why this version is harder to get right)
    }
}

For many types of responses, a read-based interface is the right approach, especially as many types already have a read-based implementation (Vec, String, and so on). For others, such as SSE, the write-based approach is clearly the way to go. We should support both.
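As a tiny illustration of why already-materialized values favor the read-based side: a String body needs nothing more than a cursor over its bytes. (Sketch using the futures crate's AsyncRead; not Rocket's actual Responder machinery.)

use futures::io::{AsyncRead, Cursor};

// A String is already "finished" data, so a read-based body is trivial:
// the framework just pulls bytes out of the cursor until it is exhausted.
fn string_body(s: String) -> impl AsyncRead {
    Cursor::new(s.into_bytes())
}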

@jebrosen
Collaborator Author

jebrosen commented Aug 3, 2019

Implementing a read-based interface on top of a write-based interface is itself straightforward, of course.

I am concerned about the ergonomics of a read-like operation that has to both consume and expose a write-like interface, or vice versa. One particular example is automatic ETag generation, which needs to know when the data is finished so it can do a checksum. Under AsyncRead, this is when 0 bytes are read from the stream. Under AsyncWrite, this is when poll_close() has completed. Compression will likely have the same or similar problems.

The solution to this point may be "don't do anything that requires the body to actually 'finish'" -- with SSE, it might never finish! -- and that is fine, but we should be aware of it and of any similar code that already exists, and make this point very clear in the documentation and migration guide or CHANGELOG.
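To make the "needs to know when the body is finished" point concrete, here is a minimal sketch of a hashing AsyncWrite wrapper (assuming the futures crate's AsyncWrite and the blake3 crate; not proposed Rocket API). The digest only becomes available in poll_close(), i.e. after the body has already been streamed out, which is exactly why ETag generation is awkward in a push-based world:

use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use futures::io::AsyncWrite;

// Hypothetical wrapper: hashes every byte on its way out to the client.
struct HashingWriter<W> {
    inner: W,
    hasher: blake3::Hasher,
}

impl<W: AsyncWrite + Unpin> AsyncWrite for HashingWriter<W> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let n = ready!(Pin::new(&mut this.inner).poll_write(cx, buf))?;
        this.hasher.update(&buf[..n]);
        Poll::Ready(Ok(n))
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        ready!(Pin::new(&mut this.inner).poll_close(cx))?;
        // Only now is the complete body known -- far too late to have sent an
        // ETag header computed from it.
        let _digest = this.hasher.finalize();
        Poll::Ready(Ok(()))
    }
}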

@carllerche

As I mentioned to @SergioBenitez, I am open to the best solution. I have not spent a lot of time considering the proposed API. For me, it would be a question of weighing the pros / cons of each avenue.

Either way, I don't think it should impact the core tower::Service trait. It would only impact the HttpBody trait.

@jebrosen
Collaborator Author

jebrosen commented Sep 2, 2019

I've been trying to draft pseudocode implementations of Responder for several use cases under both the current Read-like interface and a proposed Write-like interface, and I hit a snag.

The flow currently defined by all of tower-service, hyper, and Rocket is approximately fn handle(Request) -> Response. The proposed flow for Responder, if I'm understanding correctly, is fn respond_to(Request, &mut Response) -> (). The problem with this formulation is that (afaict) it prevents fairings from making any alterations to the response unless we significantly change the entire request/response flow: respond_to is called before fairings are run, so by the time they run it's too late for them to alter anything.

If we kept the current Read-like interface, I think we could still provide a Write-like adapter used as follows:

async fn streaming_route() -> AsyncWrittenResponse {
    AsyncWrittenResponse::from(|writer| async move {
        for _ in 0..5 {
            sleep(5).await;
            write!(writer, "data: hello\n").await;
        }

        write!(writer, "\n").await;
    })
}

@jebrosen
Collaborator Author

I have had this at the back of my mind for a while, but haven't given it much dedicated attention. Here are some of my thoughts on the low-level request->response mechanism.

I will refer to operations like SSE as a "pushing response", and reading a file off of the disk as a "pullable response". I will use synchronous terminology, but the arguments apply almost identically to the asynchronous equivalents.

  • (request, mut output) -> () vs (request) -> response, aka "push vs pull". If the underlying interface (in hyper, tower, etc.) is push-based as in hyper 0.10, both pullable and pushing responses are easy to adapt: pushing responses can push into the underlying writer, and pullable responses can be streamed fairly efficiently with something like copy_into. If the underlying interface is pull-based as in hyper 0.13, as far as I can see pushing responses are automatically penalized by the buffering required between the response side pushing and the read side pulling (see the sketch after this list). I see this as a point in favor of an underlying write-like interface for performance.
  • (request<'a>) -> response<'a>. Rocket's responses can borrow from some of the request data, so some task spawning and passing data through a channel is currently required. I actually don't know if any interface change in hyper can help with this, because some of the data in 'a lives on the stack somewhere in rocket. This does not necessitate any additional buffering of response data on top of the previous point, just a bit of indirection. But if hyper uses a primarily Write-based interface, we shouldn't need this indirection anymore.
  • Consequences of a push-based interface for other fairings and responders. This includes my previous comment, where we have to rethink on_response fairings. It also includes the different ergonomics of different response types like Compression, ETag calculation, or SSE. I have sketched a few imaginary responders in the case of either a push-based or pull-based API, and I don't think it's possible to make both ETag calculation (prefers read-like) and SSE (prefers write-like) maximally ergonomic at the same time; one or the other will suffer from an inverted control flow. I see this as a point in favor of a primarily pullable interface for Responder, based on the assumption that most responses are themselves pullable in nature.
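A sketch of the asymmetry from the first bullet, using tokio's I/O traits purely for illustration (no Rocket or hyper specifics): a pullable response maps onto a push-based transport with a plain copy loop, while a pushing response maps onto a pull-based transport only by way of an in-memory pipe and its buffer.

use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};

// Pullable response onto a push-based transport: just drive a copy loop.
async fn pull_into_push<R, W>(mut body: R, mut out: W) -> std::io::Result<u64>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    tokio::io::copy(&mut body, &mut out).await
}

// Pushing response onto a pull-based transport: the pusher needs something to
// write into while the transport pulls, so an in-memory pipe sits in between
// -- the extra buffering mentioned above.
fn push_into_pull() -> impl AsyncRead {
    let (mut writer, reader) = tokio::io::duplex(8 * 1024);
    tokio::spawn(async move {
        let _ = writer.write_all(b"data: hello\n\n").await;
        let _ = writer.shutdown().await;
    });
    reader
}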

@jebrosen
Collaborator Author

I think it's time to revive and tackle this issue now that I've had some more time to work with things and hyper 0.13 is released.

There are actually two/three orthogonal aspects to consider here:

  • The "push-based" vs "pull-based" model I described already.
    • In the pull-based model only, the "response borrows from request" problem appears.
  • The use of chunks (Bytes objects) vs AsyncRead or AsyncWrite.

Note that:

  • hyper 0.10 used a push-based model with a Writer
  • hyper 0.13 uses a pull-based model with Chunks.

Rocket 0.4 exposes a pull-based model with Read. Adapting this to hyper 0.10 is logically "easy": response.copy_to(hyper_response_writer).

Adapting Rocket's current pull-based AsyncRead model (in the async branch) to hyper 0.13 is tricky for two reasons:

  1. hyper wants to run a handler, giving it a request, and get a response back. However, in Rocket responses can borrow from data on the stack inside the handler function, and it could be somewhere between difficult and impossible to change that. The current unoptimized solution is to spawn a background task that does the "rocket" side of the processing, and use an mpsc channel to pass the chunks back to the "hyper" side.
  2. hyper wants chunks (Bytes). Currently, I made an adapter into_bytes_stream that reads chunks from an AsyncRead and returns Bytes objects. It's pretty much not optimized at all (a naive sketch of that shape follows this list).
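For concreteness, a naive adapter of that shape might look like the following (a hand-rolled sketch using the async-stream, bytes, futures, and tokio crates; not the actual into_bytes_stream code):

use bytes::{Bytes, BytesMut};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncReadExt};

// Naive sketch: read fixed-size chunks from an AsyncRead and yield each one
// as a Bytes value, ending the stream on a 0-byte read.
fn into_bytes_stream<R>(mut reader: R) -> impl Stream<Item = std::io::Result<Bytes>>
where
    R: AsyncRead + Unpin,
{
    async_stream::try_stream! {
        loop {
            let mut buf = BytesMut::with_capacity(4096);
            let n = reader.read_buf(&mut buf).await?;
            if n == 0 {
                break;
            }
            yield buf.freeze();
        }
    }
}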

I propose to solve these as follows:

  1. We can stop using any channels by leveraging Pin and unsafe to create a single "response unit". This data structure would be along the lines of (Request, Response<'request>, StreamState<'response>).
  2. I want to seriously consider the tradeoffs of using Stream<Item=Bytes> as the underlying interface instead of AsyncRead. As a rough guess, I think this would make rocket.rs simpler and make responder.rs and custom implementations of Responder harder.

Note that similar concerns apply to DataStream. Currently it implements AsyncRead when perhaps it should implement Stream<Item=Bytes> in addition or instead. However, it is much easier from Rocket's side to change or add that implementation than it is with Responder.


Another note:

@SergioBenitez mentioned:

For others, such as SSE, the write-based approach is clearly the way to go. We should support both.

I actually found that this is not necessarily the case. I made an SSE implementation (https://git.jebrosen.com/jeb/rocket-rooms/src/commit/28479ce5d73b761f3c32f397726f8d28487ba0d3/src/sse/v2.rs) that converts a Stream<Item=sse::Event> into an AsyncRead. With crates like async_stream, these kinds of interfaces are actually fairly approachable now.

A stream-based version of the "write-like" example might look like this:

impl Responder for MyType {
    fn respond_to(self, request: &Request) -> impl Stream<_> {
        async_stream::stream! {
            for _ in 0..5 {
                delay_for(Duration::from_secs(5)).await;
                yield "data: hello\n";
            }

            yield "\n";
        }
    }
}

jebrosen added the feedback wanted (User feedback is needed) label on Dec 13, 2019
@notriddle
Contributor

notriddle commented Dec 15, 2019

One of the main trade-offs you were mentioning is that middleware, like etag and gzip, wants to have a pull interface, while event systems (SSE, WebSocket, long polling) want a push interface.

While it would probably be a little bit of a wart, I've seen other places that will offer a pull interface by default, and use a hijack() function that takes the connection out of the pool, bypasses all of the middleware, and lets you push directly onto the socket. It seems like this is what the event systems want anyway (you sure don't want etags, and gzip is kind of a waste for tiny event frames). Other systems that are really opinionated about how they pump their data, like if you want to use sendfile() or implement an HTTP CONNECT proxy, would also want to forego automatic response transformations.

Though actually grabbing the raw socket like Discourse's Hijack middleware does is probably a non-starter, since Rocket supports HTTP/2, where a single socket is used for multiple responders at once. You can code up your own implementation of AsyncWrite, and Rust's future system makes it easy enough to implement backpressure on top of it, but you still don't get the raw socket. You also wouldn't get raw socket access if TLS is used.
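As a rough illustration of that last point, here is a sketch of a channel-backed AsyncWrite with backpressure (assuming tokio and tokio-util; not Rocket API): writes return Pending while the bounded channel is full, and the receiving half would be whatever feeds the HTTP/2 or TLS layer rather than the raw socket.

use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::Bytes;
use tokio::io::AsyncWrite;
use tokio_util::sync::PollSender;

// Hypothetical channel-backed writer: the raw socket stays with the framework;
// this handle only ever talks to a bounded in-memory channel.
struct ChannelWriter {
    tx: PollSender<Bytes>,
}

impl AsyncWrite for ChannelWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        // Backpressure: stay Pending until the bounded channel has capacity.
        ready!(this.tx.poll_reserve(cx))
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        this.tx
            .send_item(Bytes::copy_from_slice(buf))
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.get_mut().tx.close();
        Poll::Ready(Ok(()))
    }
}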

@DanielJoyce

WebSockets and SSE just don't fit in the normal middleware flow because they aren't necessarily HTTP, so it doesn't apply.

By definition, the middleware idea of a response goes out the window. It should just be documented that if you 'hijack' a connection and upgrade it into a websocket, the response stuff just won't fire. But it might make sense to add another middleware hook for these kinds of events, so things like logging middleware could react: 'oh, it got converted, I need to do something'.

Also, does this bring in the idea perhaps of websocket or sse 'middleware'? Something that can wrap around their connection lifecycle and provide hooks useful for other things as well?

SergioBenitez added a commit that referenced this issue Apr 28, 2021
This reworks the entire 'response::stream' module for async streams.

Resolves #1066.