Skip to content

Commit

Permalink
feat: Add OperationsRestAsyncTransport to support long running operat…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
parthea committed Sep 23, 2024
1 parent 3f26e81 commit eb78445
Show file tree
Hide file tree
Showing 7 changed files with 572 additions and 58 deletions.
13 changes: 13 additions & 0 deletions google/api_core/operations_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,22 @@
from google.api_core.operations_v1.operations_client import OperationsClient
from google.api_core.operations_v1.transports.rest import OperationsRestTransport

try:
from google.api_core.operations_v1.transports.rest_asyncio import (
OperationsRestAsyncTransport,
)

HAS_ASYNC_TRANSPORT = True
except ImportError:
# This import requires the `async_rest` extra
# Don't raise an exception if `OperationsRestAsyncTransport` cannot be imported
# as other transports are still available
HAS_ASYNC_TRANSPORT = False

__all__ = [
"AbstractOperationsClient",
"OperationsAsyncClient",
"OperationsClient",
"OperationsRestTransport",
"OperationsRestAsyncTransport",
]
13 changes: 13 additions & 0 deletions google/api_core/operations_v1/transports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,25 @@
from .base import OperationsTransport
from .rest import OperationsRestTransport

try:
from .rest_asyncio import OperationsRestAsyncTransport

HAS_ASYNC_TRANSPORT = True
except ImportError:
# This import requires the `async_rest` extra
# Don't raise an exception if `OperationsRestAsyncTransport` cannot be imported
# as other transports are still available
HAS_ASYNC_TRANSPORT = False

# Compile a registry of transports.
_transport_registry = OrderedDict()
_transport_registry["rest"] = OperationsRestTransport

if HAS_ASYNC_TRANSPORT:
_transport_registry["rest_asyncio"] = OperationsRestAsyncTransport

__all__ = (
"OperationsTransport",
"OperationsRestTransport",
"OperationsRestAsyncTransport",
)
64 changes: 58 additions & 6 deletions google/api_core/operations_v1/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@
# limitations under the License.
#
import abc
import re
from typing import Awaitable, Callable, Optional, Sequence, Union

import google.api_core # type: ignore
from google.api_core import exceptions as core_exceptions # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.api_core import retry_async as retries_async # type: ignore
from google.api_core import version
import google.auth # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.longrunning import operations_pb2
from google.oauth2 import service_account # type: ignore
from google.protobuf import empty_pb2 # type: ignore
import google.protobuf
from google.protobuf import empty_pb2, json_format # type: ignore
from grpc import Compression


PROTOBUF_VERSION = google.protobuf.__version__

DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
gapic_version=version.__version__,
)
Expand All @@ -51,6 +56,7 @@ def __init__(
quota_project_id: Optional[str] = None,
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
always_use_jwt_access: Optional[bool] = False,
url_scheme="https",
**kwargs,
) -> None:
"""Instantiate the transport.
Expand All @@ -76,7 +82,20 @@ def __init__(
your own client library.
always_use_jwt_access (Optional[bool]): Whether self signed JWT should
be used for service account credentials.
url_scheme: the protocol scheme for the API endpoint. Normally
"https", but for testing or local servers,
"http" can be specified.
"""
maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
if maybe_url_match is None:
raise ValueError(
f"Unexpected hostname structure: {host}"
) # pragma: NO COVER

url_match_items = maybe_url_match.groupdict()

host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host

# Save the hostname. Default to port 443 (HTTPS) if none is specified.
if ":" not in host:
host += ":443"
Expand Down Expand Up @@ -115,12 +134,13 @@ def __init__(
# Save the credentials.
self._credentials = credentials

def _prep_wrapped_messages(self, client_info):
def _prep_wrapped_messages(self, client_info, is_async=False):
# Precompute the wrapped methods.
retry_class = retries_async.AsyncRetry if is_async else retries.Retry
self._wrapped_methods = {
self.list_operations: gapic_v1.method.wrap_method(
self.list_operations,
default_retry=retries.Retry(
default_retry=retry_class(
initial=0.5,
maximum=10.0,
multiplier=2.0,
Expand All @@ -135,7 +155,7 @@ def _prep_wrapped_messages(self, client_info):
),
self.get_operation: gapic_v1.method.wrap_method(
self.get_operation,
default_retry=retries.Retry(
default_retry=retry_class(
initial=0.5,
maximum=10.0,
multiplier=2.0,
Expand All @@ -150,7 +170,7 @@ def _prep_wrapped_messages(self, client_info):
),
self.delete_operation: gapic_v1.method.wrap_method(
self.delete_operation,
default_retry=retries.Retry(
default_retry=retry_class(
initial=0.5,
maximum=10.0,
multiplier=2.0,
Expand All @@ -165,7 +185,7 @@ def _prep_wrapped_messages(self, client_info):
),
self.cancel_operation: gapic_v1.method.wrap_method(
self.cancel_operation,
default_retry=retries.Retry(
default_retry=retry_class(
initial=0.5,
maximum=10.0,
multiplier=2.0,
Expand All @@ -189,6 +209,38 @@ def close(self):
"""
raise NotImplementedError()

def _convert_protobuf_message_to_dict(
self, message: google.protobuf.message.Message
):
r"""Converts protobuf message to a dictionary.
When the dictionary is encoded to JSON, it conforms to proto3 JSON spec.
Args:
message(google.protobuf.message.Message): The protocol buffers message
instance to serialize.
Returns:
A dict representation of the protocol buffer message.
"""
# For backwards compatibility with protobuf 3.x 4.x
# Remove once support for protobuf 3.x and 4.x is dropped
# https://github.com/googleapis/python-api-core/issues/643
if PROTOBUF_VERSION[0:2] in ["3.", "4."]:
result = json_format.MessageToDict(
message,
preserving_proto_field_name=True,
including_default_value_fields=True, # type: ignore # backward compatibility
)
else:
result = json_format.MessageToDict(
message,
preserving_proto_field_name=True,
always_print_fields_with_no_presence=True,
)

return result

@property
def list_operations(
self,
Expand Down
42 changes: 0 additions & 42 deletions google/api_core/operations_v1/transports/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,6 @@ def __init__(
# TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
# TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
# credentials object
maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
if maybe_url_match is None:
raise ValueError(
f"Unexpected hostname structure: {host}"
) # pragma: NO COVER

url_match_items = maybe_url_match.groupdict()

host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host

super().__init__(
host=host,
credentials=credentials,
Expand Down Expand Up @@ -441,38 +431,6 @@ def _cancel_operation(

return empty_pb2.Empty()

def _convert_protobuf_message_to_dict(
self, message: google.protobuf.message.Message
):
r"""Converts protobuf message to a dictionary.
When the dictionary is encoded to JSON, it conforms to proto3 JSON spec.
Args:
message(google.protobuf.message.Message): The protocol buffers message
instance to serialize.
Returns:
A dict representation of the protocol buffer message.
"""
# For backwards compatibility with protobuf 3.x 4.x
# Remove once support for protobuf 3.x and 4.x is dropped
# https://github.com/googleapis/python-api-core/issues/643
if PROTOBUF_VERSION[0:2] in ["3.", "4."]:
result = json_format.MessageToDict(
message,
preserving_proto_field_name=True,
including_default_value_fields=True, # type: ignore # backward compatibility
)
else:
result = json_format.MessageToDict(
message,
preserving_proto_field_name=True,
always_print_fields_with_no_presence=True,
)

return result

@property
def list_operations(
self,
Expand Down
Loading

0 comments on commit eb78445

Please sign in to comment.