Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing most (direct) connection usage in Pub / Sub #2874

Merged
merged 3 commits into from
Dec 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,48 +512,54 @@ def _received_message_pb_to_mapping(received_message_pb):
}


def make_gax_publisher_api(connection):
def make_gax_publisher_api(credentials=None, host=None):
"""Create an instance of the GAX Publisher API.

If the ``connection`` is intended for a local emulator, then
an insecure ``channel`` is created pointing at the local
Pub / Sub server.
If the ``credentials`` are omitted, then we create an insecure
``channel`` pointing at the local Pub / Sub emulator.

:type connection: :class:`~google.cloud.pubsub._http.Connection`
:param connection: The connection that holds configuration details.
:type credentials: :class:`~google.auth.credentials.Credentials`
:param credentials: (Optional) Credentials for getting access
tokens.

:type host: str
:param host: (Optional) The host for an insecure channel. Only
used if ``credentials`` are omitted.

:rtype: :class:`.publisher_client.PublisherClient`
:returns: A publisher API instance with the proper connection
configuration.
:returns: A publisher API instance with the proper channel.
"""
if connection.in_emulator:
channel = insecure_channel(connection.host)
if credentials is None:
channel = insecure_channel(host)
else:
channel = make_secure_channel(
connection.credentials, DEFAULT_USER_AGENT,
credentials, DEFAULT_USER_AGENT,
PublisherClient.SERVICE_ADDRESS)
return PublisherClient(channel=channel)


def make_gax_subscriber_api(connection):
def make_gax_subscriber_api(credentials=None, host=None):
"""Create an instance of the GAX Subscriber API.

If the ``connection`` is intended for a local emulator, then
an insecure ``channel`` is created pointing at the local
Pub / Sub server.
If the ``credentials`` are omitted, then we create an insecure
``channel`` pointing at the local Pub / Sub emulator.

:type credentials: :class:`~google.auth.credentials.Credentials`
:param credentials: (Optional) Credentials for getting access
tokens.

:type connection: :class:`~google.cloud.pubsub._http.Connection`
:param connection: The connection that holds configuration details.
:type host: str
:param host: (Optional) The host for an insecure channel. Only
used if ``credentials`` are omitted.

:rtype: :class:`.subscriber_client.SubscriberClient`
:returns: A subscriber API instance with the proper connection
configuration.
:returns: A subscriber API instance with the proper channel.
"""
if connection.in_emulator:
channel = insecure_channel(connection.host)
if credentials is None:
channel = insecure_channel(host)
else:
channel = make_secure_channel(
connection.credentials, DEFAULT_USER_AGENT,
credentials, DEFAULT_USER_AGENT,
SubscriberClient.SERVICE_ADDRESS)
return SubscriberClient(channel=channel)

Expand Down
58 changes: 22 additions & 36 deletions pubsub/google/cloud/pubsub/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Create / interact with Google Cloud Pub/Sub connections."""
"""Interact with Google Cloud Pub/Sub via JSON-over-HTTP."""

import base64
import copy
Expand Down Expand Up @@ -109,7 +109,7 @@ class _PublisherAPI(object):

def __init__(self, client):
self._client = client
self._connection = client._connection
self.api_request = client._connection.api_request

def list_topics(self, project, page_size=None, page_token=None):
"""API call: list topics for a given project
Expand All @@ -131,7 +131,7 @@ def list_topics(self, project, page_size=None, page_token=None):

:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.topic.Topic`
accessible to the current connection.
accessible to the current client.
"""
extra_params = {}
if page_size is not None:
Expand All @@ -156,8 +156,7 @@ def topic_create(self, topic_path):
:rtype: dict
:returns: ``Topic`` resource returned from the API.
"""
conn = self._connection
return conn.api_request(method='PUT', path='/%s' % (topic_path,))
return self.api_request(method='PUT', path='/%s' % (topic_path,))

def topic_get(self, topic_path):
"""API call: retrieve a topic
Expand All @@ -172,8 +171,7 @@ def topic_get(self, topic_path):
:rtype: dict
:returns: ``Topic`` resource returned from the API.
"""
conn = self._connection
return conn.api_request(method='GET', path='/%s' % (topic_path,))
return self.api_request(method='GET', path='/%s' % (topic_path,))

def topic_delete(self, topic_path):
"""API call: delete a topic
Expand All @@ -185,8 +183,7 @@ def topic_delete(self, topic_path):
:param topic_path: the fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
"""
conn = self._connection
conn.api_request(method='DELETE', path='/%s' % (topic_path,))
self.api_request(method='DELETE', path='/%s' % (topic_path,))

def topic_publish(self, topic_path, messages):
"""API call: publish one or more messages to a topic
Expand All @@ -206,9 +203,8 @@ def topic_publish(self, topic_path, messages):
"""
messages_to_send = copy.deepcopy(messages)
_transform_messages_base64(messages_to_send, _base64_unicode)
conn = self._connection
data = {'messages': messages_to_send}
response = conn.api_request(
response = self.api_request(
method='POST', path='/%s:publish' % (topic_path,), data=data)
return response['messageIds']

Expand Down Expand Up @@ -257,7 +253,7 @@ class _SubscriberAPI(object):

def __init__(self, client):
self._client = client
self._connection = client._connection
self.api_request = client._connection.api_request

def list_subscriptions(self, project, page_size=None, page_token=None):
"""API call: list subscriptions for a given project
Expand Down Expand Up @@ -328,7 +324,6 @@ def subscription_create(self, subscription_path, topic_path,
:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
conn = self._connection
path = '/%s' % (subscription_path,)
resource = {'topic': topic_path}

Expand All @@ -338,7 +333,7 @@ def subscription_create(self, subscription_path, topic_path,
if push_endpoint is not None:
resource['pushConfig'] = {'pushEndpoint': push_endpoint}

return conn.api_request(method='PUT', path=path, data=resource)
return self.api_request(method='PUT', path=path, data=resource)

def subscription_get(self, subscription_path):
"""API call: retrieve a subscription
Expand All @@ -354,9 +349,8 @@ def subscription_get(self, subscription_path):
:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
conn = self._connection
path = '/%s' % (subscription_path,)
return conn.api_request(method='GET', path=path)
return self.api_request(method='GET', path=path)

def subscription_delete(self, subscription_path):
"""API call: delete a subscription
Expand All @@ -369,9 +363,8 @@ def subscription_delete(self, subscription_path):
the fully-qualified path of the subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
"""
conn = self._connection
path = '/%s' % (subscription_path,)
conn.api_request(method='DELETE', path=path)
self.api_request(method='DELETE', path=path)

def subscription_modify_push_config(self, subscription_path,
push_endpoint):
Expand All @@ -390,10 +383,9 @@ def subscription_modify_push_config(self, subscription_path,
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.
"""
conn = self._connection
path = '/%s:modifyPushConfig' % (subscription_path,)
resource = {'pushConfig': {'pushEndpoint': push_endpoint}}
conn.api_request(method='POST', path=path, data=resource)
self.api_request(method='POST', path=path, data=resource)

def subscription_pull(self, subscription_path, return_immediately=False,
max_messages=1):
Expand All @@ -419,13 +411,12 @@ def subscription_pull(self, subscription_path, return_immediately=False,
:rtype: list of dict
:returns: the ``receivedMessages`` element of the response.
"""
conn = self._connection
path = '/%s:pull' % (subscription_path,)
data = {
'returnImmediately': return_immediately,
'maxMessages': max_messages,
}
response = conn.api_request(method='POST', path=path, data=data)
response = self.api_request(method='POST', path=path, data=data)
messages = response.get('receivedMessages', ())
_transform_messages_base64(messages, base64.b64decode, 'message')
return messages
Expand All @@ -444,12 +435,11 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
"""
conn = self._connection
path = '/%s:acknowledge' % (subscription_path,)
data = {
'ackIds': ack_ids,
}
conn.api_request(method='POST', path=path, data=data)
self.api_request(method='POST', path=path, data=data)

def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
ack_deadline):
Expand All @@ -470,24 +460,23 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.
"""
conn = self._connection
path = '/%s:modifyAckDeadline' % (subscription_path,)
data = {
'ackIds': ack_ids,
'ackDeadlineSeconds': ack_deadline,
}
conn.api_request(method='POST', path=path, data=data)
self.api_request(method='POST', path=path, data=data)


class _IAMPolicyAPI(object):
"""Helper mapping IAM policy-related APIs.

:type connection: :class:`Connection`
:param connection: the connection used to make API requests.
:type client: :class:`~google.cloud.pubsub.client.Client`
:param client: the client used to make API requests.
"""

def __init__(self, connection):
self._connection = connection
def __init__(self, client):
self.api_request = client._connection.api_request

def get_iam_policy(self, target_path):
"""API call: fetch the IAM policy for the target
Expand All @@ -502,9 +491,8 @@ def get_iam_policy(self, target_path):
:rtype: dict
:returns: the resource returned by the ``getIamPolicy`` API request.
"""
conn = self._connection
path = '/%s:getIamPolicy' % (target_path,)
return conn.api_request(method='GET', path=path)
return self.api_request(method='GET', path=path)

def set_iam_policy(self, target_path, policy):
"""API call: update the IAM policy for the target
Expand All @@ -522,10 +510,9 @@ def set_iam_policy(self, target_path, policy):
:rtype: dict
:returns: the resource returned by the ``setIamPolicy`` API request.
"""
conn = self._connection
wrapped = {'policy': policy}
path = '/%s:setIamPolicy' % (target_path,)
return conn.api_request(method='POST', path=path, data=wrapped)
return self.api_request(method='POST', path=path, data=wrapped)

def test_iam_permissions(self, target_path, permissions):
"""API call: test permissions
Expand All @@ -543,10 +530,9 @@ def test_iam_permissions(self, target_path, permissions):
:rtype: dict
:returns: the resource returned by the ``getIamPolicy`` API request.
"""
conn = self._connection
wrapped = {'permissions': permissions}
path = '/%s:testIamPermissions' % (target_path,)
resp = conn.api_request(method='POST', path=path, data=wrapped)
resp = self.api_request(method='POST', path=path, data=wrapped)
return resp.get('permissions', [])


Expand Down
16 changes: 13 additions & 3 deletions pubsub/google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ def publisher_api(self):
"""Helper for publisher-related API calls."""
if self._publisher_api is None:
if self._use_gax:
generated = make_gax_publisher_api(self._connection)
if self._connection.in_emulator:
generated = make_gax_publisher_api(
host=self._connection.host)
else:
generated = make_gax_publisher_api(
credentials=self._credentials)
self._publisher_api = GAXPublisherAPI(generated, self)
else:
self._publisher_api = JSONPublisherAPI(self)
Expand All @@ -102,7 +107,12 @@ def subscriber_api(self):
"""Helper for subscriber-related API calls."""
if self._subscriber_api is None:
if self._use_gax:
generated = make_gax_subscriber_api(self._connection)
if self._connection.in_emulator:
generated = make_gax_subscriber_api(
host=self._connection.host)
else:
generated = make_gax_subscriber_api(
credentials=self._credentials)
self._subscriber_api = GAXSubscriberAPI(generated, self)
else:
self._subscriber_api = JSONSubscriberAPI(self)
Expand All @@ -112,7 +122,7 @@ def subscriber_api(self):
def iam_policy_api(self):
"""Helper for IAM policy-related API calls."""
if self._iam_policy_api is None:
self._iam_policy_api = _IAMPolicyAPI(self._connection)
self._iam_policy_api = _IAMPolicyAPI(self)
return self._iam_policy_api

def list_topics(self, page_size=None, page_token=None):
Expand Down
Loading