From 34a70a21fe7be04777a6e94a399e3244c3cbd44b Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 2 Jan 2024 11:53:37 -0600 Subject: [PATCH 1/2] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/replica.py | 64 +++++++++------------------- 1 file changed, 20 insertions(+), 44 deletions(-) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 8dfa0fda1fb5..2a78ea0e47bf 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -388,15 +388,7 @@ async def handle_request_streaming( ) -> AsyncGenerator[Any, None]: """Generator that is the entrypoint for all `stream=True` handle calls.""" request_metadata = pickle.loads(pickled_request_metadata) - if request_metadata.is_grpc_request: - # Ensure the request args are a single gRPCRequest object. - assert len(request_args) == 1 and isinstance(request_args[0], gRPCRequest) - generator = ( - self._user_callable_wrapper.call_user_method_with_grpc_unary_stream( - request_metadata, request_args[0] - ) - ) - elif request_metadata.is_http_request: + if request_metadata.is_http_request: assert len(request_args) == 1 and isinstance( request_args[0], StreamingHTTPRequest ) @@ -822,39 +814,6 @@ async def wrap_user_method_call(self, request_metadata: RequestMetadata): self.error_counter.inc(tags={"route": request_metadata.route}) raise user_exception from None - async def call_user_method_with_grpc_unary_stream( - self, request_metadata: RequestMetadata, request: gRPCRequest - ) -> AsyncGenerator[Tuple[RayServegRPCContext, bytes], None]: - """Call a user method that is expected to be a generator. - - Deserializes gRPC request into protobuf object and pass into replica's runner - method. Returns a generator of serialized protobuf bytes from the replica. - """ - async with self.wrap_user_method_call(request_metadata): - user_method = self.get_runner_method(request_metadata) - user_request = pickle.loads(request.grpc_user_request) - if GRPC_CONTEXT_ARG_NAME in inspect.signature(user_method).parameters: - result_generator = user_method( - user_request, - grpc_context=request_metadata.grpc_context, - ) - else: - result_generator = user_method(user_request) - if inspect.iscoroutine(result_generator): - result_generator = await result_generator - - if inspect.isgenerator(result_generator): - for result in result_generator: - yield request_metadata.grpc_context, result.SerializeToString() - elif inspect.isasyncgen(result_generator): - async for result in result_generator: - yield request_metadata.grpc_context, result.SerializeToString() - else: - raise TypeError( - "When using `stream=True`, the called method must be a generator " - f"function, but '{user_method.__name__}' is not." - ) - async def call_user_method_grpc_unary( self, request_metadata: RequestMetadata, request: gRPCRequest ) -> Tuple[RayServegRPCContext, bytes]: @@ -977,17 +936,34 @@ async def call_user_method_generator( assert ( not request_metadata.is_http_request ), "HTTP requests should go through `call_user_method`." + user_method = self.get_runner_method(request_metadata) + if request_metadata.is_grpc_request: + assert len(request_args) == 1 and isinstance( + request_args[0], gRPCRequest + ) + request_args = pickle.loads(request_args[0].grpc_user_request) + if GRPC_CONTEXT_ARG_NAME in inspect.signature(user_method).parameters: + request_kwargs = { + GRPC_CONTEXT_ARG_NAME: request_metadata.grpc_context + } + result_generator = user_method(*request_args, **request_kwargs) if inspect.iscoroutine(result_generator): result_generator = await result_generator if inspect.isgenerator(result_generator): for result in result_generator: - yield result + if request_metadata.is_grpc_request: + yield request_metadata.grpc_context, result.SerializeToString() + else: + yield result elif inspect.isasyncgen(result_generator): async for result in result_generator: - yield result + if request_metadata.is_grpc_request: + yield request_metadata.grpc_context, result.SerializeToString() + else: + yield result else: raise TypeError( "When using `stream=True`, the called method must be a generator " From 8ad2650967b9acda52dc5fa712933036d88a504f Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 2 Jan 2024 12:41:38 -0600 Subject: [PATCH 2/2] fix Signed-off-by: Edward Oakes --- python/ray/serve/_private/replica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 2a78ea0e47bf..46dba38c25f4 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -942,7 +942,7 @@ async def call_user_method_generator( assert len(request_args) == 1 and isinstance( request_args[0], gRPCRequest ) - request_args = pickle.loads(request_args[0].grpc_user_request) + request_args = (pickle.loads(request_args[0].grpc_user_request),) if GRPC_CONTEXT_ARG_NAME in inspect.signature(user_method).parameters: request_kwargs = { GRPC_CONTEXT_ARG_NAME: request_metadata.grpc_context