Skip to content

Commit

Permalink
feat: Add grpc Compression argument to channels and methods (#451)
Browse files Browse the repository at this point in the history
* (feat): Add grpc Compression argument

* Add compression arg to channel creation

* fix linter errors

* fix linter errors

* refactor with new lib

* reformat

* fix tests

* add compression after refactor:

* fix lint

* fix unit tests

* fix unit tests

* fix operation

* remove unused import

* remove compression for grpc_gcp.secure_channel call

* fix method.py comment

* Update grpc_helpers.py

Remove experimental disclaimer

* Update grpc_helpers_async.py

Remove experimental disclaimer

* Update google/api_core/operations_v1/operations_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_async_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_async_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_async_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_async_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Update google/api_core/operations_v1/operations_client.py

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

---------

Co-authored-by: Anthonios Partheniou <partheniou@google.com>
  • Loading branch information
acocuzzo and parthea authored Sep 7, 2023
1 parent 2477ab9 commit bdebd63
Show file tree
Hide file tree
Showing 16 changed files with 377 additions and 89 deletions.
50 changes: 39 additions & 11 deletions google/api_core/gapic_v1/method.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Helpers for wrapping low-level gRPC methods with common functionality.
This is used by gapic clients to provide common error mapping, retry, timeout,
pagination, and long-running operations to gRPC methods.
compression, pagination, and long-running operations to gRPC methods.
"""

import enum
Expand All @@ -38,7 +38,7 @@ class _MethodDefault(enum.Enum):


DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry or timeout argument was unspecified,
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""


Expand Down Expand Up @@ -72,27 +72,43 @@ class _GapicCallable(object):
after its start, not to be confused with deadline). If ``None``,
this callable will not specify a timeout argument to the low-level
RPC method.
compression (grpc.Compression): The default compression for the callable.
If ``None``, this callable will not specify a compression argument
to the low-level RPC method.
metadata (Sequence[Tuple[str, str]]): Additional metadata that is
provided to the RPC method on every invocation. This is merged with
any metadata specified during invocation. If ``None``, no
additional metadata will be passed to the RPC method.
"""

def __init__(self, target, retry, timeout, metadata=None):
def __init__(
self,
target,
retry,
timeout,
compression,
metadata=None,
):
self._target = target
self._retry = retry
self._timeout = timeout
self._compression = compression
self._metadata = metadata

def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs):
"""Invoke the low-level RPC with retry, timeout, and metadata."""
def __call__(
self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
):
"""Invoke the low-level RPC with retry, timeout, compression, and metadata."""

if retry is DEFAULT:
retry = self._retry

if timeout is DEFAULT:
timeout = self._timeout

if compression is DEFAULT:
compression = self._compression

if isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)

Expand All @@ -109,6 +125,8 @@ def __call__(self, *args, timeout=DEFAULT, retry=DEFAULT, **kwargs):
metadata = list(metadata)
metadata.extend(self._metadata)
kwargs["metadata"] = metadata
if self._compression is not None:
kwargs["compression"] = compression

return wrapped_func(*args, **kwargs)

Expand All @@ -117,19 +135,21 @@ def wrap_method(
func,
default_retry=None,
default_timeout=None,
default_compression=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
):
"""Wrap an RPC method with common behavior.
This applies common error wrapping, retry, and timeout behavior a function.
The wrapped function will take optional ``retry`` and ``timeout``
This applies common error wrapping, retry, timeout, and compression behavior to a function.
The wrapped function will take optional ``retry``, ``timeout``, and ``compression``
arguments.
For example::
import google.api_core.gapic_v1.method
from google.api_core import retry
from google.api_core import timeout
from grpc import Compression
# The original RPC method.
def get_topic(name, timeout=None):
Expand All @@ -138,6 +158,7 @@ def get_topic(name, timeout=None):
default_retry = retry.Retry(deadline=60)
default_timeout = timeout.Timeout(deadline=60)
default_compression = Compression.NoCompression
wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
get_topic, default_retry)
Expand Down Expand Up @@ -186,6 +207,9 @@ def get_topic(name, timeout=None):
default_timeout (Optional[google.api_core.Timeout]): The default
timeout strategy. Can also be specified as an int or float. If
``None``, the method will not have timeout specified by default.
default_compression (Optional[grpc.Compression]): The default
grpc.Compression. If ``None``, the method will not have
compression specified by default.
client_info
(Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
Client information used to create a user-agent string that's
Expand All @@ -194,19 +218,23 @@ def get_topic(name, timeout=None):
metadata will be provided to the RPC method.
Returns:
Callable: A new callable that takes optional ``retry`` and ``timeout``
arguments and applies the common error mapping, retry, timeout,
Callable: A new callable that takes optional ``retry``, ``timeout``,
and ``compression``
arguments and applies the common error mapping, retry, timeout, compression,
and metadata behavior to the low-level RPC method.
"""
func = grpc_helpers.wrap_errors(func)

if client_info is not None:
user_agent_metadata = [client_info.to_grpc_metadata()]
else:
user_agent_metadata = None

return functools.wraps(func)(
_GapicCallable(
func, default_retry, default_timeout, metadata=user_agent_metadata
func,
default_retry,
default_timeout,
default_compression,
metadata=user_agent_metadata,
)
)
17 changes: 12 additions & 5 deletions google/api_core/gapic_v1/method_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""AsyncIO helpers for wrapping gRPC methods with common functionality.
This is used by gapic clients to provide common error mapping, retry, timeout,
pagination, and long-running operations to gRPC methods.
compression, pagination, and long-running operations to gRPC methods.
"""

import functools
Expand All @@ -30,19 +30,26 @@ def wrap_method(
func,
default_retry=None,
default_timeout=None,
default_compression=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
):
"""Wrap an async RPC method with common behavior.
Returns:
Callable: A new callable that takes optional ``retry`` and ``timeout``
arguments and applies the common error mapping, retry, timeout,
and metadata behavior to the low-level RPC method.
Callable: A new callable that takes optional ``retry``, ``timeout``,
and ``compression`` arguments and applies the common error mapping,
retry, timeout, metadata, and compression behavior to the low-level RPC method.
"""
func = grpc_helpers_async.wrap_errors(func)

metadata = [client_info.to_grpc_metadata()] if client_info is not None else None

return functools.wraps(func)(
_GapicCallable(func, default_retry, default_timeout, metadata=metadata)
_GapicCallable(
func,
default_retry,
default_timeout,
default_compression,
metadata=metadata,
)
)
28 changes: 22 additions & 6 deletions google/api_core/grpc_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import collections
import functools
import logging
import warnings

import grpc
Expand Down Expand Up @@ -51,6 +52,8 @@
# The list of gRPC Callable interfaces that return iterators.
_STREAM_WRAP_CLASSES = (grpc.UnaryStreamMultiCallable, grpc.StreamStreamMultiCallable)

_LOGGER = logging.getLogger(__name__)


def _patch_callable_name(callable_):
"""Fix-up gRPC callable attributes.
Expand Down Expand Up @@ -276,7 +279,8 @@ def create_channel(
quota_project_id=None,
default_scopes=None,
default_host=None,
**kwargs
compression=None,
**kwargs,
):
"""Create a secure channel with credentials.
Expand All @@ -297,6 +301,8 @@ def create_channel(
default_scopes (Sequence[str]): Default scopes passed by a Google client
library. Use 'scopes' for user-defined scopes.
default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
compression (grpc.Compression): An optional value indicating the
compression method to be used over the lifetime of the channel.
kwargs: Additional key-word args passed to
:func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`.
Note: `grpc_gcp` is only supported in environments with protobuf < 4.0.0.
Expand All @@ -319,12 +325,18 @@ def create_channel(
)

if HAS_GRPC_GCP: # pragma: NO COVER
if compression is not None and compression != grpc.Compression.NoCompression:
_LOGGER.debug(
"Compression argument is being ignored for grpc_gcp.secure_channel creation."
)
return grpc_gcp.secure_channel(target, composite_credentials, **kwargs)
return grpc.secure_channel(target, composite_credentials, **kwargs)
return grpc.secure_channel(
target, composite_credentials, compression=compression, **kwargs
)


_MethodCall = collections.namedtuple(
"_MethodCall", ("request", "timeout", "metadata", "credentials")
"_MethodCall", ("request", "timeout", "metadata", "credentials", "compression")
)

_ChannelRequest = collections.namedtuple("_ChannelRequest", ("method", "request"))
Expand All @@ -351,11 +363,15 @@ def __init__(self, method, channel):
"""List[protobuf.Message]: All requests sent to this callable."""
self.calls = []
"""List[Tuple]: All invocations of this callable. Each tuple is the
request, timeout, metadata, and credentials."""
request, timeout, metadata, compression, and credentials."""

def __call__(self, request, timeout=None, metadata=None, credentials=None):
def __call__(
self, request, timeout=None, metadata=None, credentials=None, compression=None
):
self._channel.requests.append(_ChannelRequest(self._method, request))
self.calls.append(_MethodCall(request, timeout, metadata, credentials))
self.calls.append(
_MethodCall(request, timeout, metadata, credentials, compression)
)
self.requests.append(request)

response = self.response
Expand Down
7 changes: 6 additions & 1 deletion google/api_core/grpc_helpers_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ def create_channel(
quota_project_id=None,
default_scopes=None,
default_host=None,
compression=None,
**kwargs
):
"""Create an AsyncIO secure channel with credentials.
Expand All @@ -233,6 +234,8 @@ def create_channel(
default_scopes (Sequence[str]): Default scopes passed by a Google client
library. Use 'scopes' for user-defined scopes.
default_host (str): The default endpoint. e.g., "pubsub.googleapis.com".
compression (grpc.Compression): An optional value indicating the
compression method to be used over the lifetime of the channel.
kwargs: Additional key-word args passed to :func:`aio.secure_channel`.
Returns:
Expand All @@ -252,7 +255,9 @@ def create_channel(
default_host=default_host,
)

return aio.secure_channel(target, composite_credentials, **kwargs)
return aio.secure_channel(
target, composite_credentials, compression=compression, **kwargs
)


class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall):
Expand Down
18 changes: 14 additions & 4 deletions google/api_core/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,16 @@ def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwa
operation.
"""
refresh = functools.partial(
_refresh_grpc, operations_stub, operation.name, metadata=grpc_metadata
_refresh_grpc,
operations_stub,
operation.name,
metadata=grpc_metadata,
)
cancel = functools.partial(
_cancel_grpc, operations_stub, operation.name, metadata=grpc_metadata
_cancel_grpc,
operations_stub,
operation.name,
metadata=grpc_metadata,
)
return Operation(operation, refresh, cancel, result_type, **kwargs)

Expand Down Expand Up @@ -347,9 +353,13 @@ def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **
operation.
"""
refresh = functools.partial(
operations_client.get_operation, operation.name, metadata=grpc_metadata
operations_client.get_operation,
operation.name,
metadata=grpc_metadata,
)
cancel = functools.partial(
operations_client.cancel_operation, operation.name, metadata=grpc_metadata
operations_client.cancel_operation,
operation.name,
metadata=grpc_metadata,
)
return Operation(operation, refresh, cancel, result_type, **kwargs)
8 changes: 6 additions & 2 deletions google/api_core/operation_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,13 @@ def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **
operation.
"""
refresh = functools.partial(
operations_client.get_operation, operation.name, metadata=grpc_metadata
operations_client.get_operation,
operation.name,
metadata=grpc_metadata,
)
cancel = functools.partial(
operations_client.cancel_operation, operation.name, metadata=grpc_metadata
operations_client.cancel_operation,
operation.name,
metadata=grpc_metadata,
)
return AsyncOperation(operation, refresh, cancel, result_type, **kwargs)
Loading

0 comments on commit bdebd63

Please sign in to comment.