-
Notifications
You must be signed in to change notification settings - Fork 867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http stream response via http 1.1 chunked encoding #2233
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2233 +/- ##
=======================================
Coverage 70.40% 70.40%
=======================================
Files 75 75
Lines 3392 3392
Branches 57 57
=======================================
Hits 2388 2388
Misses 1001 1001
Partials 3 3
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Client side receives the chunked data. | ||
``` | ||
def test_echo_stream_inference(): | ||
test_utils.start_torchserve(no_config_snapshots=True, gen_mar=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just promote these test_utils functions to the core library they're very useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which core library? who is the user of the core library?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean the ts
namespace
if (job.getCmd() == WorkerCommands.STREAMPREDICT | ||
&& prediction.getHeaders().get(TS_STREAM_NEXT).equals("true")) { | ||
jobDone = false; | ||
if (jobDone) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do all the java side changes need unit tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a tech debt in frontend which is lack of unit tests, only includes integration test . This PR uses regression to test the e2e.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly minor comments except one about the logging. I would also like to see the enum used to denote the current state of a request instead of the int (especially with the current choice for which int denotes which state), but this is more of an opinion so therefore not blocking.
@@ -38,6 +45,7 @@ public class RestJob extends Job { | |||
|
|||
private ChannelHandlerContext ctx; | |||
private CompletableFuture<byte[]> responsePromise; | |||
private int numStreams; // -1: stream end; 0+: number of streams received; 0: non stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit hard to follow and slightly counterintuitive in my opinion. Why wouldnt non-streamable = -1, end of stream = 0? Now we have a disjoint set of integers like so:
streaming | no streaming | streaming |
---|---|---|
(-infinity, -1] | [0,0] | [1, infinity] |
which feels hard to keep track of mentally compared to something where the set of streaming states is not split in two by the non-streaming state.
I think this could all be alleviated with the use of an enum. You would still need an integer to track # of parts received, but it would be a bit more clear to read IMO.
Update: looking further in the code, it doesnt appear you even use the number of parts other than to check if it's > 1 (i.e you arent using it as a metric). I think this makes the argument for an enum stronger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the following 4 cases need take different actions.
-1: stream end.
0: non-stream (default).
1: 1st stream.
[2, integer.max): 2nd+ stream.
frontend/server/src/main/java/org/pytorch/serve/wlm/BatchAggregator.java
Show resolved
Hide resolved
frontend/server/src/main/java/org/pytorch/serve/wlm/WorkerThread.java
Outdated
Show resolved
Hide resolved
if ts_stream_next is True: | ||
context.set_response_header(idx, "ts_stream_next", "true") | ||
else: | ||
if "true" == context.get_response_headers(idx).get("ts_stream_next"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious: why do we need to check this? When would this be false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are 2 scenarios when ts_stream_next is false.
- non-stream response.
- last stream response. Line 85 is to handle this case.
context.get_response_headers(idx).get("ts_stream_next") is a string, not bool. it's value can be:
"true": has next stream
"false": last stream
"": non-stream response.
frontend/server/src/main/java/org/pytorch/serve/wlm/WorkerThread.java
Outdated
Show resolved
Hide resolved
ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(body))); | ||
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor/Curious: Instead of writing twice, can you just write once and tell netty that this is the last chunk. Also assuming you cant do that, do you need to writeAndFlush
here or can you just write knowing that you are immediately writeAndFlush
'ing the LastHttpConent
chunk after?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it needs call writeAndFlush twice, kind of similar as grpc (onComplete).
Description
Please read our CONTRIBUTING.md prior to creating your first pull request.
Please include a summary of the feature or issue being fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
Support http streaming inference response via http 1.1 chunked encoding
Fixes #(issue)
#2232
Type of change
Please delete options that are not relevant.
Feature/Issue validation/testing
Please describe the Unit or Integration tests that you ran to verify your changes and relevant result summary. Provide instructions so it can be reproduced.
Please also list any relevant details for your test configuration.
reg.txt
Checklist: