From 2b530a468b4b30b5bf23f299c17c322f5022dc9b Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Dec 2018 14:17:03 -0500 Subject: [PATCH 1/2] Pass transport w/ custom channel to GAPIC API clients. Silences pending deprecation warnings for passing channel. Closes #6887. --- .../cloud/pubsub_v1/publisher/client.py | 27 +++++++++++------ .../cloud/pubsub_v1/subscriber/client.py | 29 ++++++++++++------- .../publisher/test_publisher_client.py | 13 +++++++++ .../subscriber/test_subscriber_client.py | 10 ++++++- 4 files changed, 59 insertions(+), 20 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 76ceb470da24..6da9cd47d458 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -27,6 +27,7 @@ from google.cloud.pubsub_v1 import _gapic from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.gapic import publisher_client +from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport from google.cloud.pubsub_v1.publisher._batch import thread @@ -73,16 +74,24 @@ def __init__(self, batch_settings=(), **kwargs): # Use a custom channel. # We need this in order to set appropriate default message size and # keepalive options. - if "channel" not in kwargs: - kwargs["channel"] = grpc_helpers.create_channel( - credentials=kwargs.pop("credentials", None), - target=self.target, - scopes=publisher_client.PublisherClient._DEFAULT_SCOPES, - options={ - "grpc.max_send_message_length": -1, - "grpc.max_receive_message_length": -1, - }.items(), + if "transport" not in kwargs: + channel = kwargs.pop("channel", None) + if channel is None: + channel = grpc_helpers.create_channel( + credentials=kwargs.pop("credentials", None), + target=self.target, + scopes=publisher_client.PublisherClient._DEFAULT_SCOPES, + options={ + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, + }.items(), + ) + # cannot pass both 'channel' and 'credentials' + kwargs.pop("credentials", None) + transport = publisher_grpc_transport.PublisherGrpcTransport( + channel=channel, ) + kwargs["transport"] = transport # Add the metrics headers, and instantiate the underlying GAPIC # client. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/client.py b/pubsub/google/cloud/pubsub_v1/subscriber/client.py index b50a269e99f0..155cd6cf0e85 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/client.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/client.py @@ -25,6 +25,7 @@ from google.cloud.pubsub_v1 import _gapic from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.gapic import subscriber_client +from google.cloud.pubsub_v1.gapic.transports import subscriber_grpc_transport from google.cloud.pubsub_v1.subscriber import futures from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager @@ -66,17 +67,25 @@ def __init__(self, **kwargs): # Use a custom channel. # We need this in order to set appropriate default message size and # keepalive options. - if "channel" not in kwargs: - kwargs["channel"] = grpc_helpers.create_channel( - credentials=kwargs.pop("credentials", None), - target=self.target, - scopes=subscriber_client.SubscriberClient._DEFAULT_SCOPES, - options={ - "grpc.max_send_message_length": -1, - "grpc.max_receive_message_length": -1, - "grpc.keepalive_time_ms": 30000, - }.items(), + if "transport" not in kwargs: + channel = kwargs.pop("channel", None) + if channel is None: + channel = grpc_helpers.create_channel( + credentials=kwargs.pop("credentials", None), + target=self.target, + scopes=subscriber_client.SubscriberClient._DEFAULT_SCOPES, + options={ + "grpc.max_send_message_length": -1, + "grpc.max_receive_message_length": -1, + "grpc.keepalive_time_ms": 30000, + }.items(), + ) + # cannot pass both 'channel' and 'credentials' + kwargs.pop("credentials", None) + transport = subscriber_grpc_transport.SubscriberGrpcTransport( + channel=channel, ) + kwargs["transport"] = transport # Add the metrics headers, and instantiate the underlying GAPIC # client. diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py index a141e1f12187..05e4c8c67209 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -36,6 +36,19 @@ def test_init(): assert client.batch_settings.max_messages == 1000 +def test_init_w_custom_transport(): + transport = object() + client = publisher.Client(transport=transport) + + # A plain client should have an `api` (the underlying GAPIC) and a + # batch settings object, which should have the defaults. + assert isinstance(client.api, publisher_client.PublisherClient) + assert client.api.transport is transport + assert client.batch_settings.max_bytes == 10 * 1000 * 1000 + assert client.batch_settings.max_latency == 0.05 + assert client.batch_settings.max_messages == 1000 + + def test_init_emulator(monkeypatch): monkeypatch.setenv("PUBSUB_EMULATOR_HOST", "/foo/bar/") # NOTE: When the emulator host is set, a custom channel will be used, so diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index 5acd5b6f8dd7..d4914fee8f5b 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -16,6 +16,7 @@ import mock from google.cloud.pubsub_v1 import subscriber +from google.cloud.pubsub_v1.gapic import subscriber_client from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber import futures @@ -23,7 +24,14 @@ def test_init(): creds = mock.Mock(spec=credentials.Credentials) client = subscriber.Client(credentials=creds) - assert client.api is not None + assert isinstance(client.api, subscriber_client.SubscriberClient) + + +def test_init_w_custom_transport(): + transport = object() + client = subscriber.Client(transport=transport) + assert isinstance(client.api, subscriber_client.SubscriberClient) + assert client.api.transport is transport def test_init_emulator(monkeypatch): From b473ba1683a4271fbc08e20cc8d73eff2ce41a04 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 19 Dec 2018 17:50:48 -0500 Subject: [PATCH 2/2] Blacken. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 4 +--- pubsub/google/cloud/pubsub_v1/subscriber/client.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 6da9cd47d458..b837de24c6f0 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -88,9 +88,7 @@ def __init__(self, batch_settings=(), **kwargs): ) # cannot pass both 'channel' and 'credentials' kwargs.pop("credentials", None) - transport = publisher_grpc_transport.PublisherGrpcTransport( - channel=channel, - ) + transport = publisher_grpc_transport.PublisherGrpcTransport(channel=channel) kwargs["transport"] = transport # Add the metrics headers, and instantiate the underlying GAPIC diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/client.py b/pubsub/google/cloud/pubsub_v1/subscriber/client.py index 155cd6cf0e85..0540333ad8ea 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/client.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/client.py @@ -83,7 +83,7 @@ def __init__(self, **kwargs): # cannot pass both 'channel' and 'credentials' kwargs.pop("credentials", None) transport = subscriber_grpc_transport.SubscriberGrpcTransport( - channel=channel, + channel=channel ) kwargs["transport"] = transport