Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Unify the handle_request_streaming replica path for gRPC #42143

Merged
merged 3 commits into from
Jan 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 20 additions & 45 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
from ray.serve.config import AutoscalingConfig
from ray.serve.deployment import Deployment
from ray.serve.exceptions import RayServeException
from ray.serve.grpc_util import RayServegRPCContext
from ray.serve.schema import LoggingConfig

logger = logging.getLogger(SERVE_LOGGER_NAME)
Expand Down Expand Up @@ -379,15 +378,7 @@ async def handle_request_streaming(
) -> AsyncGenerator[Any, None]:
"""Generator that is the entrypoint for all `stream=True` handle calls."""
request_metadata = pickle.loads(pickled_request_metadata)
if request_metadata.is_grpc_request:
# Ensure the request args are a single gRPCRequest object.
assert len(request_args) == 1 and isinstance(request_args[0], gRPCRequest)
generator = (
self._user_callable_wrapper.call_user_method_with_grpc_unary_stream(
request_metadata, request_args[0]
)
)
elif request_metadata.is_http_request:
if request_metadata.is_http_request:
assert len(request_args) == 1 and isinstance(
request_args[0], StreamingHTTPRequest
)
Expand Down Expand Up @@ -813,39 +804,6 @@ async def wrap_user_method_call(self, request_metadata: RequestMetadata):
self.error_counter.inc(tags={"route": request_metadata.route})
raise user_exception from None

async def call_user_method_with_grpc_unary_stream(
self, request_metadata: RequestMetadata, request: gRPCRequest
) -> AsyncGenerator[Tuple[RayServegRPCContext, bytes], None]:
"""Call a user method that is expected to be a generator.

Deserializes gRPC request into protobuf object and pass into replica's runner
method. Returns a generator of serialized protobuf bytes from the replica.
"""
async with self.wrap_user_method_call(request_metadata):
user_method = self.get_runner_method(request_metadata)
user_request = pickle.loads(request.grpc_user_request)
if GRPC_CONTEXT_ARG_NAME in inspect.signature(user_method).parameters:
result_generator = user_method(
user_request,
grpc_context=request_metadata.grpc_context,
)
else:
result_generator = user_method(user_request)
if inspect.iscoroutine(result_generator):
result_generator = await result_generator

if inspect.isgenerator(result_generator):
for result in result_generator:
yield request_metadata.grpc_context, result.SerializeToString()
elif inspect.isasyncgen(result_generator):
async for result in result_generator:
yield request_metadata.grpc_context, result.SerializeToString()
else:
raise TypeError(
"When using `stream=True`, the called method must be a generator "
f"function, but '{user_method.__name__}' is not."
)

async def call_user_method(
self,
request_metadata: RequestMetadata,
Expand Down Expand Up @@ -948,17 +906,34 @@ async def call_user_method_generator(
assert (
not request_metadata.is_http_request
), "HTTP requests should go through `call_user_method`."

user_method = self.get_runner_method(request_metadata)
if request_metadata.is_grpc_request:
assert len(request_args) == 1 and isinstance(
request_args[0], gRPCRequest
)
request_args = (pickle.loads(request_args[0].grpc_user_request),)
if GRPC_CONTEXT_ARG_NAME in inspect.signature(user_method).parameters:
request_kwargs = {
GRPC_CONTEXT_ARG_NAME: request_metadata.grpc_context
}

result_generator = user_method(*request_args, **request_kwargs)
if inspect.iscoroutine(result_generator):
result_generator = await result_generator

if inspect.isgenerator(result_generator):
for result in result_generator:
yield result
if request_metadata.is_grpc_request:
yield request_metadata.grpc_context, result.SerializeToString()
else:
yield result
elif inspect.isasyncgen(result_generator):
async for result in result_generator:
yield result
if request_metadata.is_grpc_request:
yield request_metadata.grpc_context, result.SerializeToString()
else:
yield result
else:
raise TypeError(
"When using `stream=True`, the called method must be a generator "
Expand Down
Loading