Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/sub: pass transport w/ custom channel to GAPIC API clients. #7008

Merged
merged 2 commits into from
Jan 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.gapic import publisher_client
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
from google.cloud.pubsub_v1.publisher._batch import thread


Expand Down Expand Up @@ -73,16 +74,22 @@ def __init__(self, batch_settings=(), **kwargs):
# Use a custom channel.
# We need this in order to set appropriate default message size and
# keepalive options.
if "channel" not in kwargs:
kwargs["channel"] = grpc_helpers.create_channel(
credentials=kwargs.pop("credentials", None),
target=self.target,
scopes=publisher_client.PublisherClient._DEFAULT_SCOPES,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
}.items(),
)
if "transport" not in kwargs:
channel = kwargs.pop("channel", None)
if channel is None:
channel = grpc_helpers.create_channel(
credentials=kwargs.pop("credentials", None),
target=self.target,
scopes=publisher_client.PublisherClient._DEFAULT_SCOPES,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
}.items(),
)
# cannot pass both 'channel' and 'credentials'
kwargs.pop("credentials", None)
transport = publisher_grpc_transport.PublisherGrpcTransport(channel=channel)
kwargs["transport"] = transport

# Add the metrics headers, and instantiate the underlying GAPIC
# client.
Expand Down
29 changes: 19 additions & 10 deletions pubsub/google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.gapic import subscriber_client
from google.cloud.pubsub_v1.gapic.transports import subscriber_grpc_transport
from google.cloud.pubsub_v1.subscriber import futures
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager

Expand Down Expand Up @@ -66,17 +67,25 @@ def __init__(self, **kwargs):
# Use a custom channel.
# We need this in order to set appropriate default message size and
# keepalive options.
if "channel" not in kwargs:
kwargs["channel"] = grpc_helpers.create_channel(
credentials=kwargs.pop("credentials", None),
target=self.target,
scopes=subscriber_client.SubscriberClient._DEFAULT_SCOPES,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
}.items(),
if "transport" not in kwargs:
channel = kwargs.pop("channel", None)
if channel is None:
channel = grpc_helpers.create_channel(
credentials=kwargs.pop("credentials", None),
target=self.target,
scopes=subscriber_client.SubscriberClient._DEFAULT_SCOPES,
options={
"grpc.max_send_message_length": -1,
"grpc.max_receive_message_length": -1,
"grpc.keepalive_time_ms": 30000,
}.items(),
)
# cannot pass both 'channel' and 'credentials'
kwargs.pop("credentials", None)
transport = subscriber_grpc_transport.SubscriberGrpcTransport(
channel=channel
)
kwargs["transport"] = transport

# Add the metrics headers, and instantiate the underlying GAPIC
# client.
Expand Down
13 changes: 13 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ def test_init():
assert client.batch_settings.max_messages == 1000


def test_init_w_custom_transport():
transport = object()
client = publisher.Client(transport=transport)

# A plain client should have an `api` (the underlying GAPIC) and a
# batch settings object, which should have the defaults.
assert isinstance(client.api, publisher_client.PublisherClient)
assert client.api.transport is transport
assert client.batch_settings.max_bytes == 10 * 1000 * 1000
assert client.batch_settings.max_latency == 0.05
assert client.batch_settings.max_messages == 1000


def test_init_emulator(monkeypatch):
monkeypatch.setenv("PUBSUB_EMULATOR_HOST", "/foo/bar/")
# NOTE: When the emulator host is set, a custom channel will be used, so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@
import mock

from google.cloud.pubsub_v1 import subscriber
from google.cloud.pubsub_v1.gapic import subscriber_client
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import futures


def test_init():
creds = mock.Mock(spec=credentials.Credentials)
client = subscriber.Client(credentials=creds)
assert client.api is not None
assert isinstance(client.api, subscriber_client.SubscriberClient)


def test_init_w_custom_transport():
transport = object()
client = subscriber.Client(transport=transport)
assert isinstance(client.api, subscriber_client.SubscriberClient)
assert client.api.transport is transport


def test_init_emulator(monkeypatch):
Expand Down