diff --git a/google/api_core/_rest_streaming_base.py b/google/api_core/_rest_streaming_base.py index 3bc87a96..f76ea179 100644 --- a/google/api_core/_rest_streaming_base.py +++ b/google/api_core/_rest_streaming_base.py @@ -15,6 +15,7 @@ """Helpers for server-side streaming in REST.""" from collections import deque +import logging import string from typing import Deque, Union import types @@ -23,6 +24,8 @@ import google.protobuf.message from google.protobuf.json_format import Parse +_LOGGER = logging.getLogger(__name__) + class BaseResponseIterator: """Base Iterator over REST API responses. This class should not be used directly. @@ -97,19 +100,38 @@ def _process_chunk(self, chunk: str): self._obj += char self._escape_next = not self._escape_next if char == "\\" else False + def _log_response_payload(self, response_payload: str): # pragma: NO COVER + rest_response = { + "payload": response_payload, + "status": "OK", + } + _LOGGER.debug( + "Received response via REST stream", + extra={ + "response": rest_response, + }, + ) + def _create_grab(self): + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) if issubclass(self._response_message_cls, proto.Message): def grab(this): + response_payload = this._ready_objs.popleft() + if logging_enabled: # pragma: NO COVER + self._log_response_payload(response_payload) return this._response_message_cls.from_json( - this._ready_objs.popleft(), ignore_unknown_fields=True + response_payload, ignore_unknown_fields=True ) return grab elif issubclass(self._response_message_cls, google.protobuf.message.Message): def grab(this): - return Parse(this._ready_objs.popleft(), this._response_message_cls()) + response_payload = this._ready_objs.popleft() + if logging_enabled: # pragma: NO COVER + self._log_response_payload(response_payload) + return Parse(response_payload, this._response_message_cls()) return grab else: diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index 07963024..912413ff 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -17,9 +17,13 @@ import collections import functools +import logging +import pickle import warnings +import google.protobuf.json_format import grpc +import proto from google.api_core import exceptions import google.auth @@ -48,6 +52,7 @@ else: HAS_GRPC_GCP = False +_LOGGER = logging.getLogger(__name__) # The list of gRPC Callable interfaces that return iterators. _STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable) @@ -112,8 +117,29 @@ def __next__(self) -> P: if hasattr(self, "_stored_first_result"): result = self._stored_first_result del self._stored_first_result - return result - return next(self._wrapped) + else: + result = next(self._wrapped) + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) + if logging_enabled: # pragma: NO COVER + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = google.protobuf.json_format.MessageToJson(result) + else: + response_payload = ( + f"{type(result).__name__}: {str(pickle.dumps(result))}" + ) + grpc_response = { + "payload": response_payload, + "status": "OK", + } + _LOGGER.debug( + f"Received response of type {type(result)} via gRPC stream", + extra={ + "response": grpc_response, + }, + ) + return result except grpc.RpcError as exc: # If the stream has already returned data, we cannot recover here. raise exceptions.from_grpc_error(exc) from exc diff --git a/google/api_core/grpc_helpers_async.py b/google/api_core/grpc_helpers_async.py index af661430..2d833a77 100644 --- a/google/api_core/grpc_helpers_async.py +++ b/google/api_core/grpc_helpers_async.py @@ -20,17 +20,23 @@ import asyncio import functools +import logging +import pickle from typing import AsyncGenerator, Generic, Iterator, Optional, TypeVar +import google.protobuf.json_format import grpc from grpc import aio +import proto from google.api_core import exceptions, grpc_helpers # denotes the proto response type for grpc calls P = TypeVar("P") +_LOGGER = logging.getLogger(__name__) + # NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform # automatic patching for us. But that means the overhead of creating an # extra Python function spreads to every single send and receive. @@ -94,7 +100,28 @@ def __init__(self): async def read(self) -> P: try: - return await self._call.read() + result = await self._call.read() + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) + if logging_enabled: # pragma: NO COVER + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = google.protobuf.json_format.MessageToJson(result) + else: + response_payload = ( + f"{type(result).__name__}: {str(pickle.dumps(result))}" + ) + grpc_response = { + "payload": response_payload, + "status": "OK", + } + _LOGGER.debug( + f"Received response of type {type(result)} via gRPC stream", + extra={ + "response": grpc_response, + }, + ) + return result except grpc.RpcError as rpc_error: raise exceptions.from_grpc_error(rpc_error) from rpc_error @@ -219,7 +246,7 @@ def create_channel( default_host=None, compression=None, attempt_direct_path: Optional[bool] = False, - **kwargs + **kwargs, ): """Create an AsyncIO secure channel with credentials. diff --git a/tests/asyncio/test_grpc_helpers_async.py b/tests/asyncio/test_grpc_helpers_async.py index aa8d5d10..6e408ca5 100644 --- a/tests/asyncio/test_grpc_helpers_async.py +++ b/tests/asyncio/test_grpc_helpers_async.py @@ -186,7 +186,7 @@ async def test_wrap_stream_errors_raised(): @pytest.mark.asyncio -async def test_wrap_stream_errors_read(): +async def test_wrap_stream_errors_read_with_grpc_error(): grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) @@ -206,6 +206,23 @@ async def test_wrap_stream_errors_read(): assert exc_info.value.response == grpc_error +@pytest.mark.asyncio +async def test_wrap_stream_errors_read_without_grpc_error(): + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + + mock_call.read = mock.AsyncMock() + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors( + multicallable, grpc_helpers_async._WrappedStreamStreamCall + ) + + wrapped_call = await wrapped_callable(1, 2, three="four") + multicallable.assert_called_once_with(1, 2, three="four") + assert mock_call.wait_for_connection.call_count == 1 + await wrapped_call.read() + + @pytest.mark.asyncio async def test_wrap_stream_errors_aiter(): grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)