Skip to content

Commit

Permalink
Merge pull request googleapis#2706 from dhermes/migrate-datastore-ite…
Browse files Browse the repository at this point in the history
…rator

Migrate datastore iterator to use base iterator class
  • Loading branch information
dhermes authored Nov 9, 2016
2 parents 43631a8 + 7a97e00 commit 1abfcbe
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 326 deletions.
29 changes: 20 additions & 9 deletions datastore/google/cloud/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,19 +456,30 @@ def query(self, **kwargs):
>>> query = client.query(kind='MyKind')
>>> query.add_filter('property', '=', 'val')
Using the query iterator's
:meth:`~google.cloud.datastore.query.Iterator.next_page` method:
Using the query iterator
.. code-block:: python
>>> query_iter = query.fetch()
>>> entities, more_results, cursor = query_iter.next_page()
>>> entities
[<list of Entity unmarshalled from protobuf>]
>>> more_results
<boolean of more results>
>>> cursor
<string containing cursor where fetch stopped>
>>> for entity in query_iter:
... do_something(entity)
or manually page through results
.. code-block:: python
>>> query_iter = query.fetch(start_cursor='2mdd223i944')
>>> pages = query_iter.pages
>>>
>>> first_page = next(pages)
>>> first_page_entities = list(first_page)
>>> query_iter.next_page_token
'abc-some-cursor'
>>>
>>> second_page = next(pages)
>>> second_page_entities = list(second_page)
>>> query_iter.next_page_token is None
True
Under the hood this is doing:
Expand Down
163 changes: 103 additions & 60 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@
import base64

from google.cloud._helpers import _ensure_tuple_or_list
from google.cloud.iterator import Iterator as BaseIterator
from google.cloud.iterator import Page

from google.cloud.datastore._generated import query_pb2 as _query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key


_NOT_FINISHED = _query_pb2.QueryResultBatch.NOT_FINISHED

_FINISHED = (
_query_pb2.QueryResultBatch.NO_MORE_RESULTS,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
)


class Query(object):
"""A Query against the Cloud Datastore.
Expand Down Expand Up @@ -355,18 +367,19 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
client = self._client

return Iterator(
self, client, limit, offset, start_cursor, end_cursor)
self, client, limit=limit, offset=offset,
start_cursor=start_cursor, end_cursor=end_cursor)


class Iterator(object):
class Iterator(BaseIterator):
"""Represent the state of a given execution of a Query.
:type query: :class:`google.cloud.datastore.query.Query`
:type query: :class:`~google.cloud.datastore.query.Query`
:param query: Query object holding permanent configuration (i.e.
things that don't change on with each page in
a results set).
:type client: :class:`google.cloud.datastore.client.Client`
:type client: :class:`~google.cloud.datastore.client.Client`
:param client: The client used to make a request.
:type limit: int
Expand All @@ -384,97 +397,109 @@ class Iterator(object):
query results.
"""

_NOT_FINISHED = _query_pb2.QueryResultBatch.NOT_FINISHED

_FINISHED = (
_query_pb2.QueryResultBatch.NO_MORE_RESULTS,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
)
next_page_token = None

def __init__(self, query, client, limit=None, offset=None,
start_cursor=None, end_cursor=None):
super(Iterator, self).__init__(
client=client, item_to_value=_item_to_entity,
page_token=start_cursor, max_results=limit)
self._query = query
self._client = client
self._limit = limit
self._offset = offset
self._start_cursor = start_cursor
self._end_cursor = end_cursor
self._page = self._more_results = None
self._skipped_results = None
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0

def next_page(self):
"""Fetch a single "page" of query results.
def _build_protobuf(self):
"""Build a query protobuf.
Low-level API for fine control: the more convenient API is
to iterate on the current Iterator.
Relies on the current state of the iterator.
:rtype: tuple, (entities, more_results, cursor)
:returns: The next page of results.
:rtype:
:class:`google.cloud.datastore._generated.query_pb2.Query`
:returns: The query protobuf object for the current
state of the iterator.
"""
pb = _pb_from_query(self._query)

start_cursor = self._start_cursor
start_cursor = self.next_page_token
if start_cursor is not None:
pb.start_cursor = base64.urlsafe_b64decode(start_cursor)

end_cursor = self._end_cursor
if end_cursor is not None:
pb.end_cursor = base64.urlsafe_b64decode(end_cursor)

if self._limit is not None:
pb.limit.value = self._limit
if self.max_results is not None:
pb.limit.value = self.max_results - self.num_results

if self._offset is not None:
pb.offset = self._offset
# NOTE: The offset goes down relative to the location
# because we are updating the cursor each time.
pb.offset = self._offset - self._skipped_results

transaction = self._client.current_transaction
return pb

query_results = self._client.connection.run_query(
query_pb=pb,
project=self._query.project,
namespace=self._query.namespace,
transaction_id=transaction and transaction.id,
)
(entity_pbs, cursor_as_bytes,
more_results_enum, self._skipped_results) = query_results
def _process_query_results(self, entity_pbs, cursor_as_bytes,
more_results_enum, skipped_results):
"""Process the response from a datastore query.
:type entity_pbs: iterable
:param entity_pbs: The entities returned in the current page.
:type cursor_as_bytes: bytes
:param cursor_as_bytes: The end cursor of the query.
:type more_results_enum:
:class:`._generated.query_pb2.QueryResultBatch.MoreResultsType`
:param more_results_enum: Enum indicating if there are more results.
:type skipped_results: int
:param skipped_results: The number of skipped results.
if cursor_as_bytes == b'':
self._start_cursor = None
:rtype: iterable
:returns: The next page of entity results.
:raises ValueError: If ``more_results`` is an unexpected value.
"""
self._skipped_results = skipped_results

if cursor_as_bytes == b'': # Empty-value for bytes.
self.next_page_token = None
else:
self._start_cursor = base64.urlsafe_b64encode(cursor_as_bytes)
self.next_page_token = base64.urlsafe_b64encode(cursor_as_bytes)
self._end_cursor = None

if more_results_enum == self._NOT_FINISHED:
if more_results_enum == _NOT_FINISHED:
self._more_results = True
elif more_results_enum in self._FINISHED:
elif more_results_enum in _FINISHED:
self._more_results = False
else:
raise ValueError('Unexpected value returned for `more_results`.')

self._page = [
helpers.entity_from_protobuf(entity)
for entity in entity_pbs]
return self._page, self._more_results, self._start_cursor
return entity_pbs

def __iter__(self):
"""Generator yielding all results matching our query.
def _next_page(self):
"""Get the next page in the iterator.
:rtype: sequence of :class:`google.cloud.datastore.entity.Entity`
:rtype: :class:`~google.cloud.iterator.Page`
:returns: The next page in the iterator (or :data:`None` if
there are no pages left).
"""
while True:
self.next_page()
for entity in self._page:
yield entity
if not self._more_results:
break
num_results = len(self._page)
if self._limit is not None:
self._limit -= num_results
if self._offset is not None and self._skipped_results is not None:
# NOTE: The offset goes down relative to the location
# because we are updating the cursor each time.
self._offset -= self._skipped_results
if not self._more_results:
return None

pb = self._build_protobuf()
transaction = self.client.current_transaction

query_results = self.client.connection.run_query(
query_pb=pb,
project=self._query.project,
namespace=self._query.namespace,
transaction_id=transaction and transaction.id,
)
entity_pbs = self._process_query_results(*query_results)
return Page(self, entity_pbs, self._item_to_value)


def _pb_from_query(query):
Expand Down Expand Up @@ -540,3 +565,21 @@ def _pb_from_query(query):
pb.distinct_on.add().name = distinct_on_name

return pb


# pylint: disable=unused-argument
def _item_to_entity(iterator, entity_pb):
"""Convert a raw protobuf entity to the native object.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type entity_pb:
:class:`google.cloud.datastore._generated.entity_pb2.Entity`
:param entity_pb: An entity protobuf to convert to a native entity.
:rtype: :class:`~google.cloud.datastore.entity.Entity`
:returns: The next entity in the page.
"""
return helpers.entity_from_protobuf(entity_pb)
# pylint: enable=unused-argument
Loading

0 comments on commit 1abfcbe

Please sign in to comment.