diff --git a/docs/pubsub-surface.rst b/docs/pubsub-surface.rst index 73708ed90fcb..f1abcba392be 100644 --- a/docs/pubsub-surface.rst +++ b/docs/pubsub-surface.rst @@ -199,11 +199,7 @@ Delete a subscription: Pull messages from a subscription --------------------------------- -Fetch pending messages for a pull subscription - -.. note:: - - The messages will have been ACKed already. +Fetch pending messages for a pull subscription: .. doctest:: @@ -215,14 +211,23 @@ Fetch pending messages for a pull subscription ... topic.publish('this is the first message_payload') ... topic.publish('this is the second message_payload', ... attr1='value1', attr2='value2') - >>> messages = subscription.pull() # API request + >>> received = subscription.pull() # API request + >>> messages = [recv[1] for recv in received] >>> [message.id for message in messages] [, ] >>> [message.data for message in messages] ['this is the first message_payload', 'this is the second message_payload'] - >>> [message.attrs for message in messages] + >>> [message.attributes for message in messages] [{}, {'attr1': 'value1', 'attr2': 'value2'}] +Note that received messages must be acknowledged, or else the back-end +will re-send them later: + +.. doctest:: + + >>> ack_ids = [recv[0] for recv in received] + >>> subscription.acknowledge(ack_ids) + Fetch a limited number of pending messages for a pull subscription: .. doctest:: @@ -235,8 +240,9 @@ Fetch a limited number of pending messages for a pull subscription: ... topic.publish('this is the first message_payload') ... topic.publish('this is the second message_payload', ... attr1='value1', attr2='value2') - >>> [message.id for message in subscription.pull(max_messages=1)] - [] + >>> received = subscription.pull(max_messages=1) # API request + >>> messages = [recv[1] for recv in received] + >>> [message.id for message in messages] Fetch messages for a pull subscription without blocking (none pending): @@ -246,6 +252,7 @@ Fetch messages for a pull subscription without blocking (none pending): >>> from gcloud.pubsub.subscription import Subscription >>> topic = Topic('topic_name') >>> subscription = Subscription('subscription_name', topic) - >>> [message.id for message in subscription.pull(return_immediately=True)] + >>> received = subscription.pull(max_messages=1) # API request + >>> messages = [recv[1] for recv in received] + >>> [message.id for message in messages] [] - diff --git a/gcloud/pubsub/api.py b/gcloud/pubsub/api.py index f91fb63dc0f8..4816a5a1d6a5 100644 --- a/gcloud/pubsub/api.py +++ b/gcloud/pubsub/api.py @@ -66,7 +66,7 @@ def list_topics(page_size=None, page_token=None, path = '/projects/%s/topics' % project resp = connection.api_request(method='GET', path=path, query_params=params) - topics = [_topic_from_resource(resource, connection) + topics = [Topic.from_api_repr(resource, connection) for resource in resp['topics']] return topics, resp.get('nextPageToken') @@ -128,47 +128,8 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None, resp = connection.api_request(method='GET', path=path, query_params=params) topics = {} - subscriptions = [_subscription_from_resource(resource, topics, connection) + subscriptions = [Subscription.from_api_repr(resource, + connection=connection, + topics=topics) for resource in resp['subscriptions']] return subscriptions, resp.get('nextPageToken') - - -def _topic_from_resource(resource, connection): - """Construct a topic given its full path-like name. - - :type resource: dict - :param resource: topic resource representation returned from the API - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: connection to use for the topic. - - :rtype: :class:`gcloud.pubsub.topic.Topic` - """ - _, project, _, name = resource['name'].split('/') - return Topic(name, project, connection) - - -def _subscription_from_resource(resource, topics, connection): - """Construct a topic given its full path-like name. - - :type resource: string - :param resource: subscription resource representation returned from the API - - :type topics: dict, full_name -> :class:`gcloud.pubsub.topic.Topic` - :param topics: the topics to which subscriptions have been bound - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: connection to use for the topic. - - :rtype: :class:`gcloud.pubsub.subscription.Subscription` - """ - t_name = resource['topic'] - topic = topics.get(t_name) - if topic is None: - topic = topics[t_name] = _topic_from_resource({'name': t_name}, - connection) - _, _, _, name = resource['name'].split('/') - ack_deadline = resource.get('ackDeadlineSeconds') - push_config = resource.get('pushConfig', {}) - push_endpoint = push_config.get('pushEndpoint') - return Subscription(name, topic, ack_deadline, push_endpoint) diff --git a/gcloud/pubsub/message.py b/gcloud/pubsub/message.py new file mode 100644 index 000000000000..1f087a6c8d83 --- /dev/null +++ b/gcloud/pubsub/message.py @@ -0,0 +1,56 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define API Topics.""" + +import base64 + + +class Message(object): + """Messages can be published to a topic and received by subscribers. + + See: + https://cloud.google.com/pubsub/reference/rest/google/pubsub/v1beta2/PubsubMessage + + :type name: bytes + :param name: the payload of the message + + :type message_id: string + :param message_id: An ID assigned to the message by the API. + + :type attrs: dict or None + :param attrs: Extra metadata associated by the publisher with the message. + """ + def __init__(self, data, message_id, attributes=None): + self.data = data + self.message_id = message_id + self._attrs = attributes + + @property + def attrs(self): + """Lazily-constructed attribute dictionary""" + if self._attrs is None: + self._attrs = {} + return self._attrs + + @classmethod + def from_api_repr(cls, api_repr): + """Factory: construct message from API representation. + + :type api_repr: dict or None + :param api_repr: The API representation of the message + """ + data = base64.b64decode(api_repr['data']) + return cls(data=data, message_id=api_repr['messageId'], + attributes=api_repr.get('attributes')) diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index 2fab37924ea6..269c15e2415b 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -15,6 +15,8 @@ """Define API Subscriptions.""" from gcloud.exceptions import NotFound +from gcloud.pubsub.message import Message +from gcloud.pubsub.topic import Topic class Subscription(object): @@ -43,6 +45,37 @@ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None): self.ack_deadline = ack_deadline self.push_endpoint = push_endpoint + @classmethod + def from_api_repr(cls, resource, connection=None, topics=None): + """Factory: construct a topic given its API representation + + :type resource: dict + :param resource: topic resource representation returned from the API + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the default inferred from the + environment. + + :type topics: dict or None + :param topics: A mapping of topic names -> topics. If not passed, + the subscription will have a newly-created topic. + + :rtype: :class:`gcloud.pubsub.subscription.Subscription` + """ + if topics is None: + topics = {} + t_name = resource['topic'] + topic = topics.get(t_name) + if topic is None: + topic = topics[t_name] = Topic.from_api_repr({'name': t_name}, + connection) + _, _, _, name = resource['name'].split('/') + ack_deadline = resource.get('ackDeadlineSeconds') + push_config = resource.get('pushConfig', {}) + push_endpoint = push_config.get('pushEndpoint') + return cls(name, topic, ack_deadline, push_endpoint) + @property def path(self): """URL path for the subscription's APIs""" @@ -130,10 +163,10 @@ def pull(self, return_immediately=False, max_messages=1): :type max_messages: int :param max_messages: the maximum number of messages to return. - :rtype: list of dict - :returns: sequence of mappings, each containing keys ``ackId`` (the - ID to be used in a subsequent call to :meth:`acknowledge`) - and ``message``. + :rtype: list of (ack_id, message) tuples + :returns: sequence of tuples: ``ack_id`` is the ID to be used in a + subsequent call to :meth:`acknowledge`, and ``message`` + is an instance of :class:`gcloud.pubsub.message.Message`. """ data = {'returnImmediately': return_immediately, 'maxMessages': max_messages} @@ -141,7 +174,8 @@ def pull(self, return_immediately=False, max_messages=1): response = conn.api_request(method='POST', path='%s:pull' % self.path, data=data) - return response['receivedMessages'] + return [(info['ackId'], Message.from_api_repr(info['message'])) + for info in response['receivedMessages']] def acknowledge(self, ack_ids): """API call: acknowledge retrieved messages for the subscription. diff --git a/gcloud/pubsub/test_message.py b/gcloud/pubsub/test_message.py new file mode 100644 index 000000000000..1e28ab7c9333 --- /dev/null +++ b/gcloud/pubsub/test_message.py @@ -0,0 +1,68 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestMessage(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.message import Message + return Message + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_no_attrs(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + message = self._makeOne(data=DATA, message_id=MESSAGE_ID) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attrs, {}) + + def test_ctor_w_attrs(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + ATTRS = {'a': 'b'} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attrs, ATTRS) + + def test_from_api_repr_no_attrs(self): + from base64 import b64encode as b64 + DATA = b'DEADBEEF' + B64_DATA = b64(DATA) + MESSAGE_ID = '12345' + api_repr = {'data': B64_DATA, 'messageId': MESSAGE_ID} + message = self._getTargetClass().from_api_repr(api_repr) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attrs, {}) + + def test_from_api_repr_w_attrs(self): + from base64 import b64encode as b64 + DATA = b'DEADBEEF' + B64_DATA = b64(DATA) + MESSAGE_ID = '12345' + ATTRS = {'a': 'b'} + api_repr = {'data': B64_DATA, + 'messageId': MESSAGE_ID, + 'attributes': ATTRS} + message = self._getTargetClass().from_api_repr(api_repr) + self.assertEqual(message.data, DATA) + self.assertEqual(message.message_id, MESSAGE_ID) + self.assertEqual(message.attrs, ATTRS) diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index eb5145cd989d..9852d7c5c38b 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -44,6 +44,82 @@ def test_ctor_explicit(self): self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) + def test_from_api_repr_no_topics_no_connection(self): + from gcloud.pubsub.topic import Topic + from gcloud.pubsub._testing import _monkey_defaults + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + resource = {'topic': TOPIC_PATH, + 'name': SUB_PATH, + 'ackDeadlineSeconds': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + conn = _Connection() + klass = self._getTargetClass() + with _monkey_defaults(connection=conn): + subscription = klass.from_api_repr(resource, connection=conn) + self.assertEqual(subscription.name, SUB_NAME) + topic = subscription.topic + self.assertTrue(isinstance(topic, Topic)) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertTrue(topic.connection is conn) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + + def test_from_api_repr_w_topics_no_topic_match(self): + from gcloud.pubsub.topic import Topic + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + resource = {'topic': TOPIC_PATH, + 'name': SUB_PATH, + 'ackDeadlineSeconds': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + conn = _Connection() + topics = {} + klass = self._getTargetClass() + subscription = klass.from_api_repr(resource, connection=conn, + topics=topics) + self.assertEqual(subscription.name, SUB_NAME) + topic = subscription.topic + self.assertTrue(isinstance(topic, Topic)) + self.assertTrue(topic is topics[TOPIC_PATH]) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertTrue(topic.connection is conn) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + + def test_from_api_repr_w_topics_w_topic_match(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_NAME = 'sub_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + DEADLINE = 42 + ENDPOINT = 'https://api.example.com/push' + resource = {'topic': TOPIC_PATH, + 'name': SUB_PATH, + 'ackDeadlineSeconds': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}} + topic = object() + topics = {TOPIC_PATH: topic} + klass = self._getTargetClass() + subscription = klass.from_api_repr(resource, topics=topics) + self.assertEqual(subscription.name, SUB_NAME) + self.assertTrue(subscription.topic is topic) + self.assertEqual(subscription.ack_deadline, DEADLINE) + self.assertEqual(subscription.push_endpoint, ENDPOINT) + def test_create_pull_wo_ack_deadline(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' @@ -172,6 +248,7 @@ def test_modify_push_config_wo_endpoint(self): def test_pull_wo_return_immediately_wo_max_messages(self): import base64 + from gcloud.pubsub.message import Message PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -180,15 +257,19 @@ def test_pull_wo_return_immediately_wo_max_messages(self): MSG_ID = 'BEADCAFE' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD) - MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {}} + MESSAGE = {'messageId': MSG_ID, 'data': B64} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn = _Connection({'receivedMessages': [REC_MESSAGE]}) topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) subscription = self._makeOne(SUB_NAME, topic) pulled = subscription.pull() self.assertEqual(len(pulled), 1) - self.assertEqual(pulled[0]['ackId'], ACK_ID) - self.assertEqual(pulled[0]['message'], MESSAGE) + ack_id, message = pulled[0] + self.assertEqual(ack_id, ACK_ID) + self.assertTrue(isinstance(message, Message)) + self.assertEqual(message.data, PAYLOAD) + self.assertEqual(message.message_id, MSG_ID) + self.assertEqual(message.attrs, {}) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') @@ -198,6 +279,7 @@ def test_pull_wo_return_immediately_wo_max_messages(self): def test_pull_w_return_immediately_w_max_messages(self): import base64 + from gcloud.pubsub.message import Message PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -206,15 +288,19 @@ def test_pull_w_return_immediately_w_max_messages(self): MSG_ID = 'BEADCAFE' PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD) - MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {}} + MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn = _Connection({'receivedMessages': [REC_MESSAGE]}) topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn) subscription = self._makeOne(SUB_NAME, topic) pulled = subscription.pull(return_immediately=True, max_messages=3) self.assertEqual(len(pulled), 1) - self.assertEqual(pulled[0]['ackId'], ACK_ID) - self.assertEqual(pulled[0]['message'], MESSAGE) + ack_id, message = pulled[0] + self.assertEqual(ack_id, ACK_ID) + self.assertTrue(isinstance(message, Message)) + self.assertEqual(message.data, PAYLOAD) + self.assertEqual(message.message_id, MSG_ID) + self.assertEqual(message.attrs, {'a': 'b'}) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index 273087821f2a..fa1ce55a878c 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -50,6 +50,34 @@ def test_ctor_w_explicit_project_and_connection(self): 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) self.assertTrue(topic.connection is conn) + def test_from_api_repr_wo_connection(self): + from gcloud.pubsub._testing import _monkey_defaults + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + resource = {'name': PATH} + klass = self._getTargetClass() + conn = _Connection() + with _monkey_defaults(connection=conn): + topic = klass.from_api_repr(resource) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertEqual(topic.full_name, PATH) + self.assertTrue(topic.connection is conn) + + def test_from_api_repr_w_connection(self): + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + resource = {'name': PATH} + conn = object() + klass = self._getTargetClass() + topic = klass.from_api_repr(resource, connection=conn) + self.assertEqual(topic.name, TOPIC_NAME) + self.assertEqual(topic.project, PROJECT) + self.assertEqual(topic.full_name, PATH) + self.assertTrue(topic.connection is conn) + def test_create(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 018963657e00..9c3482cb4fa9 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -50,6 +50,23 @@ def __init__(self, name, project=None, connection=None): self.project = project self.connection = connection + @classmethod + def from_api_repr(cls, resource, connection=None): + """Factory: construct a topic given its API representation + + :type resource: dict + :param resource: topic resource representation returned from the API + + :type connection: :class:`gcloud.pubsub.connection.Connection` or None + :param connection: the connection to use. If not passed, + falls back to the default inferred from the + environment. + + :rtype: :class:`gcloud.pubsub.topic.Topic` + """ + _, project, _, name = resource['name'].split('/') + return cls(name, project, connection) + @property def full_name(self): """Fully-qualified name used in topic / subscription APIs""" diff --git a/regression/pubsub.py b/regression/pubsub.py index f72bdd0510ab..fb859dfc2d12 100644 --- a/regression/pubsub.py +++ b/regression/pubsub.py @@ -102,7 +102,6 @@ def test_list_subscriptions(self): self.assertEqual(len(created), len(subscriptions_to_create)) def test_message_pull_mode_e2e(self): - from base64 import b64encode as b64 TOPIC_NAME = 'subscribe-me' topic = Topic(TOPIC_NAME) self.assertFalse(topic.exists()) @@ -115,12 +114,13 @@ def test_message_pull_mode_e2e(self): self.to_delete.append(subscription) MESSAGE = b'MESSAGE' - EXTRA = b'EXTRA TWO' + EXTRA = b'EXTRA' topic.publish(MESSAGE, extra=EXTRA) received = subscription.pull() - ack_ids = [msg['ackId'] for msg in received] + ack_ids = [recv[0] for recv in received] subscription.acknowledge(ack_ids) - one, = received - self.assertEqual(one['message']['data'], b64(MESSAGE)) - self.assertEqual(one['message']['attributes'], {'extra': EXTRA}) + messages = [recv[1] for recv in received] + message, = messages + self.assertEqual(message.data, MESSAGE) + self.assertEqual(message.attributes, {'extra': EXTRA})