Skip to content

Commit

Permalink
Merge pull request #800 from tseaver/744-pubsub-message_class
Browse files Browse the repository at this point in the history
#744: Add 'pubsub.message.Message' class
  • Loading branch information
tseaver committed Apr 6, 2015
2 parents f3c3543 + 6a5f480 commit 9c7f1fd
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 71 deletions.
29 changes: 18 additions & 11 deletions docs/pubsub-surface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,7 @@ Delete a subscription:
Pull messages from a subscription
---------------------------------

Fetch pending messages for a pull subscription

.. note::

The messages will have been ACKed already.
Fetch pending messages for a pull subscription:

.. doctest::

Expand All @@ -215,14 +211,23 @@ Fetch pending messages for a pull subscription
... topic.publish('this is the first message_payload')
... topic.publish('this is the second message_payload',
... attr1='value1', attr2='value2')
>>> messages = subscription.pull() # API request
>>> received = subscription.pull() # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]
[<message_id1>, <message_id2>]
>>> [message.data for message in messages]
['this is the first message_payload', 'this is the second message_payload']
>>> [message.attrs for message in messages]
>>> [message.attributes for message in messages]
[{}, {'attr1': 'value1', 'attr2': 'value2'}]

Note that received messages must be acknowledged, or else the back-end
will re-send them later:

.. doctest::

>>> ack_ids = [recv[0] for recv in received]
>>> subscription.acknowledge(ack_ids)

Fetch a limited number of pending messages for a pull subscription:

.. doctest::
Expand All @@ -235,8 +240,9 @@ Fetch a limited number of pending messages for a pull subscription:
... topic.publish('this is the first message_payload')
... topic.publish('this is the second message_payload',
... attr1='value1', attr2='value2')
>>> [message.id for message in subscription.pull(max_messages=1)]
[<message_id1>]
>>> received = subscription.pull(max_messages=1) # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]

Fetch messages for a pull subscription without blocking (none pending):

Expand All @@ -246,6 +252,7 @@ Fetch messages for a pull subscription without blocking (none pending):
>>> from gcloud.pubsub.subscription import Subscription
>>> topic = Topic('topic_name')
>>> subscription = Subscription('subscription_name', topic)
>>> [message.id for message in subscription.pull(return_immediately=True)]
>>> received = subscription.pull(max_messages=1) # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]
[]

47 changes: 4 additions & 43 deletions gcloud/pubsub/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def list_topics(page_size=None, page_token=None,

path = '/projects/%s/topics' % project
resp = connection.api_request(method='GET', path=path, query_params=params)
topics = [_topic_from_resource(resource, connection)
topics = [Topic.from_api_repr(resource, connection)
for resource in resp['topics']]
return topics, resp.get('nextPageToken')

Expand Down Expand Up @@ -128,47 +128,8 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None,

resp = connection.api_request(method='GET', path=path, query_params=params)
topics = {}
subscriptions = [_subscription_from_resource(resource, topics, connection)
subscriptions = [Subscription.from_api_repr(resource,
connection=connection,
topics=topics)
for resource in resp['subscriptions']]
return subscriptions, resp.get('nextPageToken')


def _topic_from_resource(resource, connection):
"""Construct a topic given its full path-like name.
:type resource: dict
:param resource: topic resource representation returned from the API
:type connection: :class:`gcloud.pubsub.connection.Connection`
:param connection: connection to use for the topic.
:rtype: :class:`gcloud.pubsub.topic.Topic`
"""
_, project, _, name = resource['name'].split('/')
return Topic(name, project, connection)


def _subscription_from_resource(resource, topics, connection):
"""Construct a topic given its full path-like name.
:type resource: string
:param resource: subscription resource representation returned from the API
:type topics: dict, full_name -> :class:`gcloud.pubsub.topic.Topic`
:param topics: the topics to which subscriptions have been bound
:type connection: :class:`gcloud.pubsub.connection.Connection`
:param connection: connection to use for the topic.
:rtype: :class:`gcloud.pubsub.subscription.Subscription`
"""
t_name = resource['topic']
topic = topics.get(t_name)
if topic is None:
topic = topics[t_name] = _topic_from_resource({'name': t_name},
connection)
_, _, _, name = resource['name'].split('/')
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
push_endpoint = push_config.get('pushEndpoint')
return Subscription(name, topic, ack_deadline, push_endpoint)
56 changes: 56 additions & 0 deletions gcloud/pubsub/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Define API Topics."""

import base64


class Message(object):
"""Messages can be published to a topic and received by subscribers.
See:
https://cloud.google.com/pubsub/reference/rest/google/pubsub/v1beta2/PubsubMessage
:type name: bytes
:param name: the payload of the message
:type message_id: string
:param message_id: An ID assigned to the message by the API.
:type attrs: dict or None
:param attrs: Extra metadata associated by the publisher with the message.
"""
def __init__(self, data, message_id, attributes=None):
self.data = data
self.message_id = message_id
self._attrs = attributes

@property
def attrs(self):
"""Lazily-constructed attribute dictionary"""
if self._attrs is None:
self._attrs = {}
return self._attrs

@classmethod
def from_api_repr(cls, api_repr):
"""Factory: construct message from API representation.
:type api_repr: dict or None
:param api_repr: The API representation of the message
"""
data = base64.b64decode(api_repr['data'])
return cls(data=data, message_id=api_repr['messageId'],
attributes=api_repr.get('attributes'))
44 changes: 39 additions & 5 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"""Define API Subscriptions."""

from gcloud.exceptions import NotFound
from gcloud.pubsub.message import Message
from gcloud.pubsub.topic import Topic


class Subscription(object):
Expand Down Expand Up @@ -43,6 +45,37 @@ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
self.ack_deadline = ack_deadline
self.push_endpoint = push_endpoint

@classmethod
def from_api_repr(cls, resource, connection=None, topics=None):
"""Factory: construct a topic given its API representation
:type resource: dict
:param resource: topic resource representation returned from the API
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
:param connection: the connection to use. If not passed,
falls back to the default inferred from the
environment.
:type topics: dict or None
:param topics: A mapping of topic names -> topics. If not passed,
the subscription will have a newly-created topic.
:rtype: :class:`gcloud.pubsub.subscription.Subscription`
"""
if topics is None:
topics = {}
t_name = resource['topic']
topic = topics.get(t_name)
if topic is None:
topic = topics[t_name] = Topic.from_api_repr({'name': t_name},
connection)
_, _, _, name = resource['name'].split('/')
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
push_endpoint = push_config.get('pushEndpoint')
return cls(name, topic, ack_deadline, push_endpoint)

@property
def path(self):
"""URL path for the subscription's APIs"""
Expand Down Expand Up @@ -130,18 +163,19 @@ def pull(self, return_immediately=False, max_messages=1):
:type max_messages: int
:param max_messages: the maximum number of messages to return.
:rtype: list of dict
:returns: sequence of mappings, each containing keys ``ackId`` (the
ID to be used in a subsequent call to :meth:`acknowledge`)
and ``message``.
:rtype: list of (ack_id, message) tuples
:returns: sequence of tuples: ``ack_id`` is the ID to be used in a
subsequent call to :meth:`acknowledge`, and ``message``
is an instance of :class:`gcloud.pubsub.message.Message`.
"""
data = {'returnImmediately': return_immediately,
'maxMessages': max_messages}
conn = self.topic.connection
response = conn.api_request(method='POST',
path='%s:pull' % self.path,
data=data)
return response['receivedMessages']
return [(info['ackId'], Message.from_api_repr(info['message']))
for info in response['receivedMessages']]

def acknowledge(self, ack_ids):
"""API call: acknowledge retrieved messages for the subscription.
Expand Down
68 changes: 68 additions & 0 deletions gcloud/pubsub/test_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest2


class TestMessage(unittest2.TestCase):

def _getTargetClass(self):
from gcloud.pubsub.message import Message
return Message

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def test_ctor_no_attrs(self):
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
message = self._makeOne(data=DATA, message_id=MESSAGE_ID)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, {})

def test_ctor_w_attrs(self):
DATA = b'DEADBEEF'
MESSAGE_ID = b'12345'
ATTRS = {'a': 'b'}
message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
attributes=ATTRS)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, ATTRS)

def test_from_api_repr_no_attrs(self):
from base64 import b64encode as b64
DATA = b'DEADBEEF'
B64_DATA = b64(DATA)
MESSAGE_ID = '12345'
api_repr = {'data': B64_DATA, 'messageId': MESSAGE_ID}
message = self._getTargetClass().from_api_repr(api_repr)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, {})

def test_from_api_repr_w_attrs(self):
from base64 import b64encode as b64
DATA = b'DEADBEEF'
B64_DATA = b64(DATA)
MESSAGE_ID = '12345'
ATTRS = {'a': 'b'}
api_repr = {'data': B64_DATA,
'messageId': MESSAGE_ID,
'attributes': ATTRS}
message = self._getTargetClass().from_api_repr(api_repr)
self.assertEqual(message.data, DATA)
self.assertEqual(message.message_id, MESSAGE_ID)
self.assertEqual(message.attrs, ATTRS)
Loading

0 comments on commit 9c7f1fd

Please sign in to comment.