Skip to content

Commit

Permalink
Add 'pubsub.message.Message' class.
Browse files Browse the repository at this point in the history
Maps 'pubsub.v1beta2.PubsubMessage', handles base64-decode of payload.

Return 'Message' instances from 'Subscription.pull()'.

Update docs accordingly.
  • Loading branch information
tseaver committed Apr 6, 2015
1 parent 1052aad commit 6a5f480
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 28 deletions.
29 changes: 18 additions & 11 deletions docs/pubsub-surface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,7 @@ Delete a subscription:
Pull messages from a subscription
---------------------------------

Fetch pending messages for a pull subscription

.. note::

The messages will have been ACKed already.
Fetch pending messages for a pull subscription:

.. doctest::

Expand All @@ -215,14 +211,23 @@ Fetch pending messages for a pull subscription
... topic.publish('this is the first message_payload')
... topic.publish('this is the second message_payload',
... attr1='value1', attr2='value2')
>>> messages = subscription.pull() # API request
>>> received = subscription.pull() # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]
[<message_id1>, <message_id2>]
>>> [message.data for message in messages]
['this is the first message_payload', 'this is the second message_payload']
>>> [message.attrs for message in messages]
>>> [message.attributes for message in messages]
[{}, {'attr1': 'value1', 'attr2': 'value2'}]

Note that received messages must be acknowledged, or else the back-end
will re-send them later:

.. doctest::

>>> ack_ids = [recv[0] for recv in received]
>>> subscription.acknowledge(ack_ids)

Fetch a limited number of pending messages for a pull subscription:

.. doctest::
Expand All @@ -235,8 +240,9 @@ Fetch a limited number of pending messages for a pull subscription:
... topic.publish('this is the first message_payload')
... topic.publish('this is the second message_payload',
... attr1='value1', attr2='value2')
>>> [message.id for message in subscription.pull(max_messages=1)]
[<message_id1>]
>>> received = subscription.pull(max_messages=1) # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]

Fetch messages for a pull subscription without blocking (none pending):

Expand All @@ -246,6 +252,7 @@ Fetch messages for a pull subscription without blocking (none pending):
>>> from gcloud.pubsub.subscription import Subscription
>>> topic = Topic('topic_name')
>>> subscription = Subscription('subscription_name', topic)
>>> [message.id for message in subscription.pull(return_immediately=True)]
>>> received = subscription.pull(max_messages=1) # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]
[]

56 changes: 56 additions & 0 deletions gcloud/pubsub/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Define API Topics."""

import base64


class Message(object):
"""Messages can be published to a topic and received by subscribers.
See:
https://cloud.google.com/pubsub/reference/rest/google/pubsub/v1beta2/PubsubMessage
:type name: bytes
:param name: the payload of the message
:type message_id: string
:param message_id: An ID assigned to the message by the API.
:type attrs: dict or None
:param attrs: Extra metadata associated by the publisher with the message.
"""
def __init__(self, data, message_id, attributes=None):
self.data = data
self.message_id = message_id
self._attrs = attributes

@property
def attrs(self):
"""Lazily-constructed attribute dictionary"""
if self._attrs is None:
self._attrs = {}
return self._attrs

@classmethod
def from_api_repr(cls, api_repr):
"""Factory: construct message from API representation.
:type api_repr: dict or None
:param api_repr: The API representation of the message
"""
data = base64.b64decode(api_repr['data'])
return cls(data=data, message_id=api_repr['messageId'],
attributes=api_repr.get('attributes'))
12 changes: 7 additions & 5 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Define API Subscriptions."""

from gcloud.exceptions import NotFound
from gcloud.pubsub.message import Message
from gcloud.pubsub.topic import Topic


Expand Down Expand Up @@ -162,18 +163,19 @@ def pull(self, return_immediately=False, max_messages=1):
:type max_messages: int
:param max_messages: the maximum number of messages to return.
:rtype: list of dict
:returns: sequence of mappings, each containing keys ``ackId`` (the
ID to be used in a subsequent call to :meth:`acknowledge`)
and ``message``.
:rtype: list of (ack_id, message) tuples
:returns: sequence of tuples: ``ack_id`` is the ID to be used in a
subsequent call to :meth:`acknowledge`, and ``message``
is an instance of :class:`gcloud.pubsub.message.Message`.
"""
data = {'returnImmediately': return_immediately,
'maxMessages': max_messages}
conn = self.topic.connection
response = conn.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
return response['receivedMessages']
return [(info['ackId'], Message.from_api_repr(info['message']))
for info in response['receivedMessages']]

def acknowledge(self, ack_ids):
"""API call: acknowledge retrieved messages for the subscription.
Expand Down
68 changes: 68 additions & 0 deletions gcloud/pubsub/test_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest2


class TestMessage(unittest2.TestCase):

def _getTargetClass(self):
from gcloud.pubsub.message import Message
return Message

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def test_ctor_no_attrs(self):
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
message = self._makeOne(data=DATA, message_id=MESSAGE_ID)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, {})

def test_ctor_w_attrs(self):
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
ATTRS = {'a': 'b'}
message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
attributes=ATTRS)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, ATTRS)

def test_from_api_repr_no_attrs(self):
from base64 import b64encode as b64
DATA = b'DEADBEEF'
B64_DATA = b64(DATA)
MESSAGE_ID = '12345'
api_repr = {'data': B64_DATA, 'messageId': MESSAGE_ID}
message = self._getTargetClass().from_api_repr(api_repr)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, {})

def test_from_api_repr_w_attrs(self):
from base64 import b64encode as b64
DATA = b'DEADBEEF'
B64_DATA = b64(DATA)
MESSAGE_ID = '12345'
ATTRS = {'a': 'b'}
api_repr = {'data': B64_DATA,
'messageId': MESSAGE_ID,
'attributes': ATTRS}
message = self._getTargetClass().from_api_repr(api_repr)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, ATTRS)
22 changes: 16 additions & 6 deletions gcloud/pubsub/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def test_modify_push_config_wo_endpoint(self):

def test_pull_wo_return_immediately_wo_max_messages(self):
import base64
from gcloud.pubsub.message import Message
PROJECT = 'PROJECT'
SUB_NAME = 'sub_name'
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
Expand All @@ -256,15 +257,19 @@ def test_pull_wo_return_immediately_wo_max_messages(self):
MSG_ID = 'BEADCAFE'
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD)
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {}}
MESSAGE = {'messageId': MSG_ID, 'data': B64}
REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE}
conn = _Connection({'receivedMessages': [REC_MESSAGE]})
topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn)
subscription = self._makeOne(SUB_NAME, topic)
pulled = subscription.pull()
self.assertEqual(len(pulled), 1)
self.assertEqual(pulled[0]['ackId'], ACK_ID)
self.assertEqual(pulled[0]['message'], MESSAGE)
ack_id, message = pulled[0]
self.assertEqual(ack_id, ACK_ID)
self.assertTrue(isinstance(message, Message))
self.assertEqual(message.data, PAYLOAD)
self.assertEqual(message.message_id, MSG_ID)
self.assertEqual(message.attrs, {})
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
Expand All @@ -274,6 +279,7 @@ def test_pull_wo_return_immediately_wo_max_messages(self):

def test_pull_w_return_immediately_w_max_messages(self):
import base64
from gcloud.pubsub.message import Message
PROJECT = 'PROJECT'
SUB_NAME = 'sub_name'
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
Expand All @@ -282,15 +288,19 @@ def test_pull_w_return_immediately_w_max_messages(self):
MSG_ID = 'BEADCAFE'
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD)
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {}}
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}}
REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE}
conn = _Connection({'receivedMessages': [REC_MESSAGE]})
topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn)
subscription = self._makeOne(SUB_NAME, topic)
pulled = subscription.pull(return_immediately=True, max_messages=3)
self.assertEqual(len(pulled), 1)
self.assertEqual(pulled[0]['ackId'], ACK_ID)
self.assertEqual(pulled[0]['message'], MESSAGE)
ack_id, message = pulled[0]
self.assertEqual(ack_id, ACK_ID)
self.assertTrue(isinstance(message, Message))
self.assertEqual(message.data, PAYLOAD)
self.assertEqual(message.message_id, MSG_ID)
self.assertEqual(message.attrs, {'a': 'b'})
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
Expand Down
12 changes: 6 additions & 6 deletions regression/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def test_list_subscriptions(self):
self.assertEqual(len(created), len(subscriptions_to_create))

def test_message_pull_mode_e2e(self):
from base64 import b64encode as b64
TOPIC_NAME = 'subscribe-me'
topic = Topic(TOPIC_NAME)
self.assertFalse(topic.exists())
Expand All @@ -115,12 +114,13 @@ def test_message_pull_mode_e2e(self):
self.to_delete.append(subscription)

MESSAGE = b'MESSAGE'
EXTRA = b'EXTRA TWO'
EXTRA = b'EXTRA'
topic.publish(MESSAGE, extra=EXTRA)

received = subscription.pull()
ack_ids = [msg['ackId'] for msg in received]
ack_ids = [recv[0] for recv in received]
subscription.acknowledge(ack_ids)
one, = received
self.assertEqual(one['message']['data'], b64(MESSAGE))
self.assertEqual(one['message']['attributes'], {'extra': EXTRA})
messages = [recv[1] for recv in received]
message, = messages
self.assertEqual(message.data, MESSAGE)
self.assertEqual(message.attributes, {'extra': EXTRA})

0 comments on commit 6a5f480

Please sign in to comment.