diff --git a/docs/_static/js/main.js b/docs/_static/js/main.js
index 11b8202481a0..8f95b772f38a 100755
--- a/docs/_static/js/main.js
+++ b/docs/_static/js/main.js
@@ -16,7 +16,7 @@ $('.headerlink').parent().each(function() {
$('.side-nav').children('ul:nth-child(2)').children().each(function() {
var itemName = $(this).text();
if (itemName !== 'Datastore' && itemName !== 'Storage' &&
- itemName !== 'Pub/Sub') {
+ itemName !== 'Pub/Sub' && itemName !== 'Search') {
$(this).css('padding-left','2em');
}
});
diff --git a/docs/index.rst b/docs/index.rst
index 0480c536f339..87d180d06624 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -10,6 +10,8 @@
datastore-transactions
datastore-batches
datastore-dataset
+ search-api
+ search-usage
storage-api
storage-blobs
storage-buckets
diff --git a/docs/search-api.rst b/docs/search-api.rst
new file mode 100644
index 000000000000..7459630a4cec
--- /dev/null
+++ b/docs/search-api.rst
@@ -0,0 +1,6 @@
+.. toctree::
+ :maxdepth: 1
+ :hidden:
+
+Search
+------
diff --git a/docs/search-usage.rst b/docs/search-usage.rst
new file mode 100644
index 000000000000..e87025e94636
--- /dev/null
+++ b/docs/search-usage.rst
@@ -0,0 +1,314 @@
+Using the API
+=============
+
+Connection / Authorization
+--------------------------
+
+Implicitly use the default client:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> # The search module has the same methods as a client, using the default.
+ >>> search.list_indexes() # API request
+ []
+
+Configure the default client:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> search.set_project_id('project-id')
+ >>> search.set_credentials(credentials)
+ >>> search.list_indexes() # API request
+ []
+
+Explicitly use the default client:
+
+.. doctest::
+
+ >>> from gcloud.search import default_client as client
+ >>> # The default_client is equivalent to search.Client()
+ >>> client.list_indexes() # API request
+ []
+
+Explicitly configure a client:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client(project_id='project-id', credentials=credentials)
+ >>> client.list_indexes() # API request
+ []
+
+Manage indexes for a project
+----------------------------
+
+Create a new index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.create_index('index_id') # API request
+ >>> index.id
+ 'index_id'
+
+Create a new index with a name:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.create_index('index_id', name='Name') # API request
+ >>> index.name
+ 'Name'
+
+Get or create an index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_or_create_index('index_id') # API request
+ >>> index.id
+ 'index_id'
+
+List the indexes:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> [index.id for index in client.list_indexes()] # API request
+ ['index_id']
+
+Retrieve an index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('missing_index_id') # API request
+ >>> index is None
+ True
+ >>> index = client.get_index('index_id') # API request
+ >>> index.id
+ 'index_id'
+
+Get an index without making an API request
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id', check=False)
+ >>> index.id
+ 'index_id'
+
+Update an index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> index.name = 'Name'
+ >>> client.update_index(index)
+
+Delete an index by ID:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> client.delete_index('index_id') # API request
+
+Delete an index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> index.id
+ 'index_id'
+ >>> client.delete_index(index) # API request
+
+Manage documents and fields
+---------------------------
+
+Create a document
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> document = search.Document('document_id', rank=0)
+ >>> document.id
+ 'document_id'
+
+Add a field to a document
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> document = search.Document('document_id')
+ >>> document.add_field(search.Field('fieldname'))
+
+Add values to a field
+
+.. doctest::
+
+ >>> from datetime import datetime
+ >>> from gcloud import search
+ >>> field = search.Field('fieldname')
+ >>> field.add_value('string')
+ >>> # Tokenization field ignored for non-string values.
+ >>> field.add_value('
string
', tokenization='html')
+ >>> field.add_value('string', tokenization='atom')
+ >>> field.add_value('string', tokenization='text')
+ >>> field.add_value(1234)
+ >>> field.add_value(datetime.now())
+ >>> len(field.values)
+ 9
+
+Add values to a field at initialization time
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> field = search.Field('fieldname', values=[
+ 'string',
+ search.Value('string2
', tokenization='html')
+ search.Value('string', tokenization='atom')])
+
+Add a single document to an index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> document = search.Document('document_id', rank=0)
+ >>> index.add_document(document) # API request
+
+Add multiple documents to an index:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> documents = [search.Document('document_id')]
+ >>> index.add_documents(documents) # API request
+
+Get a single document by ID:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> document = index.get_document('missing_document_id') # API request
+ >>> document is None
+ True
+ >>> document = index.get_document('document_id') # API request
+ >>> document.fields
+ []
+
+Delete a document by ID:
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> index.delete_document('document_id') # API request
+ >>> index.delete_document('missing_document_id') # API request
+
+Searching
+---------
+
+Create a query
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> query = search.Query('query text here')
+ >>> query.query
+ 'query text here'
+
+Specify the fields to return in a query
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> query = search.Query('query text here', fields=['field1', 'field2'])
+ >>> query.fields
+ ['field1', 'field2']
+
+Set the sort order of a query
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> query = search.Query('query text here', order_by='field1')
+ >>> query.order_by
+ 'field1'
+ >>> query2 = search.Query('query text here', order_by=['field2', 'field3'])
+ >>> query2.order_by
+ ['field2', 'field3']
+ >>> # Order descending by field1 and ascending by field2
+ >>> query4 = search.Query('query text here', order_by=['-field1', 'field2'])
+ >>> query3.order_by
+ ['-field1', 'field2']
+
+Set custom field expressions on a query
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> query = search.Query('query text here')
+ >>> query.add_field_expression('total_price', '(price + tax)')
+ >>> # We don't do any checks on the expression. These are checked at query time.
+ >>> query.add_field_expression('invalid', 'is_prime(num)')
+ >>> query.add_field_expression('bad_field', '(missing_field + tax)')
+
+Set custom field expressions at initialization time
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> query = search.Query('query text here', field_expressions={
+ 'total_price': '(price + tax)'})
+
+Search an index
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> matching = index.search(search.Query('query text here')) # API request
+ >>> for document in matching:
+ ... print document.id
+
+Search an index with a limit on number of results
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> matching = index.search(search.Query('query text here'),
+ ... limit=42) # API request
+
+Search an index with a custom page size (advanced)
+
+.. doctest::
+
+ >>> from gcloud import search
+ >>> client = search.Client()
+ >>> index = client.get_index('index_id') # API request
+ >>> matching = index.search(search.Query('query text here'),
+ ... page_size=20) # API request
diff --git a/gcloud/search/__init__.py b/gcloud/search/__init__.py
new file mode 100644
index 000000000000..4616e6075b6c
--- /dev/null
+++ b/gcloud/search/__init__.py
@@ -0,0 +1,27 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Search API wrapper.
+
+The main concepts with this API are:
+
+- :class:`gcloud.pubsub.topic.Topic` represents an endpoint to which messages
+ can be published using the Cloud Storage Pubsub API.
+
+- :class:`gcloud.pubsub.subscription.Subscription` represents a named
+ subscription (either pull or push) to a topic.
+"""
+
+from gcloud.search.client import Client
+from gcloud.search.connection import SCOPE
diff --git a/gcloud/search/client.py b/gcloud/search/client.py
new file mode 100644
index 000000000000..a42f0cc4d322
--- /dev/null
+++ b/gcloud/search/client.py
@@ -0,0 +1,225 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""gcloud pubsub client for interacting with API."""
+
+
+from gcloud._helpers import _get_production_project
+from gcloud.credentials import get_credentials
+from gcloud.credentials import get_for_service_account_json
+from gcloud.credentials import get_for_service_account_p12
+from gcloud.pubsub.connection import Connection
+from gcloud.pubsub.subscription import Subscription
+from gcloud.pubsub.topic import Topic
+
+
+class Client(object):
+ """Client to bundle configuration needed for API requests.
+
+ :type project: string
+ :param project: the project which the client acts on behalf of. Will be
+ passed when creating a topic. If not passed,
+ falls back to the default inferred from the environment.
+
+ :type credentials: :class:`oauth2client.client.OAuth2Credentials` or
+ :class:`NoneType`
+ :param credentials: The OAuth2 Credentials to use for the connection
+ owned by this client. If not passed (and if no ``http``
+ object is passed), falls back to the default inferred
+ from the environment.
+
+ :type http: :class:`httplib2.Http` or class that defines ``request()``.
+ :param http: An optional HTTP object to make requests. If not passed, an
+ ``http`` object is created that is bound to the
+ ``credentials`` for the current object.
+
+ :raises: :class:`ValueError` if the project is neither passed in nor
+ set in the environment.
+ """
+ def __init__(self, project=None, credentials=None, http=None):
+ self._connection = None
+ self._credentials = None
+ self._http = None
+ self._project = project
+
+ @property
+ def project(self):
+ if self._project is None:
+ self._project = _get_production_project()
+ if self._project is None:
+ raise ValueError('Project was not passed and could not be '
+ 'determined from the environment.')
+ return self._project
+
+ @property
+ def connection(self):
+ if self._connection is None:
+ self._connection = self._create_connection(
+ credentials=self._credentials, http=self._http)
+ return self._connection
+
+ @staticmethod
+ def _create_connection(credentials=None, http=None):
+ return Connection(credentials=credentials or get_credentials(),
+ http=http)
+
+ @classmethod
+ def from_service_account_json(cls, json_credentials_path, project=None):
+ """Factory to retrieve JSON credentials while creating client.
+
+ :type json_credentials_path: string
+ :param json_credentials_path: The path to a private key file (this file
+ was given to you when you created the
+ service account). This file must contain
+ a JSON object with a private key and
+ other credentials information (downloaded
+ from the Google APIs console).
+
+ :type project: string
+ :param project: the project which the client acts on behalf of. Will be
+ passed when creating a topic. If not passed, falls
+ back to the default inferred from the environment.
+
+ :rtype: :class:`gcloud.pubsub.client.Client`
+ :returns: The client created with the retrieved JSON credentials.
+ """
+ credentials = get_for_service_account_json(json_credentials_path)
+ return cls(project=project, credentials=credentials)
+
+ @classmethod
+ def from_service_account_p12(cls, client_email, private_key_path,
+ project=None):
+ """Factory to retrieve P12 credentials while creating client.
+
+ .. note::
+ Unless you have an explicit reason to use a PKCS12 key for your
+ service account, we recommend using a JSON key.
+
+ :type client_email: string
+ :param client_email: The e-mail attached to the service account.
+
+ :type private_key_path: string
+ :param private_key_path: The path to a private key file (this file was
+ given to you when you created the service
+ account). This file must be in P12 format.
+
+ :type project: string
+ :param project: the project which the client acts on behalf of. Will be
+ passed when creating a topic. If not passed, falls
+ back to the default inferred from the environment.
+
+ :rtype: :class:`gcloud.pubsub.client.Client`
+ :returns: The client created with the retrieved P12 credentials.
+ """
+ credentials = get_for_service_account_p12(client_email,
+ private_key_path)
+ return cls(project=project, credentials=credentials)
+
+ def list_indexes(self, page_size=None, page_token=None):
+ """List topics for the project associated with this client.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list
+
+ :type page_size: int
+ :param page_size: maximum number of topics to return, If not passed,
+ defaults to a value set by the API.
+
+ :type page_token: string
+ :param page_token: opaque marker for the next "page" of topics. If not
+ passed, the API will return the first page of
+ topics.
+
+ :rtype: tuple, (list, str)
+ :returns: list of :class:`gcloud.pubsub.topic.Topic`, plus a
+ "next page token" string: if not None, indicates that
+ more topics can be retrieved with another call (pass that
+ value as ``page_token``).
+ """
+ params = {}
+
+ if page_size is not None:
+ params['pageSize'] = page_size
+
+ if page_token is not None:
+ params['pageToken'] = page_token
+
+ path = '/projects/%s/topics' % (self.project,)
+
+ return IndexIterator(client=self, extra_params=params, path=path)
+
+ def list_subscriptions(self, page_size=None, page_token=None,
+ topic_name=None):
+ """List subscriptions for the project associated with this client.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list
+
+ and (where ``topic_name`` is passed):
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/subscriptions/list
+
+ :type page_size: int
+ :param page_size: maximum number of topics to return, If not passed,
+ defaults to a value set by the API.
+
+ :type page_token: string
+ :param page_token: opaque marker for the next "page" of topics. If not
+ passed, the API will return the first page of
+ topics.
+
+ :type topic_name: string
+ :param topic_name: limit results to subscriptions bound to the given
+ topic.
+
+ :rtype: tuple, (list, str)
+ :returns: list of :class:`gcloud.pubsub.subscription.Subscription`,
+ plus a "next page token" string: if not None, indicates that
+ more topics can be retrieved with another call (pass that
+ value as ``page_token``).
+ """
+ params = {}
+
+ if page_size is not None:
+ params['pageSize'] = page_size
+
+ if page_token is not None:
+ params['pageToken'] = page_token
+
+ if topic_name is None:
+ path = '/projects/%s/subscriptions' % (self.project,)
+ else:
+ path = '/projects/%s/topics/%s/subscriptions' % (self.project,
+ topic_name)
+
+ resp = self.connection.api_request(method='GET', path=path,
+ query_params=params)
+ topics = {}
+ subscriptions = [Subscription.from_api_repr(resource, self,
+ topics=topics)
+ for resource in resp['subscriptions']]
+ return subscriptions, resp.get('nextPageToken')
+
+ def topic(self, name, timestamp_messages=False):
+ """Creates a topic bound to the current client.
+
+ :type name: string
+ :param name: the name of the topic to be constructed.
+
+ :type timestamp_messages: boolean
+ :param timestamp_messages: To be passed to ``Topic`` constructor.
+
+ :rtype: :class:`gcloud.pubsub.topic.Topic`
+ :returns: Topic created with the current client.
+ """
+ return Topic(name, client=self, timestamp_messages=timestamp_messages)
diff --git a/gcloud/search/connection.py b/gcloud/search/connection.py
new file mode 100644
index 000000000000..d73ed9cfeebd
--- /dev/null
+++ b/gcloud/search/connection.py
@@ -0,0 +1,39 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Create / interact with gcloud pubsub connections."""
+
+from gcloud import connection as base_connection
+
+
+SCOPE = ('https://www.googleapis.com/auth/pubsub',
+ 'https://www.googleapis.com/auth/cloud-platform')
+"""The scopes required for authenticating as a Cloud Pub/Sub consumer."""
+
+
+class Connection(base_connection.JSONConnection):
+ """A connection to Google Cloud Pubsub via the JSON REST API."""
+
+ API_BASE_URL = 'https://pubsub.googleapis.com'
+ """The base of the API call URL."""
+
+ API_VERSION = 'v1beta2'
+ """The version of the API, used in building the API call's URL."""
+
+ API_URL_TEMPLATE = '{api_base_url}/{api_version}{path}'
+ """A template for the URL of a particular API call."""
+
+ def __init__(self, credentials=None, http=None):
+ credentials = self._create_scoped_credentials(credentials, SCOPE)
+ super(Connection, self).__init__(credentials=credentials, http=http)
diff --git a/gcloud/search/index.py b/gcloud/search/index.py
new file mode 100644
index 000000000000..a05dc1ffa87e
--- /dev/null
+++ b/gcloud/search/index.py
@@ -0,0 +1,323 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define Projects."""
+
+from gcloud.exceptions import MissingClientError
+from gcloud.exceptions import NotFound
+
+
+class Project(object):
+ """Projects are containers for your work on Google Cloud Platform.
+
+ .. note::
+
+ It's unlikely that you'd need to instantiate this outside the context
+ of a :class:`.client.Client`, so in general, it's best to get a Project
+ from a Resource Manager client.
+
+ To create a new project::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.project('purple-spaceship-123')
+ >>> project.name = 'Purple Spaceship Project!'
+ >>> project.create()
+
+ To get an existing project::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.get_project('purple-spaceship-123')
+ >>> print project.name
+ Purple Spaceship Project!
+
+ To manage labels::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.get_project('purple-spaceship-123')
+ >>> project.labels = {'color': 'purple'}
+ >>> project.labels['environment'] = 'production'
+ >>> project.update()
+
+ See:
+ https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects
+
+ :type id: string
+ :param id: the globally unique id of the project
+ """
+ def __init__(self, client, id):
+ self._client = client
+ self.id = id
+ self.name = None
+ self.number = None
+ self.labels = {}
+ self.status = None
+
+ def __repr__(self):
+ if self.name:
+ display_name = '%s (%s)' % (self.name, self.id)
+ else:
+ display_name = self.id
+ return '' % display_name
+
+ @classmethod
+ def from_api_repr(cls, resource, client):
+ """Factory: construct a project given its API representation.
+
+ :type resource: dict
+ :param resource: project resource representation returned from the API
+
+ :rtype: :class:`gcloud.resource_manager.project.Project`
+ """
+ project = cls(id=resource['projectId'], client=client)
+ project._set_properties_from_api_repr(resource)
+ return project
+
+ def _set_properties_from_api_repr(self, resource):
+ """Update specific properties from its API representation."""
+ self.name = resource.get('name')
+ self.number = resource['projectNumber']
+ self.labels = resource.get('labels', {})
+ self.status = resource['lifecycleState']
+
+ @property
+ def full_name(self):
+ """Fully-qualified name (ie, ``'projects/purple-spaceship-123'``)."""
+ if not self.id:
+ raise ValueError('Missing project ID!')
+ return 'projects/%s' % (self.id)
+
+ @property
+ def path(self):
+ """URL for the project (ie, ``'/projects/purple-spaceship-123'``)."""
+ return '/%s' % (self.full_name)
+
+ @property
+ def client(self):
+ """The :class:`gcloud.resource_manager.client.Client` object."""
+ return self._client
+
+ def _require_client(self, client=None):
+ """Get either a client or raise an exception.
+
+ We need to use this as the various methods could accept a client as a
+ parameter, which we need to evaluate. If the client provided is empty
+ and there is no client set as an instance variable, we'll raise a
+ :class:`gcloud.exceptions.MissingClientError`.
+
+ :type client: :class:`gcloud.resource_manager.client.Client`
+ :param client: An optional client to test for existence.
+ """
+ client = client or self.client
+ if not client:
+ raise MissingClientError()
+ return client
+
+ def create(self, client=None):
+ """API call: create the project via a ``POST`` request.
+
+ Example::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.project('new-spaceship-123')
+ >>> project.name = 'New Spaceship Project!'
+ >>> project.create()
+
+ See
+ https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/create
+
+ :type client: :class:`gcloud.resource_manager.client.Client` or None
+ :param client: the client to use. If not passed, falls back to
+ the ``client`` attribute.
+ """
+ client = client or self.client
+ data = {'projectId': self.id, 'name': self.name, 'labels': self.labels}
+ resp = client.connection.api_request(method='POST', path='/projects',
+ data=data)
+ self._set_properties_from_api_repr(resource=resp)
+
+
+ def reload(self, client=None):
+ """API call: reload the project via a ``GET`` request.
+
+ This method will reload the newest metadata for the project.
+
+ .. warning::
+
+ This will overwrite any local changes you've made and not saved!
+
+ Example::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.get_project('purple-spaceship-123')
+ >>> project.name = 'Locally changed name'
+ >>> print project
+
+ >>> project.reload()
+ >>> print project
+
+
+ See
+ https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/get
+
+ :type client: :class:`gcloud.resource_manager.client.Client` or None
+ :param client: the client to use. If not passed, falls back to
+ the ``client`` attribute.
+ """
+ client = self._require_client(client=client)
+
+ # We assume the project exists. If it doesn't it will raise a NotFound
+ # exception.
+ resp = client.connection.api_request(method='GET', path=self.path)
+ self._set_properties_from_api_repr(resource=resp)
+
+ def update(self, client=None):
+ """API call: update the project via a ``PUT`` request.
+
+ Example::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.get_project('purple-spaceship-123')
+ >>> project.name = 'New Purple Spaceship'
+ >>> project.labels['environment'] = 'prod'
+ >>> project.update()
+
+ See
+ https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/update
+
+ :type client: :class:`gcloud.resource_manager.client.Client` or None
+ :param client: the client to use. If not passed, falls back to
+ the ``client`` attribute.
+ """
+ client = self._require_client(client=client)
+
+ data = {'name': self.name, 'labels': self.labels}
+ resp = client.connection.api_request(method='PUT', path=self.path,
+ data=data)
+ self._set_properties_from_api_repr(resp)
+
+ def exists(self, client=None):
+ """API call: test the existence of a project via a ``GET`` request.
+
+ Example::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.project('purple-spaceship-456')
+ >>> project.exists()
+ False
+
+ You can also use the
+ :func:`gcloud.resource_manager.client.Client.get_project`
+ method to check whether a project exists, as it will return ``None``
+ if the project doesn't exist::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> print client.get_project('purple-spaceship-456')
+ None
+
+ See
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/projects/get
+
+ :type client: :class:`gcloud.resource_manager.client.Client` or None
+ :param client: the client to use. If not passed, falls back to
+ the ``client`` attribute.
+ """
+ client = self._require_client(client=client)
+
+ try:
+ client.connection.api_request(method='GET', path=self.path)
+ except NotFound:
+ return False
+ else:
+ return True
+
+ def delete(self, client=None, reload=True):
+ """API call: delete the project via a ``DELETE`` request.
+
+ See:
+ https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/delete
+
+ This actually changes the status (``lifecycleState``) from ``ACTIVE``
+ to ``DELETE_REQUESTED``.
+ Later (it's not specified when), the project will move into the
+ ``DELETE_IN_PROGRESS`` state, which means the deleting has actually
+ begun.
+
+ Example::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.get_project('purple-spaceship-123')
+ >>> project.delete()
+
+ :type client: :class:`gcloud.resource_manager.client.Client` or None
+ :param client: the client to use. If not passed,
+ falls back to the ``client`` attribute.
+
+ :type reload: bool
+ :param reload: Whether to reload the project with the latest state.
+ Default: ``True``.
+ """
+ client = self._require_client(client)
+ client.connection.api_request(method='DELETE', path=self.path)
+
+ # If the reload flag is True, reload the project.
+ if reload:
+ self.reload()
+
+ def undelete(self, client=None, reload=True):
+ """API call: undelete the project via a ``POST`` request.
+
+ See
+ https://cloud.google.com/resource-manager/reference/rest/v1beta1/projects/undelete
+
+ This actually changes the project status (``lifecycleState``) from
+ ``DELETE_REQUESTED`` to ``ACTIVE``.
+ If the project has already reached a status of ``DELETE_IN_PROGRESS`,
+ this request will fail and the project cannot be restored.
+
+ Example::
+
+ >>> from gcloud import resource_manager
+ >>> client = resource_manager.Client()
+ >>> project = client.get_project('purple-spaceship-123')
+ >>> project.delete()
+ >>> print project.status
+ DELETE_REQUESTED
+ >>> project.undelete()
+ >>> print project.status
+ ACTIVE
+
+ :type client: :class:`gcloud.resource_manager.client.Client` or None
+ :param client: the client to use. If not passed,
+ falls back to the ``client`` attribute.
+
+ :type reload: bool
+ :param reload: Whether to reload the project with the latest state.
+ Default: ``True``.
+ """
+ client = self._require_client(client)
+ client.connection.api_request(method='POST',
+ path=self.path + ':undelete')
+
+ # If the reload flag is True, reload the project.
+ if reload:
+ self.reload()
diff --git a/gcloud/search/iterator.py b/gcloud/search/iterator.py
new file mode 100644
index 000000000000..43cff3b83422
--- /dev/null
+++ b/gcloud/search/iterator.py
@@ -0,0 +1,156 @@
+# Copyright 2014 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Iterators for paging through API responses.
+
+These iterators simplify the process of paging through API responses
+where the response is a list of results with a ``nextPageToken``.
+
+To make an iterator work, just override the ``get_items_from_response``
+method so that given a response (containing a page of results) it parses
+those results into an iterable of the actual objects you want::
+
+ class MyIterator(Iterator):
+ def get_items_from_response(self, response):
+ items = response.get('items', [])
+ for item in items:
+ my_item = MyItemClass(other_arg=True)
+ my_item._set_properties(item)
+ yield my_item
+
+You then can use this to get **all** the results from a resource::
+
+ >>> iterator = MyIterator(...)
+ >>> list(iterator) # Convert to a list (consumes all values).
+
+Or you can walk your way through items and call off the search early if
+you find what you're looking for (resulting in possibly fewer
+requests)::
+
+ >>> for item in MyIterator(...):
+ >>> print item.name
+ >>> if not item.is_valid:
+ >>> break
+"""
+
+from gcloud.resource_manager.project import Project
+
+
+class Iterator(object):
+ """A generic class for iterating through Cloud list responses.
+
+ :type client: :class:`gcloud.resource_manager.client.Client`
+ :param client: The client to use to make requests.
+
+ :type path: string
+ :param path: The path to query for the list of items.
+ """
+
+ PAGE_TOKEN = 'pageToken'
+ RESERVED_PARAMS = frozenset([PAGE_TOKEN])
+
+ def __init__(self, client, path, extra_params=None):
+ self.client = client
+ self.path = path
+ self.page_number = 0
+ self.next_page_token = None
+ self.extra_params = extra_params or {}
+ reserved_in_use = self.RESERVED_PARAMS.intersection(
+ self.extra_params)
+ if reserved_in_use:
+ raise ValueError(('Using a reserved parameter',
+ reserved_in_use))
+
+ def __iter__(self):
+ """Iterate through the list of items."""
+ while self.has_next_page():
+ response = self.get_next_page_response()
+ for item in self.get_items_from_response(response):
+ yield item
+
+ def has_next_page(self):
+ """Determines whether or not this iterator has more pages.
+
+ :rtype: boolean
+ :returns: Whether the iterator has more pages or not.
+ """
+ if self.page_number == 0:
+ return True
+
+ return self.next_page_token is not None
+
+ def get_query_params(self):
+ """Getter for query parameters for the next request.
+
+ :rtype: dict
+ :returns: A dictionary of query parameters.
+ """
+ result = ({self.PAGE_TOKEN: self.next_page_token}
+ if self.next_page_token else {})
+ result.update(self.extra_params)
+ return result
+
+ def get_next_page_response(self):
+ """Requests the next page from the path provided.
+
+ :rtype: dict
+ :returns: The parsed JSON response of the next page's contents.
+ """
+ if not self.has_next_page():
+ raise RuntimeError('No more pages. Try resetting the iterator.')
+
+ response = self.client.connection.api_request(
+ method='GET', path=self.path, query_params=self.get_query_params())
+
+ self.page_number += 1
+ self.next_page_token = response.get('nextPageToken')
+
+ return response
+
+ def reset(self):
+ """Resets the iterator to the beginning."""
+ self.page_number = 0
+ self.next_page_token = None
+
+ def get_items_from_response(self, response):
+ """Factory method called while iterating. This should be overriden.
+
+ This method should be overridden by a subclass. It should
+ accept the API response of a request for the next page of items,
+ and return a list (or other iterable) of items.
+
+ Typically this method will construct a Bucket or a Blob from the
+ page of results in the response.
+
+ :type response: dict
+ :param response: The response of asking for the next page of items.
+
+ :rtype: iterable
+ :returns: Items that the iterator should yield.
+ """
+ raise NotImplementedError
+
+
+class IndexIterator(Iterator):
+ """An iterator over a list of Project resources."""
+
+ def get_items_from_response(self, response):
+ """Yield :class:`gcloud.search.index.Index` items from response.
+
+ :type response: dict
+ :param response: The JSON API response for a page of indexes.
+ """
+ for resource in response.get('projects', []):
+ item = Index.from_api_repr(resource, client=self.client)
+ yield item
diff --git a/gcloud/search/message.py b/gcloud/search/message.py
new file mode 100644
index 000000000000..b01c0e9c29b4
--- /dev/null
+++ b/gcloud/search/message.py
@@ -0,0 +1,79 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define API Topics."""
+
+import base64
+import datetime
+
+import pytz
+
+from gcloud._helpers import _RFC3339_MICROS
+
+
+class Message(object):
+ """Messages can be published to a topic and received by subscribers.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/google/pubsub/v1beta2/PubsubMessage
+
+ :type name: bytes
+ :param name: the payload of the message
+
+ :type message_id: string
+ :param message_id: An ID assigned to the message by the API.
+
+ :type attrs: dict or None
+ :param attrs: Extra metadata associated by the publisher with the message.
+ """
+ def __init__(self, data, message_id, attributes=None):
+ self.data = data
+ self.message_id = message_id
+ self._attributes = attributes
+
+ @property
+ def attributes(self):
+ """Lazily-constructed attribute dictionary"""
+ if self._attributes is None:
+ self._attributes = {}
+ return self._attributes
+
+ @property
+ def timestamp(self):
+ """Return sortable timestamp from attributes, if passed.
+
+ Allows sorting messages in publication order (assuming consistent
+ clocks across all publishers).
+
+ :rtype: datetime
+ :returns: timestamp (in UTC timezone) parsed from RFC 3339 timestamp
+ :raises: ValueError if timestamp not in ``attributes``, or if it does
+ not match the RFC 3339 format.
+ """
+ stamp = self.attributes.get('timestamp')
+ if stamp is None:
+ raise ValueError('No timestamp')
+ return datetime.datetime.strptime(stamp, _RFC3339_MICROS).replace(
+ tzinfo=pytz.UTC)
+
+ @classmethod
+ def from_api_repr(cls, api_repr):
+ """Factory: construct message from API representation.
+
+ :type api_repr: dict or None
+ :param api_repr: The API representation of the message
+ """
+ data = base64.b64decode(api_repr['data'])
+ return cls(data=data, message_id=api_repr['messageId'],
+ attributes=api_repr.get('attributes'))
diff --git a/gcloud/search/subscription.py b/gcloud/search/subscription.py
new file mode 100644
index 000000000000..cb3023a286d7
--- /dev/null
+++ b/gcloud/search/subscription.py
@@ -0,0 +1,263 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define API Subscriptions."""
+
+from gcloud.exceptions import NotFound
+from gcloud.pubsub.message import Message
+from gcloud.pubsub.topic import Topic
+
+
+class Subscription(object):
+ """Subscriptions receive messages published to their topics.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions
+
+ :type name: string
+ :param name: the name of the subscription
+
+ :type topic: :class:`gcloud.pubsub.topic.Topic`
+ :param topic: the topic to which the subscription belongs..
+
+ :type ack_deadline: int
+ :param ack_deadline: the deadline (in seconds) by which messages pulled
+ from the back-end must be acknowledged.
+
+ :type push_endpoint: string
+ :param push_endpoint: URL to which messages will be pushed by the back-end.
+ If not set, the application must pull messages.
+ """
+ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
+ self.name = name
+ self.topic = topic
+ self.ack_deadline = ack_deadline
+ self.push_endpoint = push_endpoint
+
+ @classmethod
+ def from_api_repr(cls, resource, client, topics=None):
+ """Factory: construct a topic given its API representation
+
+ :type resource: dict
+ :param resource: topic resource representation returned from the API
+
+ :type client: :class:`gcloud.pubsub.client.Client`
+ :param client: Client which holds credentials and project
+ configuration for a topic.
+
+ :type topics: dict or None
+ :param topics: A mapping of topic names -> topics. If not passed,
+ the subscription will have a newly-created topic.
+
+ :rtype: :class:`gcloud.pubsub.subscription.Subscription`
+ :returns: Subscription parsed from ``resource``.
+ """
+ if topics is None:
+ topics = {}
+ t_name = resource['topic']
+ topic = topics.get(t_name)
+ if topic is None:
+ topic = topics[t_name] = Topic.from_api_repr({'name': t_name},
+ client)
+ _, _, _, name = resource['name'].split('/')
+ ack_deadline = resource.get('ackDeadlineSeconds')
+ push_config = resource.get('pushConfig', {})
+ push_endpoint = push_config.get('pushEndpoint')
+ return cls(name, topic, ack_deadline, push_endpoint)
+
+ @property
+ def path(self):
+ """URL path for the subscription's APIs"""
+ project = self.topic.project
+ return '/projects/%s/subscriptions/%s' % (project, self.name)
+
+ def _require_client(self, client):
+ """Check client or verify over-ride.
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the topic of the
+ current subscription.
+
+ :rtype: :class:`gcloud.pubsub.client.Client`
+ :returns: The client passed in or the currently bound client.
+ """
+ if client is None:
+ client = self.topic._client
+ return client
+
+ def create(self, client=None):
+ """API call: create the subscription via a PUT request
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ data = {'topic': self.topic.full_name}
+
+ if self.ack_deadline is not None:
+ data['ackDeadline'] = self.ack_deadline
+
+ if self.push_endpoint is not None:
+ data['pushConfig'] = {'pushEndpoint': self.push_endpoint}
+
+ client = self._require_client(client)
+ client.connection.api_request(method='PUT', path=self.path, data=data)
+
+ def exists(self, client=None):
+ """API call: test existence of the subscription via a GET request
+
+ See
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ client = self._require_client(client)
+ try:
+ client.connection.api_request(method='GET', path=self.path)
+ except NotFound:
+ return False
+ else:
+ return True
+
+ def reload(self, client=None):
+ """API call: sync local subscription configuration via a GET request
+
+ See
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ client = self._require_client(client)
+ data = client.connection.api_request(method='GET', path=self.path)
+ self.ack_deadline = data.get('ackDeadline')
+ push_config = data.get('pushConfig', {})
+ self.push_endpoint = push_config.get('pushEndpoint')
+
+ def modify_push_configuration(self, push_endpoint, client=None):
+ """API call: update the push endpoint for the subscription.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/modifyPushConfig
+
+ :type push_endpoint: string
+ :param push_endpoint: URL to which messages will be pushed by the
+ back-end. If None, the application must pull
+ messages.
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ client = self._require_client(client)
+ data = {}
+ config = data['pushConfig'] = {}
+ if push_endpoint is not None:
+ config['pushEndpoint'] = push_endpoint
+ client.connection.api_request(
+ method='POST', path='%s:modifyPushConfig' % (self.path,),
+ data=data)
+ self.push_endpoint = push_endpoint
+
+ def pull(self, return_immediately=False, max_messages=1, client=None):
+ """API call: retrieve messages for the subscription.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/pull
+
+ :type return_immediately: boolean
+ :param return_immediately: if True, the back-end returns even if no
+ messages are available; if False, the API
+ call blocks until one or more messages are
+ available.
+
+ :type max_messages: int
+ :param max_messages: the maximum number of messages to return.
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+
+ :rtype: list of (ack_id, message) tuples
+ :returns: sequence of tuples: ``ack_id`` is the ID to be used in a
+ subsequent call to :meth:`acknowledge`, and ``message``
+ is an instance of :class:`gcloud.pubsub.message.Message`.
+ """
+ client = self._require_client(client)
+ data = {'returnImmediately': return_immediately,
+ 'maxMessages': max_messages}
+ response = client.connection.api_request(
+ method='POST', path='%s:pull' % (self.path,), data=data)
+ return [(info['ackId'], Message.from_api_repr(info['message']))
+ for info in response.get('receivedMessages', ())]
+
+ def acknowledge(self, ack_ids, client=None):
+ """API call: acknowledge retrieved messages for the subscription.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
+
+ :type ack_ids: list of string
+ :param ack_ids: ack IDs of messages being acknowledged
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ client = self._require_client(client)
+ data = {'ackIds': ack_ids}
+ client.connection.api_request(
+ method='POST', path='%s:acknowledge' % (self.path,), data=data)
+
+ def modify_ack_deadline(self, ack_id, ack_deadline, client=None):
+ """API call: update acknowledgement deadline for a retrieved message.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
+
+ :type ack_id: string
+ :param ack_id: ack ID of message being updated
+
+ :type ack_deadline: int
+ :param ack_deadline: new deadline for the message, in seconds
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ client = self._require_client(client)
+ data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
+ client.connection.api_request(
+ method='POST', path='%s:modifyAckDeadline' % (self.path,),
+ data=data)
+
+ def delete(self, client=None):
+ """API call: delete the subscription via a DELETE request.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current subscription's topic.
+ """
+ client = self._require_client(client)
+ client.connection.api_request(method='DELETE', path=self.path)
diff --git a/gcloud/search/test_client.py b/gcloud/search/test_client.py
new file mode 100644
index 000000000000..f0f1d30a81be
--- /dev/null
+++ b/gcloud/search/test_client.py
@@ -0,0 +1,342 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest2
+
+
+class TestClient(unittest2.TestCase):
+
+ def _getTargetClass(self):
+ from gcloud.pubsub.client import Client
+ return Client
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_ctor_defaults(self):
+ from gcloud._testing import _Monkey
+ from gcloud.pubsub import SCOPE
+ from gcloud.pubsub import client
+ from gcloud.pubsub.connection import Connection
+
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+ FUNC_CALLS = []
+
+ def mock_get_proj():
+ FUNC_CALLS.append('_get_production_project')
+ return PROJECT
+
+ def mock_get_credentials():
+ FUNC_CALLS.append('get_credentials')
+ return CREDS
+
+ with _Monkey(client, get_credentials=mock_get_credentials,
+ _get_production_project=mock_get_proj):
+ client_obj = self._makeOne()
+
+ self.assertEqual(client_obj.project, PROJECT)
+ self.assertTrue(isinstance(client_obj.connection, Connection))
+ self.assertTrue(client_obj.connection._credentials is CREDS)
+ self.assertEqual(client_obj.connection._credentials._scopes, SCOPE)
+ self.assertEqual(FUNC_CALLS,
+ ['_get_production_project', 'get_credentials'])
+
+ def test_ctor_missing_project(self):
+ from gcloud._testing import _Monkey
+ from gcloud.pubsub import client
+
+ FUNC_CALLS = []
+
+ def mock_get_proj():
+ FUNC_CALLS.append('_get_production_project')
+ return None
+
+ with _Monkey(client, _get_production_project=mock_get_proj):
+ self.assertRaises(ValueError, self._makeOne)
+
+ self.assertEqual(FUNC_CALLS, ['_get_production_project'])
+
+ def test_ctor_explicit(self):
+ from gcloud.pubsub import SCOPE
+ from gcloud.pubsub.connection import Connection
+
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+
+ client_obj = self._makeOne(project=PROJECT, credentials=CREDS)
+
+ self.assertEqual(client_obj.project, PROJECT)
+ self.assertTrue(isinstance(client_obj.connection, Connection))
+ self.assertTrue(client_obj.connection._credentials is CREDS)
+ self.assertEqual(CREDS._scopes, SCOPE)
+
+ def test_from_service_account_json(self):
+ from gcloud._testing import _Monkey
+ from gcloud.pubsub import client
+ from gcloud.pubsub.connection import Connection
+
+ PROJECT = 'PROJECT'
+ KLASS = self._getTargetClass()
+ CREDS = _Credentials()
+ _CALLED = []
+
+ def mock_creds(arg1):
+ _CALLED.append((arg1,))
+ return CREDS
+
+ BOGUS_ARG = object()
+ with _Monkey(client, get_for_service_account_json=mock_creds):
+ client_obj = KLASS.from_service_account_json(
+ BOGUS_ARG, project=PROJECT)
+
+ self.assertEqual(client_obj.project, PROJECT)
+ self.assertTrue(isinstance(client_obj.connection, Connection))
+ self.assertTrue(client_obj.connection._credentials is CREDS)
+ self.assertEqual(_CALLED, [(BOGUS_ARG,)])
+
+ def test_from_service_account_p12(self):
+ from gcloud._testing import _Monkey
+ from gcloud.pubsub import client
+ from gcloud.pubsub.connection import Connection
+
+ PROJECT = 'PROJECT'
+ KLASS = self._getTargetClass()
+ CREDS = _Credentials()
+ _CALLED = []
+
+ def mock_creds(arg1, arg2):
+ _CALLED.append((arg1, arg2))
+ return CREDS
+
+ BOGUS_ARG1 = object()
+ BOGUS_ARG2 = object()
+ with _Monkey(client, get_for_service_account_p12=mock_creds):
+ client_obj = KLASS.from_service_account_p12(
+ BOGUS_ARG1, BOGUS_ARG2, project=PROJECT)
+
+ self.assertEqual(client_obj.project, PROJECT)
+ self.assertTrue(isinstance(client_obj.connection, Connection))
+ self.assertTrue(client_obj.connection._credentials is CREDS)
+ self.assertEqual(_CALLED, [(BOGUS_ARG1, BOGUS_ARG2)])
+
+ def test_list_topics_no_paging(self):
+ from gcloud.pubsub.topic import Topic
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+
+ CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS)
+
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+
+ RETURNED = {'topics': [{'name': TOPIC_PATH}]}
+ # Replace the connection on the client with one of our own.
+ CLIENT_OBJ.connection = _Connection(RETURNED)
+
+ # Execute request.
+ topics, next_page_token = CLIENT_OBJ.list_topics()
+ # Test values are correct.
+ self.assertEqual(len(topics), 1)
+ self.assertTrue(isinstance(topics[0], Topic))
+ self.assertEqual(topics[0].name, TOPIC_NAME)
+ self.assertEqual(next_page_token, None)
+ self.assertEqual(len(CLIENT_OBJ.connection._requested), 1)
+ req = CLIENT_OBJ.connection._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT)
+ self.assertEqual(req['query_params'], {})
+
+ def test_list_topics_with_paging(self):
+ from gcloud.pubsub.topic import Topic
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+
+ CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS)
+
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ TOKEN1 = 'TOKEN1'
+ TOKEN2 = 'TOKEN2'
+ SIZE = 1
+ RETURNED = {'topics': [{'name': TOPIC_PATH}],
+ 'nextPageToken': TOKEN2}
+ # Replace the connection on the client with one of our own.
+ CLIENT_OBJ.connection = _Connection(RETURNED)
+
+ # Execute request.
+ topics, next_page_token = CLIENT_OBJ.list_topics(SIZE, TOKEN1)
+ # Test values are correct.
+ self.assertEqual(len(topics), 1)
+ self.assertTrue(isinstance(topics[0], Topic))
+ self.assertEqual(topics[0].name, TOPIC_NAME)
+ self.assertEqual(next_page_token, TOKEN2)
+ self.assertEqual(len(CLIENT_OBJ.connection._requested), 1)
+ req = CLIENT_OBJ.connection._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT)
+ self.assertEqual(req['query_params'],
+ {'pageSize': SIZE, 'pageToken': TOKEN1})
+
+ def test_list_subscriptions_no_paging(self):
+ from gcloud.pubsub.subscription import Subscription
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+
+ CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS)
+
+ SUB_NAME = 'subscription_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ SUB_INFO = [{'name': SUB_PATH, 'topic': TOPIC_PATH}]
+ RETURNED = {'subscriptions': SUB_INFO}
+ # Replace the connection on the client with one of our own.
+ CLIENT_OBJ.connection = _Connection(RETURNED)
+
+ # Execute request.
+ subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions()
+ # Test values are correct.
+ self.assertEqual(len(subscriptions), 1)
+ self.assertTrue(isinstance(subscriptions[0], Subscription))
+ self.assertEqual(subscriptions[0].name, SUB_NAME)
+ self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME)
+ self.assertEqual(next_page_token, None)
+ self.assertEqual(len(CLIENT_OBJ.connection._requested), 1)
+ req = CLIENT_OBJ.connection._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT)
+ self.assertEqual(req['query_params'], {})
+
+ def test_list_subscriptions_with_paging(self):
+ from gcloud.pubsub.subscription import Subscription
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+
+ CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS)
+
+ SUB_NAME = 'subscription_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ ACK_DEADLINE = 42
+ PUSH_ENDPOINT = 'https://push.example.com/endpoint'
+ TOKEN1 = 'TOKEN1'
+ TOKEN2 = 'TOKEN2'
+ SIZE = 1
+ SUB_INFO = [{'name': SUB_PATH,
+ 'topic': TOPIC_PATH,
+ 'ackDeadlineSeconds': ACK_DEADLINE,
+ 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}}]
+ RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN2}
+ # Replace the connection on the client with one of our own.
+ CLIENT_OBJ.connection = _Connection(RETURNED)
+
+ # Execute request.
+ subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions(
+ SIZE, TOKEN1)
+ # Test values are correct.
+ self.assertEqual(len(subscriptions), 1)
+ self.assertTrue(isinstance(subscriptions[0], Subscription))
+ self.assertEqual(subscriptions[0].name, SUB_NAME)
+ self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME)
+ self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE)
+ self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT)
+ self.assertEqual(next_page_token, TOKEN2)
+ self.assertEqual(len(CLIENT_OBJ.connection._requested), 1)
+ req = CLIENT_OBJ.connection._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT)
+ self.assertEqual(req['query_params'],
+ {'pageSize': SIZE, 'pageToken': TOKEN1})
+
+ def test_list_subscriptions_with_topic_name(self):
+ from gcloud.pubsub.subscription import Subscription
+ PROJECT = 'PROJECT'
+ CREDS = _Credentials()
+
+ CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS)
+
+ SUB_NAME_1 = 'subscription_1'
+ SUB_PATH_1 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_1)
+ SUB_NAME_2 = 'subscription_2'
+ SUB_PATH_2 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_2)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ SUB_INFO = [{'name': SUB_PATH_1, 'topic': TOPIC_PATH},
+ {'name': SUB_PATH_2, 'topic': TOPIC_PATH}]
+ TOKEN = 'TOKEN'
+ RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN}
+ # Replace the connection on the client with one of our own.
+ CLIENT_OBJ.connection = _Connection(RETURNED)
+
+ # Execute request.
+ subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions(
+ topic_name=TOPIC_NAME)
+ # Test values are correct.
+ self.assertEqual(len(subscriptions), 2)
+ self.assertTrue(isinstance(subscriptions[0], Subscription))
+ self.assertEqual(subscriptions[0].name, SUB_NAME_1)
+ self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME)
+ self.assertTrue(isinstance(subscriptions[1], Subscription))
+ self.assertEqual(subscriptions[1].name, SUB_NAME_2)
+ self.assertEqual(subscriptions[1].topic.name, TOPIC_NAME)
+ self.assertTrue(subscriptions[1].topic is subscriptions[0].topic)
+ self.assertEqual(next_page_token, TOKEN)
+ self.assertEqual(len(CLIENT_OBJ.connection._requested), 1)
+ req = CLIENT_OBJ.connection._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'],
+ '/projects/%s/topics/%s/subscriptions'
+ % (PROJECT, TOPIC_NAME))
+ self.assertEqual(req['query_params'], {})
+
+ def test_topic(self):
+ PROJECT = 'PROJECT'
+ TOPIC_NAME = 'TOPIC_NAME'
+ CREDS = _Credentials()
+
+ client_obj = self._makeOne(project=PROJECT, credentials=CREDS)
+ new_topic = client_obj.topic(TOPIC_NAME)
+ self.assertEqual(new_topic.name, TOPIC_NAME)
+ self.assertTrue(new_topic._client is client_obj)
+ self.assertEqual(new_topic.project, PROJECT)
+ self.assertEqual(new_topic.full_name,
+ 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
+ self.assertFalse(new_topic.timestamp_messages)
+
+
+class _Credentials(object):
+
+ _scopes = None
+
+ @staticmethod
+ def create_scoped_required():
+ return True
+
+ def create_scoped(self, scope):
+ self._scopes = scope
+ return self
+
+
+class _Connection(object):
+
+ def __init__(self, *responses):
+ self._responses = responses
+ self._requested = []
+
+ def api_request(self, **kw):
+ self._requested.append(kw)
+ response, self._responses = self._responses[0], self._responses[1:]
+ return response
diff --git a/gcloud/search/test_connection.py b/gcloud/search/test_connection.py
new file mode 100644
index 000000000000..4a8618388e4e
--- /dev/null
+++ b/gcloud/search/test_connection.py
@@ -0,0 +1,46 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest2
+
+
+class TestConnection(unittest2.TestCase):
+
+ def _getTargetClass(self):
+ from gcloud.pubsub.connection import Connection
+ return Connection
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_build_api_url_no_extra_query_params(self):
+ conn = self._makeOne()
+ URI = '/'.join([
+ conn.API_BASE_URL,
+ conn.API_VERSION,
+ 'foo',
+ ])
+ self.assertEqual(conn.build_api_url('/foo'), URI)
+
+ def test_build_api_url_w_extra_query_params(self):
+ from six.moves.urllib.parse import parse_qsl
+ from six.moves.urllib.parse import urlsplit
+ conn = self._makeOne()
+ uri = conn.build_api_url('/foo', {'bar': 'baz'})
+ scheme, netloc, path, qs, _ = urlsplit(uri)
+ self.assertEqual('%s://%s' % (scheme, netloc), conn.API_BASE_URL)
+ self.assertEqual(path,
+ '/'.join(['', conn.API_VERSION, 'foo']))
+ parms = dict(parse_qsl(qs))
+ self.assertEqual(parms['bar'], 'baz')
diff --git a/gcloud/search/test_message.py b/gcloud/search/test_message.py
new file mode 100644
index 000000000000..38ad240e6199
--- /dev/null
+++ b/gcloud/search/test_message.py
@@ -0,0 +1,104 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest2
+
+
+class TestMessage(unittest2.TestCase):
+
+ def _getTargetClass(self):
+ from gcloud.pubsub.message import Message
+ return Message
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_ctor_no_attributes(self):
+ DATA = b'DEADBEEF'
+ MESSAGE_ID = b'12345'
+ message = self._makeOne(data=DATA, message_id=MESSAGE_ID)
+ self.assertEqual(message.data, DATA)
+ self.assertEqual(message.message_id, MESSAGE_ID)
+ self.assertEqual(message.attributes, {})
+
+ def test_ctor_w_attributes(self):
+ DATA = b'DEADBEEF'
+ MESSAGE_ID = b'12345'
+ ATTRS = {'a': 'b'}
+ message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
+ attributes=ATTRS)
+ self.assertEqual(message.data, DATA)
+ self.assertEqual(message.message_id, MESSAGE_ID)
+ self.assertEqual(message.attributes, ATTRS)
+
+ def test_from_api_repr_no_attributes(self):
+ from base64 import b64encode as b64
+ DATA = b'DEADBEEF'
+ B64_DATA = b64(DATA)
+ MESSAGE_ID = '12345'
+ api_repr = {'data': B64_DATA, 'messageId': MESSAGE_ID}
+ message = self._getTargetClass().from_api_repr(api_repr)
+ self.assertEqual(message.data, DATA)
+ self.assertEqual(message.message_id, MESSAGE_ID)
+ self.assertEqual(message.attributes, {})
+
+ def test_from_api_repr_w_attributes(self):
+ from base64 import b64encode as b64
+ DATA = b'DEADBEEF'
+ B64_DATA = b64(DATA)
+ MESSAGE_ID = '12345'
+ ATTRS = {'a': 'b'}
+ api_repr = {'data': B64_DATA,
+ 'messageId': MESSAGE_ID,
+ 'attributes': ATTRS}
+ message = self._getTargetClass().from_api_repr(api_repr)
+ self.assertEqual(message.data, DATA)
+ self.assertEqual(message.message_id, MESSAGE_ID)
+ self.assertEqual(message.attributes, ATTRS)
+
+ def test_timestamp_no_attributes(self):
+ DATA = b'DEADBEEF'
+ MESSAGE_ID = b'12345'
+ message = self._makeOne(data=DATA, message_id=MESSAGE_ID)
+
+ def _to_fail():
+ return message.timestamp
+
+ self.assertRaises(ValueError, _to_fail)
+
+ def test_timestamp_wo_timestamp_in_attributes(self):
+ DATA = b'DEADBEEF'
+ MESSAGE_ID = b'12345'
+ ATTRS = {'a': 'b'}
+ message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
+ attributes=ATTRS)
+
+ def _to_fail():
+ return message.timestamp
+
+ self.assertRaises(ValueError, _to_fail)
+
+ def test_timestamp_w_timestamp_in_attributes(self):
+ from datetime import datetime
+ from pytz import utc
+ from gcloud._helpers import _RFC3339_MICROS
+ DATA = b'DEADBEEF'
+ MESSAGE_ID = b'12345'
+ TIMESTAMP = '2015-04-10T18:42:27.131956Z'
+ naive = datetime.strptime(TIMESTAMP, _RFC3339_MICROS)
+ timestamp = naive.replace(tzinfo=utc)
+ ATTRS = {'timestamp': TIMESTAMP}
+ message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
+ attributes=ATTRS)
+ self.assertEqual(message.timestamp, timestamp)
diff --git a/gcloud/search/test_subscription.py b/gcloud/search/test_subscription.py
new file mode 100644
index 000000000000..db966ac3c12a
--- /dev/null
+++ b/gcloud/search/test_subscription.py
@@ -0,0 +1,520 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest2
+
+
+class TestSubscription(unittest2.TestCase):
+
+ def _getTargetClass(self):
+ from gcloud.pubsub.subscription import Subscription
+ return Subscription
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_ctor_defaults(self):
+ SUB_NAME = 'sub_name'
+ topic = object()
+ subscription = self._makeOne(SUB_NAME, topic)
+ self.assertEqual(subscription.name, SUB_NAME)
+ self.assertTrue(subscription.topic is topic)
+ self.assertEqual(subscription.ack_deadline, None)
+ self.assertEqual(subscription.push_endpoint, None)
+
+ def test_ctor_explicit(self):
+ SUB_NAME = 'sub_name'
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ topic = object()
+ subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT)
+ self.assertEqual(subscription.name, SUB_NAME)
+ self.assertTrue(subscription.topic is topic)
+ self.assertEqual(subscription.ack_deadline, DEADLINE)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+
+ def test_from_api_repr_no_topics(self):
+ from gcloud.pubsub.topic import Topic
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ resource = {'topic': TOPIC_PATH,
+ 'name': SUB_PATH,
+ 'ackDeadlineSeconds': DEADLINE,
+ 'pushConfig': {'pushEndpoint': ENDPOINT}}
+ klass = self._getTargetClass()
+ client = _Client(project=PROJECT)
+ subscription = klass.from_api_repr(resource, client)
+ self.assertEqual(subscription.name, SUB_NAME)
+ topic = subscription.topic
+ self.assertTrue(isinstance(topic, Topic))
+ self.assertEqual(topic.name, TOPIC_NAME)
+ self.assertEqual(topic.project, PROJECT)
+ self.assertEqual(subscription.ack_deadline, DEADLINE)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+
+ def test_from_api_repr_w_topics_no_topic_match(self):
+ from gcloud.pubsub.topic import Topic
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ resource = {'topic': TOPIC_PATH,
+ 'name': SUB_PATH,
+ 'ackDeadlineSeconds': DEADLINE,
+ 'pushConfig': {'pushEndpoint': ENDPOINT}}
+ topics = {}
+ klass = self._getTargetClass()
+ client = _Client(project=PROJECT)
+ subscription = klass.from_api_repr(resource, client, topics=topics)
+ self.assertEqual(subscription.name, SUB_NAME)
+ topic = subscription.topic
+ self.assertTrue(isinstance(topic, Topic))
+ self.assertTrue(topic is topics[TOPIC_PATH])
+ self.assertEqual(topic.name, TOPIC_NAME)
+ self.assertEqual(topic.project, PROJECT)
+ self.assertEqual(subscription.ack_deadline, DEADLINE)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+
+ def test_from_api_repr_w_topics_w_topic_match(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ resource = {'topic': TOPIC_PATH,
+ 'name': SUB_PATH,
+ 'ackDeadlineSeconds': DEADLINE,
+ 'pushConfig': {'pushEndpoint': ENDPOINT}}
+ topic = object()
+ topics = {TOPIC_PATH: topic}
+ klass = self._getTargetClass()
+ client = _Client(project=PROJECT)
+ subscription = klass.from_api_repr(resource, client, topics=topics)
+ self.assertEqual(subscription.name, SUB_NAME)
+ self.assertTrue(subscription.topic is topic)
+ self.assertEqual(subscription.ack_deadline, DEADLINE)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+
+ def test_create_pull_wo_ack_deadline_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ BODY = {'topic': TOPIC_PATH}
+ conn = _Connection({'name': SUB_PATH})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.create()
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'PUT')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+ self.assertEqual(req['data'], BODY)
+
+ def test_create_push_w_ack_deadline_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ BODY = {'topic': TOPIC_PATH,
+ 'ackDeadline': DEADLINE,
+ 'pushConfig': {'pushEndpoint': ENDPOINT}}
+ conn1 = _Connection({'name': SUB_PATH})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'name': SUB_PATH})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT)
+ subscription.create(client=CLIENT2)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'PUT')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+ self.assertEqual(req['data'], BODY)
+
+ def test_exists_miss_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ conn = _Connection()
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ self.assertFalse(subscription.exists())
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+ self.assertEqual(req.get('query_params'), None)
+
+ def test_exists_hit_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn1 = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic)
+ self.assertTrue(subscription.exists(client=CLIENT2))
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+ self.assertEqual(req.get('query_params'), None)
+
+ def test_reload_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ conn = _Connection({'name': SUB_PATH,
+ 'topic': TOPIC_PATH,
+ 'ackDeadline': DEADLINE,
+ 'pushConfig': {'pushEndpoint': ENDPOINT}})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.reload()
+ self.assertEqual(subscription.ack_deadline, DEADLINE)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+
+ def test_reload_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ DEADLINE = 42
+ ENDPOINT = 'https://api.example.com/push'
+ conn1 = _Connection()
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'name': SUB_PATH,
+ 'topic': TOPIC_PATH,
+ 'ackDeadline': DEADLINE,
+ 'pushConfig': {'pushEndpoint': ENDPOINT}})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.reload(client=CLIENT2)
+ self.assertEqual(subscription.ack_deadline, DEADLINE)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+
+ def test_modify_push_config_w_endpoint_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ENDPOINT = 'https://api.example.com/push'
+ conn = _Connection({})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.modify_push_configuration(push_endpoint=ENDPOINT)
+ self.assertEqual(subscription.push_endpoint, ENDPOINT)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH)
+ self.assertEqual(req['data'],
+ {'pushConfig': {'pushEndpoint': ENDPOINT}})
+
+ def test_modify_push_config_wo_endpoint_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ENDPOINT = 'https://api.example.com/push'
+ conn1 = _Connection({})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic, push_endpoint=ENDPOINT)
+ subscription.modify_push_configuration(push_endpoint=None,
+ client=CLIENT2)
+ self.assertEqual(subscription.push_endpoint, None)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH)
+ self.assertEqual(req['data'], {'pushConfig': {}})
+
+ def test_pull_wo_return_immediately_max_messages_w_bound_client(self):
+ import base64
+ from gcloud.pubsub.message import Message
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ACK_ID = 'DEADBEEF'
+ MSG_ID = 'BEADCAFE'
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD)
+ MESSAGE = {'messageId': MSG_ID, 'data': B64}
+ REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE}
+ conn = _Connection({'receivedMessages': [REC_MESSAGE]})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ pulled = subscription.pull()
+ self.assertEqual(len(pulled), 1)
+ ack_id, message = pulled[0]
+ self.assertEqual(ack_id, ACK_ID)
+ self.assertTrue(isinstance(message, Message))
+ self.assertEqual(message.data, PAYLOAD)
+ self.assertEqual(message.message_id, MSG_ID)
+ self.assertEqual(message.attributes, {})
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:pull' % SUB_PATH)
+ self.assertEqual(req['data'],
+ {'returnImmediately': False, 'maxMessages': 1})
+
+ def test_pull_w_return_immediately_w_max_messages_w_alt_client(self):
+ import base64
+ from gcloud.pubsub.message import Message
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ACK_ID = 'DEADBEEF'
+ MSG_ID = 'BEADCAFE'
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD)
+ MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}}
+ REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE}
+ conn1 = _Connection()
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'receivedMessages': [REC_MESSAGE]})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic)
+ pulled = subscription.pull(return_immediately=True, max_messages=3,
+ client=CLIENT2)
+ self.assertEqual(len(pulled), 1)
+ ack_id, message = pulled[0]
+ self.assertEqual(ack_id, ACK_ID)
+ self.assertTrue(isinstance(message, Message))
+ self.assertEqual(message.data, PAYLOAD)
+ self.assertEqual(message.message_id, MSG_ID)
+ self.assertEqual(message.attributes, {'a': 'b'})
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:pull' % SUB_PATH)
+ self.assertEqual(req['data'],
+ {'returnImmediately': True, 'maxMessages': 3})
+
+ def test_pull_wo_receivedMessages(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ conn = _Connection({})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ pulled = subscription.pull(return_immediately=False)
+ self.assertEqual(len(pulled), 0)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:pull' % SUB_PATH)
+ self.assertEqual(req['data'],
+ {'returnImmediately': False, 'maxMessages': 1})
+
+ def test_acknowledge_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ACK_ID1 = 'DEADBEEF'
+ ACK_ID2 = 'BEADCAFE'
+ conn = _Connection({})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.acknowledge([ACK_ID1, ACK_ID2])
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH)
+ self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]})
+
+ def test_acknowledge_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ACK_ID1 = 'DEADBEEF'
+ ACK_ID2 = 'BEADCAFE'
+ conn1 = _Connection({})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.acknowledge([ACK_ID1, ACK_ID2], client=CLIENT2)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH)
+ self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]})
+
+ def test_modify_ack_deadline_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ACK_ID = 'DEADBEEF'
+ DEADLINE = 42
+ conn = _Connection({})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.modify_ack_deadline(ACK_ID, DEADLINE)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH)
+ self.assertEqual(req['data'],
+ {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE})
+
+ def test_modify_ack_deadline_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ ACK_ID = 'DEADBEEF'
+ DEADLINE = 42
+ conn1 = _Connection({})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.modify_ack_deadline(ACK_ID, DEADLINE, client=CLIENT2)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH)
+ self.assertEqual(req['data'],
+ {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE})
+
+ def test_delete_w_bound_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ conn = _Connection({})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = _Topic(TOPIC_NAME, client=CLIENT)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.delete()
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'DELETE')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+
+ def test_delete_w_alternate_client(self):
+ PROJECT = 'PROJECT'
+ SUB_NAME = 'sub_name'
+ SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
+ TOPIC_NAME = 'topic_name'
+ conn1 = _Connection({})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = _Topic(TOPIC_NAME, client=CLIENT1)
+ subscription = self._makeOne(SUB_NAME, topic)
+ subscription.delete(client=CLIENT2)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'DELETE')
+ self.assertEqual(req['path'], '/%s' % SUB_PATH)
+
+
+class _Connection(object):
+
+ def __init__(self, *responses):
+ self._responses = responses
+ self._requested = []
+
+ def api_request(self, **kw):
+ from gcloud.exceptions import NotFound
+ self._requested.append(kw)
+
+ try:
+ response, self._responses = self._responses[0], self._responses[1:]
+ except:
+ raise NotFound('miss')
+ else:
+ return response
+
+
+class _Topic(object):
+
+ def __init__(self, name, client):
+ self.name = name
+ self._client = client
+ self.project = client.project
+ self.full_name = 'projects/%s/topics/%s' % (client.project, name)
+ self.path = '/projects/%s/topics/%s' % (client.project, name)
+
+
+class _Client(object):
+
+ def __init__(self, project, connection=None):
+ self.project = project
+ self.connection = connection
diff --git a/gcloud/search/test_topic.py b/gcloud/search/test_topic.py
new file mode 100644
index 000000000000..4d4942db6a23
--- /dev/null
+++ b/gcloud/search/test_topic.py
@@ -0,0 +1,550 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest2
+
+
+class TestTopic(unittest2.TestCase):
+
+ def _getTargetClass(self):
+ from gcloud.pubsub.topic import Topic
+ return Topic
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_ctor_w_explicit_timestamp(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ CLIENT = _Client(project=PROJECT)
+ topic = self._makeOne(TOPIC_NAME,
+ client=CLIENT,
+ timestamp_messages=True)
+ self.assertEqual(topic.name, TOPIC_NAME)
+ self.assertEqual(topic.project, PROJECT)
+ self.assertEqual(topic.full_name,
+ 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
+ self.assertTrue(topic.timestamp_messages)
+
+ def test_from_api_repr(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ CLIENT = _Client(project=PROJECT)
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ resource = {'name': PATH}
+ klass = self._getTargetClass()
+ topic = klass.from_api_repr(resource, client=CLIENT)
+ self.assertEqual(topic.name, TOPIC_NAME)
+ self.assertTrue(topic._client is CLIENT)
+ self.assertEqual(topic.project, PROJECT)
+ self.assertEqual(topic.full_name, PATH)
+
+ def test_from_api_repr_with_bad_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT1 = 'PROJECT1'
+ PROJECT2 = 'PROJECT2'
+ CLIENT = _Client(project=PROJECT1)
+ PATH = 'projects/%s/topics/%s' % (PROJECT2, TOPIC_NAME)
+ resource = {'name': PATH}
+ klass = self._getTargetClass()
+ self.assertRaises(ValueError, klass.from_api_repr,
+ resource, client=CLIENT)
+
+ def test_create_w_bound_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection({'name': PATH})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ topic.create()
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'PUT')
+ self.assertEqual(req['path'], '/%s' % PATH)
+
+ def test_create_w_alternate_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn1 = _Connection({'name': PATH})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'name': PATH})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT1)
+ topic.create(client=CLIENT2)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'PUT')
+ self.assertEqual(req['path'], '/%s' % PATH)
+
+ def test_exists_miss_w_bound_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection()
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ self.assertFalse(topic.exists())
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % PATH)
+
+ def test_exists_hit_w_alternate_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn1 = _Connection({'name': PATH})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'name': PATH})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT1)
+ self.assertTrue(topic.exists(client=CLIENT2))
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'GET')
+ self.assertEqual(req['path'], '/%s' % PATH)
+
+ def test_publish_single_bytes_wo_attrs_w_bound_client(self):
+ import base64
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD).decode('ascii')
+ MSGID = 'DEADBEEF'
+ MESSAGE = {'data': B64,
+ 'attributes': {}}
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection({'messageIds': [MSGID]})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ msgid = topic.publish(PAYLOAD)
+ self.assertEqual(msgid, MSGID)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:publish' % PATH)
+ self.assertEqual(req['data'], {'messages': [MESSAGE]})
+
+ def test_publish_single_bytes_wo_attrs_w_add_timestamp_alt_client(self):
+ import base64
+ import datetime
+ from gcloud.pubsub import topic as MUT
+ from gcloud._helpers import _RFC3339_MICROS
+ from gcloud._testing import _Monkey
+ NOW = datetime.datetime.utcnow()
+
+ def _utcnow():
+ return NOW
+
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD).decode('ascii')
+ MSGID = 'DEADBEEF'
+ MESSAGE = {'data': B64,
+ 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}}
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn1 = _Connection({'messageIds': [MSGID]})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'messageIds': [MSGID]})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT1,
+ timestamp_messages=True)
+ with _Monkey(MUT, _NOW=_utcnow):
+ msgid = topic.publish(PAYLOAD, client=CLIENT2)
+
+ self.assertEqual(msgid, MSGID)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:publish' % PATH)
+ self.assertEqual(req['data'], {'messages': [MESSAGE]})
+
+ def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self):
+ import base64
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD).decode('ascii')
+ MSGID = 'DEADBEEF'
+ OVERRIDE = '2015-04-10T16:46:22.868399Z'
+ MESSAGE = {'data': B64,
+ 'attributes': {'timestamp': OVERRIDE}}
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection({'messageIds': [MSGID]})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT,
+ timestamp_messages=True)
+ msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE)
+ self.assertEqual(msgid, MSGID)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:publish' % PATH)
+ self.assertEqual(req['data'], {'messages': [MESSAGE]})
+
+ def test_publish_single_w_attrs(self):
+ import base64
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD).decode('ascii')
+ MSGID = 'DEADBEEF'
+ MESSAGE = {'data': B64,
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection({'messageIds': [MSGID]})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2')
+ self.assertEqual(msgid, MSGID)
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:publish' % PATH)
+ self.assertEqual(req['data'], {'messages': [MESSAGE]})
+
+ def test_publish_multiple_w_bound_client(self):
+ import base64
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ B64_1 = base64.b64encode(PAYLOAD1)
+ B64_2 = base64.b64encode(PAYLOAD2)
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ MESSAGE1 = {'data': B64_1.decode('ascii'),
+ 'attributes': {}}
+ MESSAGE2 = {'data': B64_2.decode('ascii'),
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ with topic.batch() as batch:
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+ self.assertEqual(list(batch), [MSGID1, MSGID2])
+ self.assertEqual(list(batch.messages), [])
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:publish' % PATH)
+ self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]})
+
+ def test_publish_multiple_w_alternate_client(self):
+ import base64
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ B64_1 = base64.b64encode(PAYLOAD1)
+ B64_2 = base64.b64encode(PAYLOAD2)
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ MESSAGE1 = {'data': B64_1.decode('ascii'),
+ 'attributes': {}}
+ MESSAGE2 = {'data': B64_2.decode('ascii'),
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn1 = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT1)
+ with topic.batch(client=CLIENT2) as batch:
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+ self.assertEqual(list(batch), [MSGID1, MSGID2])
+ self.assertEqual(list(batch.messages), [])
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '/%s:publish' % PATH)
+ self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]})
+
+ def test_publish_multiple_error(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ conn = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT = _Client(project=PROJECT)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ try:
+ with topic.batch() as batch:
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+ raise _Bugout()
+ except _Bugout:
+ pass
+ self.assertEqual(list(batch), [])
+ self.assertEqual(len(conn._requested), 0)
+
+ def test_delete_w_bound_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn = _Connection({})
+ CLIENT = _Client(project=PROJECT, connection=conn)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT)
+ topic.delete()
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'DELETE')
+ self.assertEqual(req['path'], '/%s' % PATH)
+
+ def test_delete_w_alternate_client(self):
+ TOPIC_NAME = 'topic_name'
+ PROJECT = 'PROJECT'
+ PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
+ conn1 = _Connection({})
+ CLIENT1 = _Client(project=PROJECT, connection=conn1)
+ conn2 = _Connection({})
+ CLIENT2 = _Client(project=PROJECT, connection=conn2)
+ topic = self._makeOne(TOPIC_NAME, client=CLIENT1)
+ topic.delete(client=CLIENT2)
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'DELETE')
+ self.assertEqual(req['path'], '/%s' % PATH)
+
+
+class TestBatch(unittest2.TestCase):
+
+ def _getTargetClass(self):
+ from gcloud.pubsub.topic import Batch
+ return Batch
+
+ def _makeOne(self, *args, **kwargs):
+ return self._getTargetClass()(*args, **kwargs)
+
+ def test_ctor_defaults(self):
+ topic = _Topic()
+ CLIENT = _Client(project='PROJECT')
+ batch = self._makeOne(topic, CLIENT)
+ self.assertTrue(batch.topic is topic)
+ self.assertTrue(batch.client is CLIENT)
+ self.assertEqual(len(batch.messages), 0)
+ self.assertEqual(len(batch.message_ids), 0)
+
+ def test___iter___empty(self):
+ topic = _Topic()
+ client = object()
+ batch = self._makeOne(topic, client)
+ self.assertEqual(list(batch), [])
+
+ def test___iter___non_empty(self):
+ topic = _Topic()
+ client = object()
+ batch = self._makeOne(topic, client)
+ batch.message_ids[:] = ['ONE', 'TWO', 'THREE']
+ self.assertEqual(list(batch), ['ONE', 'TWO', 'THREE'])
+
+ def test_publish_bytes_wo_attrs(self):
+ import base64
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD).decode('ascii')
+ MESSAGE = {'data': B64,
+ 'attributes': {}}
+ connection = _Connection()
+ CLIENT = _Client(project='PROJECT', connection=connection)
+ topic = _Topic()
+ batch = self._makeOne(topic, client=CLIENT)
+ batch.publish(PAYLOAD)
+ self.assertEqual(len(connection._requested), 0)
+ self.assertEqual(batch.messages, [MESSAGE])
+
+ def test_publish_bytes_w_add_timestamp(self):
+ import base64
+ PAYLOAD = b'This is the message text'
+ B64 = base64.b64encode(PAYLOAD).decode('ascii')
+ MESSAGE = {'data': B64,
+ 'attributes': {'timestamp': 'TIMESTAMP'}}
+ connection = _Connection()
+ CLIENT = _Client(project='PROJECT', connection=connection)
+ topic = _Topic(timestamp_messages=True)
+ batch = self._makeOne(topic, client=CLIENT)
+ batch.publish(PAYLOAD)
+ self.assertEqual(len(connection._requested), 0)
+ self.assertEqual(batch.messages, [MESSAGE])
+
+ def test_commit_w_bound_client(self):
+ import base64
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ B64_1 = base64.b64encode(PAYLOAD1)
+ B64_2 = base64.b64encode(PAYLOAD2)
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ MESSAGE1 = {'data': B64_1.decode('ascii'),
+ 'attributes': {}}
+ MESSAGE2 = {'data': B64_2.decode('ascii'),
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ conn = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT = _Client(project='PROJECT', connection=conn)
+ topic = _Topic()
+ batch = self._makeOne(topic, client=CLIENT)
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+ batch.commit()
+ self.assertEqual(list(batch), [MSGID1, MSGID2])
+ self.assertEqual(list(batch.messages), [])
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '%s:publish' % topic.path)
+ self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]})
+
+ def test_commit_w_alternate_client(self):
+ import base64
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ B64_1 = base64.b64encode(PAYLOAD1)
+ B64_2 = base64.b64encode(PAYLOAD2)
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ MESSAGE1 = {'data': B64_1.decode('ascii'),
+ 'attributes': {}}
+ MESSAGE2 = {'data': B64_2.decode('ascii'),
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ conn1 = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT1 = _Client(project='PROJECT', connection=conn1)
+ conn2 = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT2 = _Client(project='PROJECT', connection=conn2)
+ topic = _Topic()
+ batch = self._makeOne(topic, client=CLIENT1)
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+ batch.commit(client=CLIENT2)
+ self.assertEqual(list(batch), [MSGID1, MSGID2])
+ self.assertEqual(list(batch.messages), [])
+ self.assertEqual(len(conn1._requested), 0)
+ self.assertEqual(len(conn2._requested), 1)
+ req = conn2._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '%s:publish' % topic.path)
+ self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]})
+
+ def test_context_mgr_success(self):
+ import base64
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ B64_1 = base64.b64encode(PAYLOAD1)
+ B64_2 = base64.b64encode(PAYLOAD2)
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ MESSAGE1 = {'data': B64_1.decode('ascii'),
+ 'attributes': {}}
+ MESSAGE2 = {'data': B64_2.decode('ascii'),
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ conn = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT = _Client(project='PROJECT', connection=conn)
+ topic = _Topic()
+ batch = self._makeOne(topic, client=CLIENT)
+
+ with batch as other:
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+
+ self.assertTrue(other is batch)
+ self.assertEqual(list(batch), [MSGID1, MSGID2])
+ self.assertEqual(list(batch.messages), [])
+ self.assertEqual(len(conn._requested), 1)
+ req = conn._requested[0]
+ self.assertEqual(req['method'], 'POST')
+ self.assertEqual(req['path'], '%s:publish' % topic.path)
+ self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]})
+
+ def test_context_mgr_failure(self):
+ import base64
+ PAYLOAD1 = b'This is the first message text'
+ PAYLOAD2 = b'This is the second message text'
+ B64_1 = base64.b64encode(PAYLOAD1)
+ B64_2 = base64.b64encode(PAYLOAD2)
+ MSGID1 = 'DEADBEEF'
+ MSGID2 = 'BEADCAFE'
+ MESSAGE1 = {'data': B64_1.decode('ascii'),
+ 'attributes': {}}
+ MESSAGE2 = {'data': B64_2.decode('ascii'),
+ 'attributes': {'attr1': 'value1', 'attr2': 'value2'}}
+ conn = _Connection({'messageIds': [MSGID1, MSGID2]})
+ CLIENT = _Client(project='PROJECT', connection=conn)
+ topic = _Topic()
+ batch = self._makeOne(topic, client=CLIENT)
+
+ try:
+ with batch as other:
+ batch.publish(PAYLOAD1)
+ batch.publish(PAYLOAD2, attr1='value1', attr2='value2')
+ raise _Bugout()
+ except _Bugout:
+ pass
+
+ self.assertTrue(other is batch)
+ self.assertEqual(list(batch), [])
+ self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2])
+ self.assertEqual(len(conn._requested), 0)
+
+
+class _Connection(object):
+
+ def __init__(self, *responses):
+ self._responses = responses
+ self._requested = []
+
+ def api_request(self, **kw):
+ from gcloud.exceptions import NotFound
+ self._requested.append(kw)
+
+ try:
+ response, self._responses = self._responses[0], self._responses[1:]
+ except:
+ raise NotFound('miss')
+ else:
+ return response
+
+
+class _Topic(object):
+
+ def __init__(self, name="NAME", project="PROJECT",
+ timestamp_messages=False):
+ self.path = '/projects/%s/topics/%s' % (project, name)
+ self.timestamp_messages = timestamp_messages
+
+ def _timestamp_message(self, attrs):
+ if self.timestamp_messages:
+ attrs['timestamp'] = 'TIMESTAMP'
+
+
+class _Client(object):
+
+ def __init__(self, project, connection=None):
+ self.project = project
+ self.connection = connection
+
+
+class _Bugout(Exception):
+ pass
diff --git a/gcloud/search/topic.py b/gcloud/search/topic.py
new file mode 100644
index 000000000000..9a0af105747d
--- /dev/null
+++ b/gcloud/search/topic.py
@@ -0,0 +1,255 @@
+# Copyright 2015 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define API Topics."""
+
+import base64
+import datetime
+
+from gcloud._helpers import _RFC3339_MICROS
+from gcloud.exceptions import NotFound
+
+_NOW = datetime.datetime.utcnow
+
+
+class Topic(object):
+ """Topics are targets to which messages can be published.
+
+ Subscribers then receive those messages.
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics
+
+ :type name: string
+ :param name: the name of the topic
+
+ :type client: :class:`gcloud.pubsub.client.Client`
+ :param client: A client which holds credentials and project configuration
+ for the topic (which requires a project).
+
+ :type timestamp_messages: boolean
+ :param timestamp_messages: If true, the topic will add a ``timestamp`` key
+ to the attributes of each published message:
+ the value will be an RFC 3339 timestamp.
+ """
+ def __init__(self, name, client, timestamp_messages=False):
+ self.name = name
+ self._client = client
+ self.timestamp_messages = timestamp_messages
+
+ @classmethod
+ def from_api_repr(cls, resource, client):
+ """Factory: construct a topic given its API representation
+
+ :type resource: dict
+ :param resource: topic resource representation returned from the API
+
+ :type client: :class:`gcloud.pubsub.client.Client`
+ :param client: Client which holds credentials and project
+ configuration for the topic.
+
+ :rtype: :class:`gcloud.pubsub.topic.Topic`
+ :returns: Topic parsed from ``resource``.
+ :raises: :class:`ValueError` if ``client`` is not ``None`` and the
+ project from the resource does not agree with the project
+ from the client.
+ """
+ _, project, _, name = resource['name'].split('/')
+ if client.project != project:
+ raise ValueError('Project from clientshould agree with '
+ 'project from resource.')
+ return cls(name, client=client)
+
+ @property
+ def project(self):
+ """Project bound to the topic."""
+ return self._client.project
+
+ @property
+ def full_name(self):
+ """Fully-qualified name used in topic / subscription APIs"""
+ return 'projects/%s/topics/%s' % (self.project, self.name)
+
+ @property
+ def path(self):
+ """URL path for the topic's APIs"""
+ return '/%s' % (self.full_name)
+
+ def _require_client(self, client):
+ """Check client or verify over-ride.
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current topic.
+
+ :rtype: :class:`gcloud.pubsub.client.Client`
+ :returns: The client passed in or the currently bound client.
+ """
+ if client is None:
+ client = self._client
+ return client
+
+ def create(self, client=None):
+ """API call: create the topic via a PUT request
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/create
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current topic.
+ """
+ client = self._require_client(client)
+ client.connection.api_request(method='PUT', path=self.path)
+
+ def exists(self, client=None):
+ """API call: test for the existence of the topic via a GET request
+
+ See
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/get
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current topic.
+ """
+ client = self._require_client(client)
+
+ try:
+ client.connection.api_request(method='GET', path=self.path)
+ except NotFound:
+ return False
+ else:
+ return True
+
+ def _timestamp_message(self, attrs):
+ """Add a timestamp to ``attrs``, if the topic is so configured.
+
+ If ``attrs`` already has the key, do nothing.
+
+ Helper method for ``publish``/``Batch.publish``.
+ """
+ if self.timestamp_messages and 'timestamp' not in attrs:
+ attrs['timestamp'] = _NOW().strftime(_RFC3339_MICROS)
+
+ def publish(self, message, client=None, **attrs):
+ """API call: publish a message to a topic via a POST request
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/publish
+
+ :type message: bytes
+ :param message: the message payload
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current topic.
+
+ :type attrs: dict (string -> string)
+ :message attrs: key-value pairs to send as message attributes
+
+ :rtype: str
+ :returns: message ID assigned by the server to the published message
+ """
+ client = self._require_client(client)
+
+ self._timestamp_message(attrs)
+ message_b = base64.b64encode(message).decode('ascii')
+ message_data = {'data': message_b, 'attributes': attrs}
+ data = {'messages': [message_data]}
+ response = client.connection.api_request(
+ method='POST', path='%s:publish' % (self.path,), data=data)
+ return response['messageIds'][0]
+
+ def batch(self, client=None):
+ """Return a batch to use as a context manager.
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current topic.
+
+ :rtype: :class:`Batch`
+ :returns: A batch to use as a context manager.
+ """
+ client = self._require_client(client)
+ return Batch(self, client)
+
+ def delete(self, client=None):
+ """API call: delete the topic via a DELETE request
+
+ See:
+ https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/delete
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current topic.
+ """
+ client = self._require_client(client)
+ client.connection.api_request(method='DELETE', path=self.path)
+
+
+class Batch(object):
+ """Context manager: collect messages to publish via a single API call.
+
+ Helper returned by :meth:Topic.batch
+
+ :type topic: :class:`gcloud.pubsub.topic.Topic`
+ :param topic: the topic being published
+
+ :type client: :class:`gcloud.pubsub.client.Client`
+ :param client: The client to use.
+ """
+ def __init__(self, topic, client):
+ self.topic = topic
+ self.messages = []
+ self.message_ids = []
+ self.client = client
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is None:
+ self.commit()
+
+ def __iter__(self):
+ return iter(self.message_ids)
+
+ def publish(self, message, **attrs):
+ """Emulate publishing a message, but save it.
+
+ :type message: bytes
+ :param message: the message payload
+
+ :type attrs: dict (string -> string)
+ :message attrs: key-value pairs to send as message attributes
+ """
+ self.topic._timestamp_message(attrs)
+ self.messages.append(
+ {'data': base64.b64encode(message).decode('ascii'),
+ 'attributes': attrs})
+
+ def commit(self, client=None):
+ """Send saved messages as a single API call.
+
+ :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
+ :param client: the client to use. If not passed, falls back to the
+ ``client`` stored on the current batch.
+ """
+ if client is None:
+ client = self.client
+ response = client.connection.api_request(
+ method='POST', path='%s:publish' % self.topic.path,
+ data={'messages': self.messages[:]})
+ self.message_ids.extend(response['messageIds'])
+ del self.messages[:]