Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add REST Interceptors which support reading metadata #1355

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 92 additions & 36 deletions google/pubsub_v1/services/publisher/async_client.py

Large diffs are not rendered by default.

228 changes: 165 additions & 63 deletions google/pubsub_v1/services/publisher/client.py

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions google/pubsub_v1/services/publisher/pagers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.

Expand All @@ -80,8 +80,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicsRequest(request)
Expand Down Expand Up @@ -140,7 +142,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.

Expand All @@ -154,8 +156,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicsRequest(request)
Expand Down Expand Up @@ -218,7 +222,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.

Expand All @@ -232,8 +236,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSubscriptionsRequest(request)
Expand Down Expand Up @@ -292,7 +298,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.

Expand All @@ -306,8 +312,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSubscriptionsRequest(request)
Expand Down Expand Up @@ -370,7 +378,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.

Expand All @@ -384,8 +392,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSnapshotsRequest(request)
Expand Down Expand Up @@ -444,7 +454,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.

Expand All @@ -458,8 +468,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSnapshotsRequest(request)
Expand Down
114 changes: 100 additions & 14 deletions google/pubsub_v1/services/publisher/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging as std_logging
import pickle
import warnings
from typing import Callable, Dict, Optional, Sequence, Tuple, Union

Expand All @@ -21,15 +24,93 @@
import google.auth # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message

import grpc # type: ignore
import proto # type: ignore

from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
from google.protobuf import empty_pb2 # type: ignore
from google.pubsub_v1.types import pubsub
from .base import PublisherTransport, DEFAULT_CLIENT_INFO

try:
from google.api_core import client_logging # type: ignore

CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
except ImportError: # pragma: NO COVER
CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
std_logging.DEBUG
)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra={
"serviceName": "google.pubsub.v1.Publisher",
"rpcName": client_call_details.method,
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)

response = continuation(client_call_details, request)
if logging_enabled: # pragma: NO COVER
response_metadata = response.trailing_metadata()
# Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
metadata = (
dict([(k, str(v)) for k, v in response_metadata])
if response_metadata
else None
)
result = response.result()
if isinstance(result, proto.Message):
response_payload = type(result).to_json(result)
elif isinstance(result, google.protobuf.message.Message):
response_payload = MessageToJson(result)
else:
response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
grpc_response = {
"payload": response_payload,
"metadata": metadata,
"status": "OK",
}
_LOGGER.debug(
f"Received response for {client_call_details.method}.",
extra={
"serviceName": "google.pubsub.v1.Publisher",
"rpcName": client_call_details.method,
"response": grpc_response,
"metadata": grpc_response["metadata"],
},
)
return response


class PublisherGrpcTransport(PublisherTransport):
"""gRPC backend transport for Publisher.
Expand Down Expand Up @@ -186,7 +267,12 @@ def __init__(
],
)

# Wrap messages. This must be done after self._grpc_channel exists
self._interceptor = _LoggingClientInterceptor()
self._logged_channel = grpc.intercept_channel(
self._grpc_channel, self._interceptor
)

# Wrap messages. This must be done after self._logged_channel exists
self._prep_wrapped_messages(client_info)

@classmethod
Expand Down Expand Up @@ -260,7 +346,7 @@ def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "create_topic" not in self._stubs:
self._stubs["create_topic"] = self.grpc_channel.unary_unary(
self._stubs["create_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/CreateTopic",
request_serializer=pubsub.Topic.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -286,7 +372,7 @@ def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "update_topic" not in self._stubs:
self._stubs["update_topic"] = self.grpc_channel.unary_unary(
self._stubs["update_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/UpdateTopic",
request_serializer=pubsub.UpdateTopicRequest.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -311,7 +397,7 @@ def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "publish" not in self._stubs:
self._stubs["publish"] = self.grpc_channel.unary_unary(
self._stubs["publish"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/Publish",
request_serializer=pubsub.PublishRequest.serialize,
response_deserializer=pubsub.PublishResponse.deserialize,
Expand All @@ -335,7 +421,7 @@ def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_topic" not in self._stubs:
self._stubs["get_topic"] = self.grpc_channel.unary_unary(
self._stubs["get_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/GetTopic",
request_serializer=pubsub.GetTopicRequest.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -361,7 +447,7 @@ def list_topics(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topics" not in self._stubs:
self._stubs["list_topics"] = self.grpc_channel.unary_unary(
self._stubs["list_topics"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopics",
request_serializer=pubsub.ListTopicsRequest.serialize,
response_deserializer=pubsub.ListTopicsResponse.deserialize,
Expand Down Expand Up @@ -390,7 +476,7 @@ def list_topic_subscriptions(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topic_subscriptions" not in self._stubs:
self._stubs["list_topic_subscriptions"] = self.grpc_channel.unary_unary(
self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopicSubscriptions",
request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
Expand Down Expand Up @@ -423,7 +509,7 @@ def list_topic_snapshots(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topic_snapshots" not in self._stubs:
self._stubs["list_topic_snapshots"] = self.grpc_channel.unary_unary(
self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopicSnapshots",
request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
Expand Down Expand Up @@ -452,7 +538,7 @@ def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "delete_topic" not in self._stubs:
self._stubs["delete_topic"] = self.grpc_channel.unary_unary(
self._stubs["delete_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/DeleteTopic",
request_serializer=pubsub.DeleteTopicRequest.serialize,
response_deserializer=empty_pb2.Empty.FromString,
Expand Down Expand Up @@ -484,7 +570,7 @@ def detach_subscription(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "detach_subscription" not in self._stubs:
self._stubs["detach_subscription"] = self.grpc_channel.unary_unary(
self._stubs["detach_subscription"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/DetachSubscription",
request_serializer=pubsub.DetachSubscriptionRequest.serialize,
response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
Expand All @@ -509,7 +595,7 @@ def set_iam_policy(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "set_iam_policy" not in self._stubs:
self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/SetIamPolicy",
request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand All @@ -535,7 +621,7 @@ def get_iam_policy(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_iam_policy" not in self._stubs:
self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/GetIamPolicy",
request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand Down Expand Up @@ -564,15 +650,15 @@ def test_iam_permissions(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "test_iam_permissions" not in self._stubs:
self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary(
self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/TestIamPermissions",
request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
)
return self._stubs["test_iam_permissions"]

def close(self):
self.grpc_channel.close()
self._logged_channel.close()

@property
def kind(self) -> str:
Expand Down
Loading
Loading