From 2123298917b420348d0b295dcd49dc3f2958e571 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Fri, 14 Feb 2025 22:34:19 +0000 Subject: [PATCH 1/7] feat: add client debug logging support for unary-stream gRPC calls --- google/api_core/grpc_helpers.py | 29 ++++++++++++++++++++++++- google/api_core/grpc_helpers_async.py | 31 +++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index 07963024..cd1ac3e7 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -17,9 +17,13 @@ import collections import functools +import logging +import pickle import warnings +import google.protobuf.json_format import grpc +import proto from google.api_core import exceptions import google.auth @@ -48,6 +52,7 @@ else: HAS_GRPC_GCP = False +_LOGGER = logging.getLogger(__name__) # The list of gRPC Callable interfaces that return iterators. _STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable) @@ -113,7 +118,29 @@ def __next__(self) -> P: result = self._stored_first_result del self._stored_first_result return result - return next(self._wrapped) + result = next(self._wrapped) + + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) + if logging_enabled: # pragma: NO COVER + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = google.protobuf.json_format.MessageToJson(result) + else: + response_payload = ( + f"{type(result).__name__}: {pickle.dumps(result)}" + ) + grpc_response = { + "payload": response_payload, + "status": "OK", + } + _LOGGER.debug( + f"Received response of type {type(result)} via gRPC stream", + extra={ + "response": grpc_response, + }, + ) + return result except grpc.RpcError as exc: # If the stream has already returned data, we cannot recover here. raise exceptions.from_grpc_error(exc) from exc diff --git a/google/api_core/grpc_helpers_async.py b/google/api_core/grpc_helpers_async.py index af661430..01028d43 100644 --- a/google/api_core/grpc_helpers_async.py +++ b/google/api_core/grpc_helpers_async.py @@ -20,17 +20,23 @@ import asyncio import functools +import logging +import pickle from typing import AsyncGenerator, Generic, Iterator, Optional, TypeVar +import google.protobuf.json_format import grpc from grpc import aio +import proto from google.api_core import exceptions, grpc_helpers # denotes the proto response type for grpc calls P = TypeVar("P") +_LOGGER = logging.getLogger(__name__) + # NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform # automatic patching for us. But that means the overhead of creating an # extra Python function spreads to every single send and receive. @@ -94,7 +100,28 @@ def __init__(self): async def read(self) -> P: try: - return await self._call.read() + result = await self._call.read() + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) + if logging_enabled: # pragma: NO COVER + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = google.protobuf.json_format.MessageToJson(result) + else: + response_payload = ( + f"{type(result).__name__}: {pickle.dumps(result)}" + ) + grpc_response = { + "payload": response_payload, + "status": "OK", + } + _LOGGER.debug( + f"Received response of type {type(result)} via gRPC stream", + extra={ + "response": grpc_response, + }, + ) + return result except grpc.RpcError as rpc_error: raise exceptions.from_grpc_error(rpc_error) from rpc_error @@ -219,7 +246,7 @@ def create_channel( default_host=None, compression=None, attempt_direct_path: Optional[bool] = False, - **kwargs + **kwargs, ): """Create an AsyncIO secure channel with credentials. From 551005610b4009998246196f8f4b9a2fff11879d Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Fri, 14 Feb 2025 22:46:19 +0000 Subject: [PATCH 2/7] mypy --- google/api_core/grpc_helpers.py | 2 +- google/api_core/grpc_helpers_async.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index cd1ac3e7..f53f6cd4 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -128,7 +128,7 @@ def __next__(self) -> P: response_payload = google.protobuf.json_format.MessageToJson(result) else: response_payload = ( - f"{type(result).__name__}: {pickle.dumps(result)}" + f"{type(result).__name__}: {str(pickle.dumps(result))}" ) grpc_response = { "payload": response_payload, diff --git a/google/api_core/grpc_helpers_async.py b/google/api_core/grpc_helpers_async.py index 01028d43..2d833a77 100644 --- a/google/api_core/grpc_helpers_async.py +++ b/google/api_core/grpc_helpers_async.py @@ -109,7 +109,7 @@ async def read(self) -> P: response_payload = google.protobuf.json_format.MessageToJson(result) else: response_payload = ( - f"{type(result).__name__}: {pickle.dumps(result)}" + f"{type(result).__name__}: {str(pickle.dumps(result))}" ) grpc_response = { "payload": response_payload, From 3fb362fe8f11adb3abd0041f40abf6f8fdbc7257 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Wed, 19 Feb 2025 14:06:09 +0000 Subject: [PATCH 3/7] coverage --- tests/asyncio/test_grpc_helpers_async.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/asyncio/test_grpc_helpers_async.py b/tests/asyncio/test_grpc_helpers_async.py index aa8d5d10..6e408ca5 100644 --- a/tests/asyncio/test_grpc_helpers_async.py +++ b/tests/asyncio/test_grpc_helpers_async.py @@ -186,7 +186,7 @@ async def test_wrap_stream_errors_raised(): @pytest.mark.asyncio -async def test_wrap_stream_errors_read(): +async def test_wrap_stream_errors_read_with_grpc_error(): grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) @@ -206,6 +206,23 @@ async def test_wrap_stream_errors_read(): assert exc_info.value.response == grpc_error +@pytest.mark.asyncio +async def test_wrap_stream_errors_read_without_grpc_error(): + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + + mock_call.read = mock.AsyncMock() + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors( + multicallable, grpc_helpers_async._WrappedStreamStreamCall + ) + + wrapped_call = await wrapped_callable(1, 2, three="four") + multicallable.assert_called_once_with(1, 2, three="four") + assert mock_call.wait_for_connection.call_count == 1 + await wrapped_call.read() + + @pytest.mark.asyncio async def test_wrap_stream_errors_aiter(): grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) From d73d18cb5a8afff9c995e244190584e12d7824e4 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Wed, 19 Feb 2025 14:34:06 +0000 Subject: [PATCH 4/7] include debug log for first result in grpc unary->stream calls --- google/api_core/grpc_helpers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index f53f6cd4..912413ff 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -117,9 +117,8 @@ def __next__(self) -> P: if hasattr(self, "_stored_first_result"): result = self._stored_first_result del self._stored_first_result - return result - result = next(self._wrapped) - + else: + result = next(self._wrapped) logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) if logging_enabled: # pragma: NO COVER if isinstance(result, proto.Message): From 826c79b533d4a595ac7c8dae9fc9925d11474fb7 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Wed, 19 Feb 2025 15:49:57 +0000 Subject: [PATCH 5/7] add debug logs for rest streaming --- google/api_core/_rest_streaming_base.py | 26 +++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/google/api_core/_rest_streaming_base.py b/google/api_core/_rest_streaming_base.py index 3bc87a96..394cd191 100644 --- a/google/api_core/_rest_streaming_base.py +++ b/google/api_core/_rest_streaming_base.py @@ -15,6 +15,7 @@ """Helpers for server-side streaming in REST.""" from collections import deque +import logging import string from typing import Deque, Union import types @@ -23,6 +24,8 @@ import google.protobuf.message from google.protobuf.json_format import Parse +_LOGGER = logging.getLogger(__name__) + class BaseResponseIterator: """Base Iterator over REST API responses. This class should not be used directly. @@ -97,19 +100,38 @@ def _process_chunk(self, chunk: str): self._obj += char self._escape_next = not self._escape_next if char == "\\" else False + def _log_response_payload(self, response_payload: str): + rest_response = { + "payload": response_payload, + "status": "OK", + } + _LOGGER.debug( + "Received response via REST stream", + extra={ + "response": rest_response, + }, + ) + def _create_grab(self): + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) if issubclass(self._response_message_cls, proto.Message): def grab(this): + result = this._ready_objs.popleft() + if logging_enabled: # pragma: NO COVER + self._log_result(result) return this._response_message_cls.from_json( - this._ready_objs.popleft(), ignore_unknown_fields=True + result, ignore_unknown_fields=True ) return grab elif issubclass(self._response_message_cls, google.protobuf.message.Message): def grab(this): - return Parse(this._ready_objs.popleft(), this._response_message_cls()) + result = this._ready_objs.popleft() + if logging_enabled: # pragma: NO COVER + self._log_result(result) + return Parse(result, this._response_message_cls()) return grab else: From 230fe9ab53f73bafcdf5cbed6397d374f9ac6805 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Wed, 19 Feb 2025 15:53:05 +0000 Subject: [PATCH 6/7] result->response_payload --- google/api_core/_rest_streaming_base.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/google/api_core/_rest_streaming_base.py b/google/api_core/_rest_streaming_base.py index 394cd191..93e00af9 100644 --- a/google/api_core/_rest_streaming_base.py +++ b/google/api_core/_rest_streaming_base.py @@ -117,21 +117,21 @@ def _create_grab(self): if issubclass(self._response_message_cls, proto.Message): def grab(this): - result = this._ready_objs.popleft() + response_payload = this._ready_objs.popleft() if logging_enabled: # pragma: NO COVER - self._log_result(result) + self._log_response_payload(response_payload) return this._response_message_cls.from_json( - result, ignore_unknown_fields=True + response_payload, ignore_unknown_fields=True ) return grab elif issubclass(self._response_message_cls, google.protobuf.message.Message): def grab(this): - result = this._ready_objs.popleft() + response_payload = this._ready_objs.popleft() if logging_enabled: # pragma: NO COVER - self._log_result(result) - return Parse(result, this._response_message_cls()) + self._log_response_payload(response_payload) + return Parse(response_payload, this._response_message_cls()) return grab else: From 9fac30c01bf4bcba606d0b4af4348b4a7879c74f Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Wed, 19 Feb 2025 15:57:07 +0000 Subject: [PATCH 7/7] cover --- google/api_core/_rest_streaming_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/api_core/_rest_streaming_base.py b/google/api_core/_rest_streaming_base.py index 93e00af9..f76ea179 100644 --- a/google/api_core/_rest_streaming_base.py +++ b/google/api_core/_rest_streaming_base.py @@ -100,7 +100,7 @@ def _process_chunk(self, chunk: str): self._obj += char self._escape_next = not self._escape_next if char == "\\" else False - def _log_response_payload(self, response_payload: str): + def _log_response_payload(self, response_payload: str): # pragma: NO COVER rest_response = { "payload": response_payload, "status": "OK",