SSE for Rocket 0.4.x #1365

Closed · wants to merge 7 commits
Conversation

@ijackson (Contributor) commented Jul 5, 2020

Hi. I'm writing a Rocket application which uses JS SSE. I had some difficulties getting my events to get through in a timely fashion. After some investigation, I came up with this MR.

With this MR, I am able to use SSE with Rocket as expected. I implement a Read which blocks and dribbles out data as needed. (I think I will need to increase my thread pool size and I will probably have to do the multiple-domains-SSE trick to avoid the SSE connection limit bug.)
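For concreteness, a minimal sketch of the shape of such a handler in Rocket 0.4 (`Ticker` is an invented example reader, not the application's actual code); without the flushing change proposed here, its output tends to linger in Rocket's and hyper's buffers rather than reach the client promptly:

```rust
#![feature(proc_macro_hygiene, decl_macro)]
#[macro_use] extern crate rocket;

use std::io::{self, Read};
use std::{thread, time::Duration};

use rocket::http::ContentType;
use rocket::response::{content::Content, Stream};

// Invented example: a blocking reader that emits one SSE frame per second.
struct Ticker { n: u64 }

impl Read for Ticker {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Block until the next event is due, then write one SSE frame.
        thread::sleep(Duration::from_secs(1));
        self.n += 1;
        let frame = format!("data: tick {}\n\n", self.n);
        // (A real implementation must carry over any bytes that don't fit.)
        let len = frame.len().min(buf.len());
        buf[..len].copy_from_slice(&frame.as_bytes()[..len]);
        Ok(len)
    }
}

#[get("/events")]
fn events() -> Content<Stream<Ticker>> {
    // SSE responses use the text/event-stream content type.
    Content(ContentType::new("text", "event-stream"),
            Stream::from(Ticker { n: 0 }))
}

fn main() {
    rocket::ignite().mount("/", routes![events]).launch();
}
```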

Please see the commit message of the 2nd commit for lots of discussion about my API design strategy. The API is certainly not as pretty as it might be in some alternative universe, but I think it's about as good as we're going to get for this one. At least it doesn't get in the way of 'normal' use of Rocket.

To save you digging it out of the GitHub UI, here it is:

Stream: Provide a way to flush chunks, to support SSE

Problem:

To support Server-Sent Events (SSE, aka JS EventSource) it is
necessary for the server to keep open an HTTP request and dribble out
data (the event stream) as it is generated.

Currently, Rocket handles this poorly.  It tries to accumulate
complete chunks before writing anything.  Also there is no way to
force a flush of the underlying stream: in particular, there is a
BufWriter underneath hyper.  hyper would honour a flush request, but
there is no way to send one.

Options:

Ideally the code which is producing the data would be able to
explicitly designate when a flush should occur.  Certainly it would
not be acceptable to flush all the time for all readers.

1. Invent a new kind of Body (UnbufferedChunked) which translates the
data from each Read::read() call into a single call to the stream
write, and which always flushes.  This would be a seriously invasive
change.  And it would mean that SSE systems with fast event streams
might work poorly.

2. Invent a new kind of Body which doesn't use Read at all, and
instead has a more sophisticated API.  This would be super-invasive
and heavyweight.

3. Find a way to encode the necessary information in the Read trait
protocol.

Chosen solution:

It turns out that option 3 is quite easy.  The read() call can return
an io::Error.  There are at least some errors that clearly ought not
to occur here.  An obvious one is ErrorKind::WouldBlock.

Rocket expects the reader to block.  WouldBlock is only applicable to
nonblocking objects.  And indeed the reader will generally want to
return it (once) when it is about to block.

We have the Stream treat io::Error with ErrorKind::WouldBlock, from
its reader, as a request to flush.  There are two effects: we stop
trying to accumulate a full chunk, and we issue a flush call to the
underlying writer (which, eventually, makes it all the way down into
hyper and BufWriter).
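For illustration, a reader following this convention might look like the following sketch (`EventReader` and its mpsc channel are invented for the example): drain any buffered data, return `WouldBlock` exactly once when the buffer empties, and only then actually block.

```rust
use std::io::{self, ErrorKind, Read};
use std::sync::mpsc::{Receiver, TryRecvError};

// Invented example type: events arrive on a channel from elsewhere in the app.
struct EventReader {
    events: Receiver<Vec<u8>>,
    pending: Vec<u8>,
    flush_requested: bool,
}

impl Read for EventReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            // First, hand out any data we already have.
            if !self.pending.is_empty() {
                let n = self.pending.len().min(buf.len());
                buf[..n].copy_from_slice(&self.pending[..n]);
                self.pending.drain(..n);
                self.flush_requested = false;
                return Ok(n);
            }
            match self.events.try_recv() {
                Ok(event) => self.pending = event,
                Err(TryRecvError::Empty) if !self.flush_requested => {
                    // About to block: ask the server (once) to flush the
                    // partial chunk accumulated so far.
                    self.flush_requested = true;
                    return Err(ErrorKind::WouldBlock.into());
                }
                Err(TryRecvError::Empty) => match self.events.recv() {
                    // Flush already requested, so now genuinely block.
                    Ok(event) => self.pending = event,
                    Err(_) => return Ok(0), // sender gone: end of stream
                },
                Err(TryRecvError::Disconnected) => return Ok(0), // end of stream
            }
        }
    }
}
```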

Implementation:

We provide a method ReadExt::read_max_wfs which is like read_max but
which handles the WouldBlock case specially.  It tells its caller
whether a flush was wanted.

This is implemented by adding a new code path to read_max_internal,
with a boolean to control it.  This seemed better than inventing a
trait or something.  (The other read_max call site is reading http
headers in data.rs, and I think it wants to treat WouldBlock as an
error.)
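The shape of that helper, as a sketch (illustrative only, not Rocket's actual internals): it returns both the byte count and a flag saying whether the reader requested a flush.

```rust
use std::io::{self, ErrorKind, Read};

// Sketch: like read_max, but "with flush support". Fills `buf` as far as
// possible; a WouldBlock from the reader ends the chunk early and is
// reported as "flush wanted" rather than as an error.
fn read_max_wfs<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<(usize, bool)> {
    let mut filled = 0;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            Ok(0) => break, // EOF: the chunk is as full as it will get
            Ok(n) => filled += n,
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok((filled, true)),
            Err(e) => return Err(e),
        }
    }
    Ok((filled, false))
}
```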

Risks and downsides:

Obviously this ad-hoc extension to the Read protocol is not
particularly pretty.  At least, people who aren't doing SSE (or
similar) won't need it and can ignore it.

If for some reason the data source is actually nonblocking, this new
arrangement would spin, rather than treating the situation as a fatal
error.  This possibility seems fairly remote, in production settings
at least.  To mitigate this it might be possible for the loop in
Rocket::issue_response to bomb out if it notices it is sending lots of
consecutive empty chunks.
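Such a bail-out could be as simple as a counter over empty chunks (purely illustrative; nothing like this is in the MR, and the threshold is made up):

```rust
use std::io;

const MAX_CONSECUTIVE_EMPTY: u32 = 1000; // made-up threshold

// Called once per chunk sent: error out if the reader keeps requesting
// flushes without ever producing data, i.e. a non-blocking source spinning.
fn note_chunk(consecutive_empty: &mut u32, chunk_len: usize) -> io::Result<()> {
    if chunk_len > 0 {
        *consecutive_empty = 0;
    } else {
        *consecutive_empty += 1;
        if *consecutive_empty > MAX_CONSECUTIVE_EMPTY {
            return Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "reader appears to be non-blocking; aborting response",
            ));
        }
    }
    Ok(())
}
```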

It is possible that async Rocket will want to take a different
approach entirely.  But it will definitely need to solve this problem
somehow, and naively it seems like the obvious transformation to e.g.
the Tokio read trait would have the same API limitation and admit the
same solution.  (Having a flush occur every time the body stream
future returns Pending would not be good for performance, I think.)

Background and references:

I found these issues already:

PS: Thanks for Rocket. This is my 2nd Rocket application so consider yourselves appreciated :-).

@igalic commented Jul 5, 2020

the amount of documentation and reasoning in this pr is absolutely 😻

@SergioBenitez (Member) commented Jul 23, 2020

This is very clever. Excellent proposal, @ijackson! My primary concern is backwards compatibility. Previously, an Err(WouldBlock) would result in an error, whereas now it does not. Thus, this is technically a breaking change.

However, the documentation for WouldBlock clearly states (emphasis mine):

> The operation needs to block to complete, but the blocking operation was requested to not occur.

That is, the error should only be returned if non-blocking I/O was requested. Thus, we should be in the clear.

Nevertheless, there is no guarantee, especially because we can't guarantee that a non-blocking reader from the user was passed in. Can we mitigate the chance of a true breakage? One idea is to check if the request returns WouldBlock twice and return the Error if so. Presumably, if a true WouldBlock is returned once, it will be returned twice. A Reader in the know would only return it once, of course.
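That mitigation might look something like the following wrapper (a sketch; the names and structure are invented): the first `WouldBlock` becomes a flush request, and a second consecutive one is surfaced as a real error.

```rust
use std::io::{self, ErrorKind, Read};

enum Outcome { Data(usize), Flush }

struct FlushAware<R: Read> {
    inner: R,
    just_flushed: bool,
}

impl<R: Read> FlushAware<R> {
    fn read_or_flush(&mut self, buf: &mut [u8]) -> io::Result<Outcome> {
        match self.inner.read(buf) {
            Ok(n) => {
                self.just_flushed = false;
                Ok(Outcome::Data(n))
            }
            Err(ref e) if e.kind() == ErrorKind::WouldBlock && !self.just_flushed => {
                // First WouldBlock since the last data: a flush request.
                self.just_flushed = true;
                Ok(Outcome::Flush)
            }
            // A second consecutive WouldBlock (or any other error) is treated
            // as genuine: the reader is presumably truly non-blocking.
            Err(e) => Err(e),
        }
    }
}
```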

@jebrosen I am in favor of this, pending a resolution to the above as well as your thoughts.

@jebrosen (Collaborator) commented
Assuming this actually works as intended (I haven't made any attempt at testing it!), I'm generally on board.

Instead of assigning special meaning to WouldBlock, could we require io::Error::new(_, rocket::response::RequestFlush), and check this against the Error's source() with <dyn Error>::is::<RequestFlush>? I admit that checking the source() is probably slightly more expensive than checking the kind(), but it feels slightly less hacky to me.
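Sketched out, that alternative might look like this (illustrative; `RequestFlush` is the proposed marker type, and the check here goes through `io::Error::get_ref`, which exposes the attached error value):

```rust
use std::error::Error;
use std::{fmt, io};

// Proposed marker type: attached as the payload of an io::Error to mean
// "please flush", with no overloading of an existing ErrorKind.
#[derive(Debug)]
pub struct RequestFlush;

impl fmt::Display for RequestFlush {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("flush requested")
    }
}

impl Error for RequestFlush {}

// Reader side: signal a flush.
pub fn flush_request() -> io::Error {
    io::Error::new(io::ErrorKind::Other, RequestFlush)
}

// Server side: recognise the signal by downcasting the payload.
pub fn is_flush_request(e: &io::Error) -> bool {
    e.get_ref().map_or(false, |inner| inner.is::<RequestFlush>())
}
```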

@ijackson (Contributor, Author) commented Jul 23, 2020 via email

@ijackson (Contributor, Author) commented Sep 6, 2020

I thought of a better way to mitigate the risk of breakage: put this behind a feature gate. ISTM that Rocket applications that want this will in any case want to opt in to it, so that will be fine. Perhaps some library in an SSE-using Rocket application will cause trouble by generating spurious WouldBlock errors, but having to opt into this feature will mean seeing the docs which mention this. And if we make it a non-default feature, definitely no existing users will be adversely affected.
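In cfg terms the gate might look like this (illustrative; "sse" is the feature name proposed here):

```rust
use std::io::ErrorKind;

// Applications would opt in via their Cargo.toml, e.g.:
//     rocket = { version = "0.4", features = ["sse"] }
// Without the feature, WouldBlock keeps its old meaning: a plain error.
fn is_flush_request(kind: ErrorKind) -> bool {
    cfg!(feature = "sse") && kind == ErrorKind::WouldBlock
}
```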

If you like this idea I will send an updated MR. I'm quite keen to drop my vendored copy of Rocket...

@SergioBenitez (Member) commented
@ijackson Yes, I think that would adequately resolve my concerns.

We are going to want to provide a more sophisticated entrypoint in a
moment.  This new function is going to get a new call site from a new
function in ReadExt.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
[Commit message: "Stream: Provide a way to flush chunks, to support SSE" — quoted in full in the description above.]

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
This eliminates the risk of breakage to existing applications.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
@ijackson (Contributor, Author) commented
Hi. I made that change to make the new behaviour depend on the "sse" feature flag. Please let me know if you'd like me to squash that into the actual implementation commit.

I see that Azure has tried to test this. I've looked at the logs of the failed tests and they seem to have been "cancelled" for no readily apparent reason. I don't think this is anything to do with what is in my branch. If this isn't a known problem, please let me know where to look to find the relevant log...

Thanks,
Ian.

@inzanez commented Oct 1, 2020

I'm really looking forward to this! :-)

@ijackson (Contributor, Author) commented Oct 1, 2020

Should I do a null rebase and re-force-push to retry the failing CI tests?

@SergioBenitez (Member) commented
> Should I do a null rebase and re-force-push to retry the failing CI tests?

Please do!

@ijackson (Contributor, Author) commented Oct 2, 2020

Please see #1443. That tested a tree identical to current upstream v0.4 and all its tests failed too. So I think these failures are nothing to do with my SSE changes.

Please let me know when you think this is fixed...

@jebrosen (Collaborator) commented Oct 2, 2020

This branch should be re-triable now; there was a bug in downloading nightly via rustup that was fixed just 10 minutes ago: rust-lang/rustup#2504 (comment)

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
@ijackson (Contributor, Author) commented Oct 2, 2020

Aha! That looks better. Thanks for the help.

You'll see I added a commit providing an example. I haven't provided a #[test] for it because I didn't have time (esp. time to think about buffering in http client libraries), but I have tested it locally with a browser.

This example was derived from an earlier test case of mine, where I
had set the chunk size to 1 to try to track down the buffering
problem.  But the low chunk size is not needed.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
Don't use the explicit chunk size for the Stream.
Make std::io::BufReader a `use`, which improves readability.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
The docs say the default is 8KB but "may change".  Also, add a comment
explaining the relevance of the `BufReader` and its size.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
@ijackson (Contributor, Author) commented Oct 2, 2020

I think I have finished tidying this up now. Sorry for the noise and let me know if you would like me to squash any of it.

@ijackson (Contributor, Author) commented
@SergioBenitez is there anything else you need from me? Thanks.

@SergioBenitez (Member) commented
@ijackson A way to conjure time is always appreciated. ;)

The example didn't actually make use of the WouldBlock feature, so it didn't work. I've fixed the example as well as various style issues. Doing a final review now, then pushing.

@SergioBenitez (Member) commented
Merged in c24a963 with fixes in 3970783. Will prep a new release soon.

@SergioBenitez added the `pr: merged` label (This pull request was merged manually.) on Oct 30, 2020
@ijackson (Contributor, Author) commented
> @ijackson A way to conjure time is always appreciated. ;)

Haha :-).

> The example didn't actually make use of the WouldBlock feature, so it didn't work. I've fixed the example as well as various style issues. Doing a final review now, then pushing.

Sorry about that. I must have broken it after I tested it. Thanks for doing the fixup!
