Feature/grpc streaming #2186
Codecov Report
@@            Coverage Diff             @@
##           master    #2186      +/-   ##
==========================================
- Coverage   71.45%   71.32%   -0.13%
==========================================
  Files          73       73
  Lines        3296     3306      +10
  Branches       57       57
==========================================
+ Hits         2355     2358       +3
- Misses        941      948       +7
Left a bunch of minor feedback, but I'm not sure I have enough context on what this PR is trying to do to give system-level feedback.
docs/grpc_api.md
Outdated
@@ -70,3 +71,28 @@ python ts_scripts/torchserve_grpc_client.py infer densenet161 examples/image_cla
```bash
python ts_scripts/torchserve_grpc_client.py unregister densenet161
```
## GRPC Server Side Streaming
TorchServe GRPC APIs adds a server side streaming of the inference API "StreamPredictions" to allow a sequence of inference responses to be sent over the same GRPC stream. This new API is only recommended for the use case when the inference full response latency is high, and the inference intermediate results are sent to client. This new API automatically forces the batchSize to be one.
n00b q: what does intermediate response mean? I initially understood this feature as sending partial batches back, so what's the scenario in which it'd be useful to use this feature? Or is this internal only to the large model work?
@msaroufim The TS backend message protocol does not allow sending a partial batch to the frontend (e.g., with batchSize=10, sending only 5 of the results) (see code).
This feature targets use cases such as generative AI, where generating the full result has high latency. It allows users to send partial results back to the client gradually.
if type(data) is list:
    for i in range(3):
        send_intermediate_predict_response(["hello"], context.request_ids, "Intermediate Prediction success", 200, context)
    return ["hello world "]
Should this be an async request?
It can be async. A customized handler can decide between sync and async based on its real use case.
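To make the handler pattern in this thread concrete, here is a minimal, self-contained sketch. It assumes the call shape shown in the snippet above, with `send_intermediate_predict_response` taking the payload, the request ID map, a message, a status code, and the context. The function defined below is only a local stub that records what would be streamed, so the sketch runs without TorchServe installed; `FakeContext` and its `sent` list are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class FakeContext:
    """Hypothetical stand-in for the handler context; only request_ids is modeled."""

    request_ids: dict
    sent: list = field(default_factory=list)  # records what would go over the stream


def send_intermediate_predict_response(ret, req_id_map, message, code, context):
    """Local stub mirroring the call shape in the PR snippet (an assumption).

    The real function sends a partial response to the frontend; this stub
    only records the payload and status code so the sketch is runnable alone.
    """
    context.sent.append((ret, code))


def handle(data, context):
    """Send three intermediate chunks, then return the final chunk."""
    if isinstance(data, list):
        for _ in range(3):
            send_intermediate_predict_response(
                ["hello"],
                context.request_ids,
                "Intermediate Prediction success",
                200,
                context,
            )
        return ["hello world "]
    return None


ctx = FakeContext(request_ids={0: "req-1"})
final = handle([{"body": b"input"}], ctx)
print(len(ctx.sent), final)
```

The intermediate chunks leave through the side channel while the handler is still running, and the ordinary return value serves as the last message of the stream.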
@@ -56,8 +62,8 @@ public BaseModelRequest getRequest(String threadName, WorkerState state)
    }

    public void sendResponse(ModelWorkerResponse message) {
        boolean jobDone = true;
This variable name is a bit confusing outside of the context of streaming: the job is not done yet. Maybe streamcomplete or something of the sort would be clearer.
Most use cases are non-streaming. They require retrieving only a single message for a batch of jobs. The variable "jobDone" reflects whether message retrieval is complete for a batch of jobs.
@@ -201,8 +200,9 @@ public void pollBatch(String threadId, long waitTime, Map<String, Job> jobsRepo)
        logger.trace("get first job: {}", Objects.requireNonNull(j).getJobId());

        jobsRepo.put(j.getJobId(), j);
        // describe request job batch size always is 1
        if (j.getCmd() == WorkerCommands.DESCRIBE) {
        // batch size always is 1 for describe request job and stream prediction request job
Not sure I understand this limitation. Why batch size 1?
For generative AI, processing even a single request is expensive. A batch size >1 would make latency higher.
Not really true. We might want a batch of streams as well. It is up to the client.
I think another issue with batch_size > 1 is that there isn't a way to differentiate which stream chunk belongs to which request. Maybe we can utilize the requestId in the job to associate the chunk with the request, but that is assigned as a uuid when the frontend receives the request, so the client is unable to differentiate.
There are two issues when batch size is set to >1:
- It breaks the current protocol between frontend and backend, e.g. when some requests' intermediate results succeed and others fail.
- The latency will most likely be even higher if batch size is larger than 1.
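The chunk-attribution problem raised in this thread can be illustrated with a small sketch. It is not how TorchServe works today: it assumes a hypothetical protocol in which every streamed chunk is tagged with the ID of the request it belongs to, which is exactly the piece the discussion says is missing. The function name and the `(request_id, text)` tuple shape are made up for illustration.

```python
from collections import defaultdict


def group_chunks_by_request(chunks):
    """Group (request_id, text) pairs into per-request lists, keeping arrival order."""
    per_request = defaultdict(list)
    for request_id, text in chunks:
        per_request[request_id].append(text)
    return dict(per_request)


# Interleaved chunks from two hypothetical requests sharing one batch.
interleaved = [("a", "hello"), ("b", "foo"), ("a", "world"), ("b", "bar")]
grouped = group_chunks_by_request(interleaved)
print(grouped)
```

Without such a tag, a client receiving the interleaved sequence has no way to rebuild the two responses, which is one reason the PR pins the batch size to one for streaming.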
        )
    )

    print(response.msg)
Are these prints necessary? I'm slightly worried that the messages will fill up our CI logs, which are already long enough to make searching frustrating.
They are helpful for debugging where a regression test fails (i.e. which model registration fails).
        for resp in responses:
            prediction = resp.prediction.decode("utf-8")
            print(prediction)
    except grpc.RpcError as e:
Should we also catch the UnicodeDecodeError?
It is not necessary to exit the client on a UTF-8 decode error. Only an RPC error is fatal.
    for resp in responses:
        prediction.append(resp.prediction.decode("utf-8"))

    return " ".join(prediction)
Will this be a list of partial predictions forming a single prediction, or a list of multiple predictions with batch size 1?
prediction is a list of partial prediction responses. The function returns all of the partial responses joined together, which makes the later comparison with the expected values much easier.
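The joining behaviour described here can be sketched in isolation. The example below assumes each streamed response exposes a `prediction` attribute holding bytes, as in the snippet above; `FakeResponse` and `join_stream` are hypothetical names. Skipping chunks that fail to decode is one possible policy chosen for illustration, since the thread only establishes that a decode error need not be fatal, not how it should be handled.

```python
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for a streamed gRPC response message."""

    prediction: bytes


def join_stream(responses):
    """Decode each partial response and join the parts with single spaces."""
    parts = []
    for resp in responses:
        try:
            parts.append(resp.prediction.decode("utf-8"))
        except UnicodeDecodeError:
            # Assumed policy: drop an undecodable chunk instead of aborting.
            continue
    return " ".join(parts)


stream = iter(
    [FakeResponse(b"hello"), FakeResponse(b"\xff\xfe"), FakeResponse(b"hello world ")]
)
joined = join_stream(stream)
print(repr(joined))
```

Because the parts are joined with a space, a test comparing against an expected string has to account for the separator and for any trailing whitespace in the final chunk.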
docs/grpc_api.md
Outdated
TorchServe GRPC APIs adds a server side streaming of the inference API "StreamPredictions" to allow a sequence of inference responses to be sent over the same GRPC stream. This new API is only recommended for the use case when the inference full response latency is high, and the inference intermediate results are sent to client. This new API automatically forces the batchSize to be one.
TorchServe GRPC APIs adds a server side streaming of the inference API "StreamPredictions" to allow a sequence of inference responses to be sent over the same GRPC stream. This new API is only recommended for the use case when the inference latency of the full response is high and the inference intermediate results are sent to client. An example could be LLMs for generative applications, where generating "n" number of tokens can have high latency, in this case user can receive each generated token once ready until the full response completes. This new API automatically forces the batchSize to be one.
ModelInferenceRequest inferReq = (ModelInferenceRequest) req;
boolean streamNext = true;
while (streamNext) {
    reply = replies.poll(responseTimeout, TimeUnit.SECONDS);
Looks like responseTimeout is the same for the streaming and non-streaming APIs. Clients might want different timeouts for the two, right?
The plan is to move responseTimeout to the model-level config.
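The polling loop under discussion is Java, but its timeout behaviour can be sketched in Python with the standard library. This is an analogue under stated assumptions, not the PR's implementation: `drain_stream` is a hypothetical name, and each queue item is assumed to be a `(chunk, is_final)` pair. The point it illustrates is that the timeout applies to each poll separately, so a long stream can run for many multiples of the configured value.

```python
import queue


def drain_stream(replies, response_timeout_s):
    """Collect chunks until one is marked final or a single poll times out.

    Returns (chunks, completed), where completed is False on timeout.
    """
    received = []
    stream_next = True
    while stream_next:
        try:
            # The timeout bounds the wait for the next chunk, not the whole stream.
            chunk, is_final = replies.get(timeout=response_timeout_s)
        except queue.Empty:
            return received, False
        received.append(chunk)
        stream_next = not is_final
    return received, True


replies = queue.Queue()
for item in [("hello", False), ("hello", False), ("hello world ", True)]:
    replies.put(item)

chunks, completed = drain_stream(replies, response_timeout_s=0.1)
print(chunks, completed)
```

A separate streaming timeout, as the reviewer suggests, would amount to passing a different `response_timeout_s` on this path than on the single-response path.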
Description
This PR supports the new feature of GRPC server-side streaming. It includes:
Fixes #(issue)
#2180
Type of change
Feature/Issue validation/testing
reg.txt
Checklist:
Did you have fun?
Have you added tests that prove your fix is effective or that this feature works?
Has code been commented, particularly in hard-to-understand areas?
Have you made corresponding changes to the documentation?