Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add grpc Compression argument to channels and methods #451

Merged
merged 31 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e669129
(feat): Add grpc Compression argument
acocuzzo Sep 21, 2022
9d06f0b
Add compression arg to channel creation
acocuzzo Sep 21, 2022
4517ff3
Merge branch 'main' into add-compression
acocuzzo Feb 17, 2023
b255ba3
fix linter errors
acocuzzo Feb 17, 2023
a9cde22
fix linter errors
acocuzzo Feb 17, 2023
c893059
refactor with new lib
acocuzzo Feb 17, 2023
934d6b9
reformat
acocuzzo Feb 17, 2023
69cf59b
fix tests
acocuzzo Feb 17, 2023
07be315
Merge branch 'main' into add-compression
acocuzzo Apr 10, 2023
49e8d9c
add compression after refactor:
acocuzzo Apr 14, 2023
b92254b
fix lint
acocuzzo Apr 14, 2023
3b5c657
fix unit tests
acocuzzo Apr 14, 2023
00cfbb2
fix unit tests
acocuzzo May 6, 2023
8b0c7db
fix operation
acocuzzo May 6, 2023
d2910a0
Merge remote-tracking branch 'upstream/main' into add-compression
acocuzzo May 6, 2023
e91798c
remove unused import
acocuzzo May 6, 2023
fb75225
remove compression for grpc_gcp.secure_channel call
acocuzzo May 6, 2023
909eed1
fix method.py comment
acocuzzo May 8, 2023
af9021a
Merge branch 'main' into add-compression
acocuzzo Jul 7, 2023
c828d80
Merge branch 'main' into add-compression
parthea Aug 30, 2023
735de81
Update grpc_helpers.py
acocuzzo Sep 1, 2023
de9c9de
Update grpc_helpers_async.py
acocuzzo Sep 1, 2023
f5b9901
Merge branch 'main' into add-compression
acocuzzo Sep 1, 2023
b1332d8
Update google/api_core/operations_v1/operations_client.py
acocuzzo Sep 2, 2023
c661ecc
Update google/api_core/operations_v1/operations_client.py
acocuzzo Sep 2, 2023
8ffa4da
Update google/api_core/operations_v1/operations_client.py
acocuzzo Sep 2, 2023
26438ba
Update google/api_core/operations_v1/operations_async_client.py
acocuzzo Sep 2, 2023
21371b0
Update google/api_core/operations_v1/operations_async_client.py
acocuzzo Sep 2, 2023
8aeeffd
Update google/api_core/operations_v1/operations_async_client.py
acocuzzo Sep 2, 2023
6becbeb
Update google/api_core/operations_v1/operations_async_client.py
acocuzzo Sep 2, 2023
0aa0249
Update google/api_core/operations_v1/operations_client.py
acocuzzo Sep 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions google/api_core/gapic_v1/method.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Helpers for wrapping low-level gRPC methods with common functionality.

This is used by gapic clients to provide common error mapping, retry, timeout,
pagination, and long-running operations to gRPC methods.
compression, pagination, and long-running operations to gRPC methods.
"""

import enum
Expand All @@ -38,7 +38,7 @@ class _MethodDefault(enum.Enum):


DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry or timeout argument was unspecified,
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""


Expand Down Expand Up @@ -72,27 +72,43 @@ class _GapicCallable(object):
after its start, not to be confused with deadline). If ``None``,
this callable will not specify a timeout argument to the low-level
RPC method.
compression (grpc.Compression): The default compression for the callable.
If ``None``, this callable will not specify a compression argument
to the low-level RPC method.
metadata (Sequence[Tuple[str, str]]): Additional metadata that is
provided to the RPC method on every invocation. This is merged with
any metadata specified during invocation. If ``None``, no
additional metadata will be passed to the RPC method.
"""

def __init__(self, target, retry, timeout, metadata=None):
def __init__(
self,
target,
retry,
timeout,
compression,
metadata=None,
):
self._target = target
self._retry = retry
self._timeout = timeout
self._compression = compression
self._metadata = metadata

def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs):
"""Invoke the low-level RPC with retry, timeout, and metadata."""
def __call__(
self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
):
"""Invoke the low-level RPC with retry, timeout, compression, and metadata."""

if retry is DEFAULT:
retry = self._retry

if timeout is DEFAULT:
timeout = self._timeout

if compression is DEFAULT:
compression = self._compression

if isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)

Expand All @@ -109,6 +125,8 @@ def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs):
metadata = list(metadata)
metadata.extend(self._metadata)
kwargs["metadata"] = metadata
if self._compression is not None:
kwargs["compression"] = compression

return wrapped_func(*args, **kwargs)

Expand All @@ -117,19 +135,21 @@ def wrap_method(
func,
default_retry=None,
default_timeout=None,
default_compression=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
):
"""Wrap an RPC method with common behavior.

This applies common error wrapping, retry, and timeout behavior a function.
The wrapped function will take optional ``retry`` and ``timeout``
This applies common error wrapping, retry, timeout, and compression behavior to a function.
The wrapped function will take optional ``retry``, ``timeout``, and ``compression``
arguments.

For example::

import google.api_core.gapic_v1.method
from google.api_core import retry
from google.api_core import timeout
from grpc import Compression

# The original RPC method.
def get_topic(name, timeout=None):
Expand All @@ -138,6 +158,7 @@ def get_topic(name, timeout=None):

default_retry = retry.Retry(deadline=60)
default_timeout = timeout.Timeout(deadline=60)
default_compression = Compression.NoCompression
wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
get_topic, default_retry)

Expand Down Expand Up @@ -186,6 +207,9 @@ def get_topic(name, timeout=None):
default_timeout (Optional[google.api_core.Timeout]): The default
timeout strategy. Can also be specified as an int or float. If
``None``, the method will not have timeout specified by default.
default_compression (Optional[grpc.Compression]): The default
grpc.Compression. If ``None``, the method will not have
compression specified by default.
client_info
(Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
Client information used to create a user-agent string that's
Expand All @@ -194,19 +218,23 @@ def get_topic(name, timeout=None):
metadata will be provided to the RPC method.

Returns:
Callable: A new callable that takes optional ``retry`` and ``timeout``
arguments and applies the common error mapping, retry, timeout,
Callable: A new callable that takes optional ``retry``, ``timeout``,
and ``compression``
arguments and applies the common error mapping, retry, timeout, compression,
and metadata behavior to the low-level RPC method.
"""
func = grpc_helpers.wrap_errors(func)

if client_info is not None:
user_agent_metadata = [client_info.to_grpc_metadata()]
else:
user_agent_metadata = None

return functools.wraps(func)(
_GapicCallable(
func, default_retry, default_timeout, metadata=user_agent_metadata
func,
default_retry,
default_timeout,
default_compression,
metadata=user_agent_metadata,
)
)
17 changes: 12 additions & 5 deletions google/api_core/gapic_v1/method_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""AsyncIO helpers for wrapping gRPC methods with common functionality.

This is used by gapic clients to provide common error mapping, retry, timeout,
pagination, and long-running operations to gRPC methods.
compression, pagination, and long-running operations to gRPC methods.
"""

import functools
Expand All @@ -30,19 +30,26 @@ def wrap_method(
func,
default_retry=None,
default_timeout=None,
default_compression=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
):
"""Wrap an async RPC method with common behavior.

Returns:
Callable: A new callable that takes optional ``retry`` and ``timeout``
arguments and applies the common error mapping, retry, timeout,
and metadata behavior to the low-level RPC method.
Callable: A new callable that takes optional ``retry``, ``timeout``,
and ``compression`` arguments and applies the common error mapping,
retry, timeout, metadata, and compression behavior to the low-level RPC method.
"""
func = grpc_helpers_async.wrap_errors(func)

metadata = [client_info.to_grpc_metadata()] if client_info is not None else None

return functools.wraps(func)(
_GapicCallable(func, default_retry, default_timeout, metadata=metadata)
_GapicCallable(
func,
default_retry,
default_timeout,
default_compression,
metadata=metadata,
)
)
28 changes: 22 additions & 6 deletions google/api_core/grpc_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import collections
import functools
import logging
import warnings

import grpc
Expand Down Expand Up @@ -51,6 +52,8 @@
# The list of gRPC Callable interfaces that return iterators.
_STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable)

_LOGGER = logging.getLogger(__name__)


def _patch_callable_name(callable_):
"""Fix-up gRPC callable attributes.
Expand Down Expand Up @@ -276,7 +279,8 @@ def create_channel(
quota_project_id=None,
default_scopes=None,
default_host=None,
**kwargs
compression=None,
**kwargs,
):
"""Create a secure channel with credentials.

Expand All @@ -297,6 +301,8 @@ def create_channel(
default_scopes (Sequence[str]): Default scopes passed by a Google client
library. Use 'scopes' for user-defined scopes.
default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
compression (grpc.Compression): An optional value indicating the
compression method to be used over the lifetime of the channel.
kwargs: Additional key-word args passed to
:func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0.
Expand All @@ -319,12 +325,18 @@ def create_channel(
)

if HAS_GRPC_GCP: # pragma: NO COVER
if compression is not None and compression != grpc.Compression.NoCompression:
_LOGGER.debug(
"Compression argument is being ignored for grpc_gcp.secure_channel creation."
)
return grpc_gcp.secure_channel(target, composite_credentials, **kwargs)
return grpc.secure_channel(target, composite_credentials, **kwargs)
return grpc.secure_channel(
target, composite_credentials, compression=compression, **kwargs
)


_MethodCall = collections.namedtuple(
"_MethodCall", ("request", "timeout", "metadata", "credentials")
"_MethodCall", ("request", "timeout", "metadata", "credentials", "compression")
)

_ChannelRequest = collections.namedtuple("_ChannelRequest", ("method", "request"))
Expand All @@ -351,11 +363,15 @@ def __init__(self, method, channel):
"""List[protobuf.Message]: All requests sent to this callable."""
self.calls = []
"""List[Tuple]: All invocations of this callable. Each tuple is the
request, timeout, metadata, and credentials."""
request, timeout, metadata, compression, and credentials."""

def __call__(self, request, timeout=None, metadata=None, credentials=None):
def __call__(
self, request, timeout=None, metadata=None, credentials=None, compression=None
):
self._channel.requests.append(_ChannelRequest(self._method, request))
self.calls.append(_MethodCall(request, timeout, metadata, credentials))
self.calls.append(
_MethodCall(request, timeout, metadata, credentials, compression)
)
self.requests.append(request)

response = self.response
Expand Down
7 changes: 6 additions & 1 deletion google/api_core/grpc_helpers_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def create_channel(
quota_project_id=None,
default_scopes=None,
default_host=None,
compression=None,
**kwargs
):
"""Create an AsyncIO secure channel with credentials.
Expand All @@ -233,6 +234,8 @@ def create_channel(
default_scopes (Sequence[str]): Default scopes passed by a Google client
library. Use 'scopes' for user-defined scopes.
default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
compression (grpc.Compression): An optional value indicating the
compression method to be used over the lifetime of the channel.
kwargs: Additional key-word args passed to :func:`aio.secure_channel`.

Returns:
Expand All @@ -252,7 +255,9 @@ def create_channel(
default_host=default_host,
)

return aio.secure_channel(target, composite_credentials, **kwargs)
return aio.secure_channel(
target, composite_credentials, compression=compression, **kwargs
)


class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall):
Expand Down
18 changes: 14 additions & 4 deletions google/api_core/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,16 @@ def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwa
operation.
"""
refresh = functools.partial(
_refresh_grpc, operations_stub, operation.name, metadata=grpc_metadata
_refresh_grpc,
operations_stub,
operation.name,
metadata=grpc_metadata,
)
cancel = functools.partial(
_cancel_grpc, operations_stub, operation.name, metadata=grpc_metadata
_cancel_grpc,
operations_stub,
operation.name,
metadata=grpc_metadata,
)
return Operation(operation, refresh, cancel, result_type, **kwargs)

Expand Down Expand Up @@ -347,9 +353,13 @@ def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **
operation.
"""
refresh = functools.partial(
operations_client.get_operation, operation.name, metadata=grpc_metadata
operations_client.get_operation,
operation.name,
metadata=grpc_metadata,
)
cancel = functools.partial(
operations_client.cancel_operation, operation.name, metadata=grpc_metadata
operations_client.cancel_operation,
operation.name,
metadata=grpc_metadata,
)
return Operation(operation, refresh, cancel, result_type, **kwargs)
8 changes: 6 additions & 2 deletions google/api_core/operation_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,13 @@ def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **
operation.
"""
refresh = functools.partial(
operations_client.get_operation, operation.name, metadata=grpc_metadata
operations_client.get_operation,
operation.name,
metadata=grpc_metadata,
)
cancel = functools.partial(
operations_client.cancel_operation, operation.name, metadata=grpc_metadata
operations_client.cancel_operation,
operation.name,
metadata=grpc_metadata,
)
return AsyncOperation(operation, refresh, cancel, result_type, **kwargs)
Loading