Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP StreamingService to BlockingStreaming backpressure and error propagation fixes #1328

Merged
merged 5 commits into from
Jan 21, 2021

Conversation

Scottmitch
Copy link
Member

Motivation:
StreamingHttpServiceToBlockingStreamingHttpService (used by
HttpJerseyRouterBuilder.buildBlockingStreaming) doesn't apply any
backpressure and also may not propagate error status correctly in the
event the StreamingHttpResponse fails.

Modifications:

  • PayloadPump uses the new PayloadWriter.close(Throwable) method to
    propagate status in the event the StreamingHttpResponse fails.
  • PayloadPump requests in chunks of 64 by default to apply backpresusre.
    That way if the blocking PayloadWriter APIs block, the source
    providing data will be limited to 64 chunks at most.

Result:
HttpApiConversions.toBlockingStreamingHttpService(StreamingHttpService)
applies backpressure and propagates error status to the PayloadWriter.

…pagation fixes

Motivation:
StreamingHttpServiceToBlockingStreamingHttpService (used by
HttpJerseyRouterBuilder.buildBlockingStreaming) doesn't apply any
backpressure and also may not propagate error status correctly in the
event the StreamingHttpResponse fails.

Modifications:
- PayloadPump uses the new PayloadWriter.close(Throwable) method to
  propagate status in the event the StreamingHttpResponse fails.
- PayloadPump requests in chunks of 64 by default to apply backpresusre.
  That way if the blocking PayloadWriter APIs block, the source
  providing data will be limited to 64 chunks at most.

Result:
HttpApiConversions.toBlockingStreamingHttpService(StreamingHttpService)
applies backpressure and propagates error status to the PayloadWriter.
this.cancel();
payloadWriter.write((Buffer) bufferOrTrailers);
} catch (IOException e) {
throwException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this need onError

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If onNext throws the async source is responsible for catching the exception and propagating an error back downstream. Throwing the exception also effectively cancels the subscription and lets the upstream source cleanup.

We generally don't throw from terminal methods (e.g. onComplete(), onError(Throwable)) because the upstream source isn't allowed to invoke terminal methods more than once, so we must handle the error locally.

Copy link
Member

@idelpivnitskiy idelpivnitskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM after outstandingDemand is improved:

@Scottmitch
Copy link
Member Author

build failure attributed to #999

Copy link
Member

@idelpivnitskiy idelpivnitskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

@Scottmitch
Copy link
Member Author

build failure attributed to #1323

@Scottmitch
Copy link
Member Author

build failure attributed to #1192

@Scottmitch Scottmitch merged commit f7f4daf into apple:main Jan 21, 2021
@Scottmitch Scottmitch deleted the http_streaming_to_blocking branch January 21, 2021 02:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants