Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing topic.subscription factory. #930

Merged
merged 1 commit into from
Jul 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions docs/pubsub-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Create a new pull subscription for a topic:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.create() # API request

Create a new pull subscription for a topic with a non-default ACK deadline:
Expand All @@ -131,8 +131,7 @@ Create a new pull subscription for a topic with a non-default ACK deadline:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic,
... ack_deadline=90)
>>> subscription = topic.subscription('subscription_name', ack_deadline=90)
>>> subscription.create() # API request

Create a new push subscription for a topic:
Expand All @@ -143,8 +142,8 @@ Create a new push subscription for a topic:
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic,
... push_endpoint=ENDPOINT)
>>> subscription = topic.subscription('subscription_name',
... push_endpoint=ENDPOINT)
>>> subscription.create() # API request

Check for the existence of a subscription:
Expand All @@ -154,7 +153,7 @@ Check for the existence of a subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.exists() # API request
True

Expand All @@ -166,7 +165,7 @@ Convert a pull subscription to push:
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.modify_push_configuration(push_endpoint=ENDPOINT) # API request

Convert a push subscription to pull:
Expand All @@ -177,8 +176,8 @@ Convert a push subscription to pull:
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubusb.Subscription('subscription_name', topic,
... push_endpoint=ENDPOINT)
>>> subscription = topic.subscription('subscription_name',
... push_endpoint=ENDPOINT)
>>> subscription.modify_push_configuration(push_endpoint=None) # API request

List subscriptions for a topic:
Expand Down Expand Up @@ -209,7 +208,7 @@ Delete a subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.delete() # API request


Expand All @@ -223,7 +222,7 @@ Fetch pending messages for a pull subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
... batch.publish('this is the first message_payload')
... batch.publish('this is the second message_payload',
Expand Down Expand Up @@ -252,7 +251,7 @@ Fetch a limited number of pending messages for a pull subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
... batch.publish('this is the first message_payload')
... batch.publish('this is the second message_payload',
Expand All @@ -268,7 +267,7 @@ Fetch messages for a pull subscription without blocking (none pending):
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> received = subscription.pull(max_messages=1) # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]
Expand Down
45 changes: 45 additions & 0 deletions gcloud/pubsub/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions for shared behavior."""


def topic_name_from_path(path, project):
"""Validate a topic URI path and get the topic name.

:type path: string
:param path: URI path for a topic API request.

:type project: string
:param project: The project associated with the request. It is
included for validation purposes.

:rtype: string
:returns: Topic name parsed from ``path``.
:raises: :class:`ValueError` if the ``path`` is ill-formed or if
the project from the ``path`` does not agree with the
``project`` passed in.
"""
# PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
path_parts = path.split('/')
if (len(path_parts) != 4 or path_parts[0] != 'projects' or
path_parts[2] != 'topics'):
raise ValueError('Expected path to be of the form '
'projects/{project}/topics/{topic_name}')
if (len(path_parts) != 4 or path_parts[0] != 'projects' or
path_parts[2] != 'topics' or path_parts[1] != project):
raise ValueError('Project from client should agree with '
'project from resource.')

return path_parts[3]
12 changes: 7 additions & 5 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"""Define API Subscriptions."""

from gcloud.exceptions import NotFound
from gcloud.pubsub._helpers import topic_name_from_path
from gcloud.pubsub.message import Message
from gcloud.pubsub.topic import Topic


class Subscription(object):
Expand Down Expand Up @@ -65,11 +65,13 @@ def from_api_repr(cls, resource, client, topics=None):
"""
if topics is None:
topics = {}
t_name = resource['topic']
topic = topics.get(t_name)
topic_path = resource['topic']
topic = topics.get(topic_path)
if topic is None:
topic = topics[t_name] = Topic.from_api_repr({'name': t_name},
client)
# NOTE: This duplicates behavior from Topic.from_api_repr to avoid
# an import cycle.
topic_name = topic_name_from_path(topic_path, client.project)
topic = topics[topic_path] = client.topic(topic_name)
_, _, _, name = resource['name'].split('/')
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
Expand Down
47 changes: 47 additions & 0 deletions gcloud/pubsub/test__helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest2


class Test_topic_name_from_path(unittest2.TestCase):

def _callFUT(self, path, project):
from gcloud.pubsub._helpers import topic_name_from_path
return topic_name_from_path(path, project)

def test_invalid_path_length(self):
PATH = 'projects/foo'
PROJECT = None
self.assertRaises(ValueError, self._callFUT, PATH, PROJECT)

def test_invalid_path_format(self):
TOPIC_NAME = 'TOPIC_NAME'
PROJECT = 'PROJECT'
PATH = 'foo/%s/bar/%s' % (PROJECT, TOPIC_NAME)
self.assertRaises(ValueError, self._callFUT, PATH, PROJECT)

def test_invalid_project(self):
TOPIC_NAME = 'TOPIC_NAME'
PROJECT1 = 'PROJECT1'
PROJECT2 = 'PROJECT2'
PATH = 'projects/%s/topics/%s' % (PROJECT1, TOPIC_NAME)
self.assertRaises(ValueError, self._callFUT, PATH, PROJECT2)

def test_valid_data(self):
TOPIC_NAME = 'TOPIC_NAME'
PROJECT = 'PROJECT'
PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
topic_name = self._callFUT(PATH, PROJECT)
self.assertEqual(topic_name, TOPIC_NAME)
4 changes: 4 additions & 0 deletions gcloud/pubsub/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,7 @@ class _Client(object):
def __init__(self, project, connection=None):
self.project = project
self.connection = connection

def topic(self, name, timestamp_messages=False):
from gcloud.pubsub.topic import Topic
return Topic(name, client=self, timestamp_messages=timestamp_messages)
14 changes: 14 additions & 0 deletions gcloud/pubsub/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,20 @@ def test_delete_w_alternate_client(self):
self.assertEqual(req['method'], 'DELETE')
self.assertEqual(req['path'], '/%s' % PATH)

def test_subscription(self):
from gcloud.pubsub.subscription import Subscription
TOPIC_NAME = 'topic_name'
PROJECT = 'PROJECT'
CLIENT = _Client(project=PROJECT)
topic = self._makeOne(TOPIC_NAME,
client=CLIENT)

SUBSCRIPTION_NAME = 'subscription_name'
subscription = topic.subscription(SUBSCRIPTION_NAME)
self.assertTrue(isinstance(subscription, Subscription))
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
self.assertTrue(subscription.topic is topic)


class TestBatch(unittest2.TestCase):

Expand Down
27 changes: 22 additions & 5 deletions gcloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from gcloud._helpers import _RFC3339_MICROS
from gcloud.exceptions import NotFound
from gcloud.pubsub._helpers import topic_name_from_path
from gcloud.pubsub.subscription import Subscription

_NOW = datetime.datetime.utcnow

Expand Down Expand Up @@ -48,6 +50,24 @@ def __init__(self, name, client, timestamp_messages=False):
self._client = client
self.timestamp_messages = timestamp_messages

def subscription(self, name, ack_deadline=None, push_endpoint=None):
"""Creates a subscription bound to the current topic.

:type name: string
:param name: the name of the subscription

:type ack_deadline: int
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.

:type push_endpoint: string
:param push_endpoint: URL to which messages will be pushed by the
back-end. If not set, the application must pull
messages.
"""
return Subscription(name, self, ack_deadline=ack_deadline,
push_endpoint=push_endpoint)

@classmethod
def from_api_repr(cls, resource, client):
"""Factory: construct a topic given its API representation
Expand All @@ -65,11 +85,8 @@ def from_api_repr(cls, resource, client):
project from the resource does not agree with the project
from the client.
"""
_, project, _, name = resource['name'].split('/')
if client.project != project:
raise ValueError('Project from clientshould agree with '
'project from resource.')
return cls(name, client=client)
topic_name = topic_name_from_path(resource['name'], client.project)
return cls(topic_name, client=client)

@property
def project(self):
Expand Down
7 changes: 3 additions & 4 deletions system_tests/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from gcloud import _helpers
from gcloud import pubsub
from gcloud.pubsub.subscription import Subscription


_helpers._PROJECT_ENV_VAR_NAME = 'GCLOUD_TESTS_PROJECT_ID'
Expand Down Expand Up @@ -68,7 +67,7 @@ def test_create_subscription(self):
topic.create()
self.to_delete.append(topic)
SUBSCRIPTION_NAME = 'subscribing-now'
subscription = Subscription(SUBSCRIPTION_NAME, topic)
subscription = topic.subscription(SUBSCRIPTION_NAME)
self.assertFalse(subscription.exists())
subscription.create()
self.to_delete.append(subscription)
Expand All @@ -88,7 +87,7 @@ def test_list_subscriptions(self):
'newest%d' % (1000 * time.time(),),
]
for subscription_name in subscriptions_to_create:
subscription = Subscription(subscription_name, topic)
subscription = topic.subscription(subscription_name)
subscription.create()
self.to_delete.append(subscription)

Expand All @@ -106,7 +105,7 @@ def test_message_pull_mode_e2e(self):
topic.create()
self.to_delete.append(topic)
SUBSCRIPTION_NAME = 'subscribing-now'
subscription = Subscription(SUBSCRIPTION_NAME, topic)
subscription = topic.subscription(SUBSCRIPTION_NAME)
self.assertFalse(subscription.exists())
subscription.create()
self.to_delete.append(subscription)
Expand Down