
feat: add client debug logging support for streaming gRPC/REST calls #794


Open · wants to merge 8 commits into base: main
26 changes: 24 additions & 2 deletions google/api_core/_rest_streaming_base.py
@@ -15,6 +15,7 @@
"""Helpers for server-side streaming in REST."""

from collections import deque
import logging
import string
from typing import Deque, Union
import types
@@ -23,6 +24,8 @@
import google.protobuf.message
from google.protobuf.json_format import Parse

_LOGGER = logging.getLogger(__name__)


class BaseResponseIterator:
"""Base Iterator over REST API responses. This class should not be used directly.
@@ -97,19 +100,38 @@ def _process_chunk(self, chunk: str):
self._obj += char
self._escape_next = not self._escape_next if char == "\\" else False

def _log_response_payload(self, response_payload: str): # pragma: NO COVER
Contributor:
Why do we need to skip coverage here?

rest_response = {
"payload": response_payload,
"status": "OK",
}
_LOGGER.debug(
"Received response via REST stream",
extra={
"response": rest_response,
Contributor:
should we be using httpResponse here instead for structured logs?

Comment on lines +104 to +111
Contributor:
I'm not sure we should be using/referencing any sort of HTTP response here at all. This helper is being called from grab, and grab simply passes along already-received messages reconstituted from chunks. So what we should log in grab is something like passing next message to stream: <message>.

We should log http responses where we actually receive the HTTP chunks.

},
)

def _create_grab(self):
logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if issubclass(self._response_message_cls, proto.Message):

def grab(this):
response_payload = this._ready_objs.popleft()
if logging_enabled: # pragma: NO COVER
self._log_response_payload(response_payload)
return this._response_message_cls.from_json(
this._ready_objs.popleft(), ignore_unknown_fields=True
response_payload, ignore_unknown_fields=True
)

return grab
elif issubclass(self._response_message_cls, google.protobuf.message.Message):

def grab(this):
return Parse(this._ready_objs.popleft(), this._response_message_cls())
response_payload = this._ready_objs.popleft()
if logging_enabled: # pragma: NO COVER
Contributor:
Can we add coverage for this?

self._log_response_payload(response_payload)
return Parse(response_payload, this._response_message_cls())

return grab
else:
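A minimal sketch of how a reviewer could surface these new records locally; this is not part of the PR and assumes only standard-library logging configuration. The logger name follows from the module's _LOGGER = logging.getLogger(__name__).

import logging

# Attach a default stderr handler; records propagated from child loggers are
# emitted by root handlers regardless of the root logger's own level.
logging.basicConfig()
# The new code paths are guarded by _LOGGER.isEnabledFor(logging.DEBUG), so
# DEBUG must be enabled on this logger (or an ancestor such as "google.api_core").
logging.getLogger("google.api_core._rest_streaming_base").setLevel(logging.DEBUG)

Note that the structured payload is passed via extra={"response": ...}, so it is attached to the LogRecord as an attribute and only appears if a handler or formatter is configured to emit it.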
30 changes: 28 additions & 2 deletions google/api_core/grpc_helpers.py
@@ -17,9 +17,13 @@

import collections
import functools
import logging
import pickle
import warnings

import google.protobuf.json_format
import grpc
import proto

from google.api_core import exceptions
import google.auth
@@ -48,6 +52,7 @@
else:
HAS_GRPC_GCP = False

_LOGGER = logging.getLogger(__name__)

# The list of gRPC Callable interfaces that return iterators.
_STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable)
@@ -112,8 +117,29 @@ def __next__(self) -> P:
if hasattr(self, "_stored_first_result"):
result = self._stored_first_result
del self._stored_first_result
return result
return next(self._wrapped)
else:
result = next(self._wrapped)
logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if logging_enabled: # pragma: NO COVER
if isinstance(result, proto.Message):
response_payload = type(result).to_json(result)
elif isinstance(result, google.protobuf.message.Message):
response_payload = google.protobuf.json_format.MessageToJson(result)
else:
response_payload = (
f"{type(result).__name__}: {str(pickle.dumps(result))}"
)
grpc_response = {
"payload": response_payload,
"status": "OK",
Comment on lines +132 to +134
Contributor:
As in the REST case, I don't think we should be logging things here as though we're receiving a response from the server. We should log that we're passing the next item in the stream. (Because, for example, we might be getting _stored_first_result that was received and stored earlier.)

So let's log that we're returning the next item here, but wherever we do receive the item from the server (is it next(self.wrapped)?), we should log that we received X message from the server: <msg> <msg> ....

IOW, let's not conflate for ourselves or our users receiving the streaming data from the server vs passing each streamed message to the GAPIC user. This is particularly important for async streaming.

}
_LOGGER.debug(
f"Received response of type {type(result)} via gRPC stream",
extra={
"response": grpc_response,
},
Comment on lines +120 to +140
Contributor:
Can this be added as a helper function and re-used in grpc_helpers_async?

)
return result
except grpc.RpcError as exc:
# If the stream has already returned data, we cannot recover here.
raise exceptions.from_grpc_error(exc) from exc
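Regarding the request above to factor this out and reuse it in grpc_helpers_async: a hypothetical sketch of what such a shared helper could look like, mirroring the branching already in this diff. The name _log_streamed_message and its placement are assumptions, not part of this PR.

import logging
import pickle

import google.protobuf.json_format
import google.protobuf.message
import proto

_LOGGER = logging.getLogger(__name__)


def _log_streamed_message(result):  # hypothetical helper, name is a placeholder
    """Serialize a streamed message and emit a DEBUG record for it."""
    if isinstance(result, proto.Message):
        response_payload = type(result).to_json(result)
    elif isinstance(result, google.protobuf.message.Message):
        response_payload = google.protobuf.json_format.MessageToJson(result)
    else:
        response_payload = f"{type(result).__name__}: {str(pickle.dumps(result))}"
    _LOGGER.debug(
        f"Received response of type {type(result)} via gRPC stream",
        extra={"response": {"payload": response_payload, "status": "OK"}},
    )

Both __next__ above and read() in grpc_helpers_async could then reduce to an isEnabledFor(logging.DEBUG) check followed by a single call to the helper.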
31 changes: 29 additions & 2 deletions google/api_core/grpc_helpers_async.py
@@ -20,17 +20,23 @@

import asyncio
import functools
import logging
import pickle

from typing import AsyncGenerator, Generic, Iterator, Optional, TypeVar

import google.protobuf.json_format
import grpc
from grpc import aio
import proto

from google.api_core import exceptions, grpc_helpers

# denotes the proto response type for grpc calls
P = TypeVar("P")

_LOGGER = logging.getLogger(__name__)

# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform
# automatic patching for us. But that means the overhead of creating an
# extra Python function spreads to every single send and receive.
@@ -94,7 +100,28 @@ def __init__(self):

async def read(self) -> P:
try:
return await self._call.read()
result = await self._call.read()
Contributor:
IIUC, this is gRPC doing an actual read from the network, so logging it as such below makes sense.

(just pointing this out to contrast with my other comments about logging returning previously streamed messages to the user)

logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if logging_enabled: # pragma: NO COVER
if isinstance(result, proto.Message):
Contributor:
Can you link

{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2293): Investigate if we can improve this logic or wait for next gen protobuf. #}

Contributor:
Also, can the parsing logic be added as a helper function and re-used in the gapic? We can file a TODO for the latter.

Contributor:
+1

This ties into the issue I still want to get back to, b/382299158. Maybe we can reference that issue instead of creating a new one, and in that issue reference these changes so we have easy access.

response_payload = type(result).to_json(result)
elif isinstance(result, google.protobuf.message.Message):
response_payload = google.protobuf.json_format.MessageToJson(result)
else:
response_payload = (
f"{type(result).__name__}: {str(pickle.dumps(result))}"
)
grpc_response = {
"payload": response_payload,
"status": "OK",
}
_LOGGER.debug(
f"Received response of type {type(result)} via gRPC stream",
extra={
"response": grpc_response,
},
)
return result
except grpc.RpcError as rpc_error:
raise exceptions.from_grpc_error(rpc_error) from rpc_error

@@ -219,7 +246,7 @@ def create_channel(
default_host=None,
compression=None,
attempt_direct_path: Optional[bool] = False,
**kwargs
**kwargs,
):
"""Create an AsyncIO secure channel with credentials.

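If the shared helper sketched after the grpc_helpers.py diff were adopted, the async read path could shrink to roughly the following. This is again a hypothetical sketch rather than the PR's code, and it keeps the log at the point where the message is actually received from the network, per the review discussion above.

    async def read(self) -> P:
        try:
            result = await self._call.read()
            # This is where a message actually arrives from the server, so the
            # "received" record belongs here rather than where items are handed
            # back to the GAPIC user.
            if _LOGGER.isEnabledFor(logging.DEBUG):
                grpc_helpers._log_streamed_message(result)  # hypothetical helper
            return result
        except grpc.RpcError as rpc_error:
            raise exceptions.from_grpc_error(rpc_error) from rpc_error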
19 changes: 18 additions & 1 deletion tests/asyncio/test_grpc_helpers_async.py
@@ -186,7 +186,7 @@ async def test_wrap_stream_errors_raised():


@pytest.mark.asyncio
async def test_wrap_stream_errors_read():
async def test_wrap_stream_errors_read_with_grpc_error():
grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)

mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
@@ -206,6 +206,23 @@ async def test_wrap_stream_errors_read():
assert exc_info.value.response == grpc_error


@pytest.mark.asyncio
async def test_wrap_stream_errors_read_without_grpc_error():
mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)

mock_call.read = mock.AsyncMock()
multicallable = mock.Mock(return_value=mock_call)

wrapped_callable = grpc_helpers_async._wrap_stream_errors(
multicallable, grpc_helpers_async._WrappedStreamStreamCall
)

wrapped_call = await wrapped_callable(1, 2, three="four")
multicallable.assert_called_once_with(1, 2, three="four")
assert mock_call.wait_for_connection.call_count == 1
await wrapped_call.read()


@pytest.mark.asyncio
async def test_wrap_stream_errors_aiter():
grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT)
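One way to address the coverage questions above without the # pragma: NO COVER markers is to drive the DEBUG branch directly from a test. A hypothetical sketch using pytest's caplog fixture, relying on the imports this test module already has plus logging; the test name and payload value are placeholders.

import logging  # in addition to the module's existing imports


@pytest.mark.asyncio
async def test_wrap_stream_errors_read_logs_response(caplog):
    # Enable DEBUG on the module logger so _LOGGER.isEnabledFor(logging.DEBUG)
    # is true inside grpc_helpers_async.
    caplog.set_level(logging.DEBUG, logger="google.api_core.grpc_helpers_async")

    mock_call = mock.Mock(aio.StreamStreamCall, autospec=True)
    # A plain string is picklable, so it exercises the fallback payload branch.
    mock_call.read = mock.AsyncMock(return_value="not-a-proto-message")
    multicallable = mock.Mock(return_value=mock_call)

    wrapped_callable = grpc_helpers_async._wrap_stream_errors(
        multicallable, grpc_helpers_async._WrappedStreamStreamCall
    )
    wrapped_call = await wrapped_callable()

    assert await wrapped_call.read() == "not-a-proto-message"
    assert "Received response" in caplog.text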