Skip to content

Commit

Permalink
Merge pull request #204 from tseaver/feature-datastore_query_cursor
Browse files Browse the repository at this point in the history
Add cursor handling to datastore.query.Query.
  • Loading branch information
silvolu committed Oct 2, 2014
2 parents f898953 + f7c8066 commit 97278d8
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 7 deletions.
8 changes: 6 additions & 2 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def run_query(self, dataset_id, query_pb, namespace=None):
Under the hood this is doing...
>>> connection.run_query('dataset-id', query.to_protobuf())
[<list of Entity Protobufs>]
[<list of Entity Protobufs>], cursor, more_results, skipped_results
:type dataset_id: string
:param dataset_id: The ID of the dataset over which to run the query.
Expand All @@ -205,7 +205,11 @@ def run_query(self, dataset_id, query_pb, namespace=None):
request.query.CopyFrom(query_pb)
response = self._rpc(dataset_id, 'runQuery', request,
datastore_pb.RunQueryResponse)
return [e.entity for e in response.batch.entity_result]
return ([e.entity for e in response.batch.entity_result],
response.batch.end_cursor,
response.batch.more_results,
response.batch.skipped_results,
)

def lookup(self, dataset_id, key_pbs):
"""Lookup keys from a dataset in the Cloud Datastore.
Expand Down
47 changes: 46 additions & 1 deletion gcloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from gcloud.datastore import helpers
from gcloud.datastore.entity import Entity
from gcloud.datastore.key import Key
import base64


class Query(object):
Expand Down Expand Up @@ -53,6 +54,7 @@ class Query(object):
def __init__(self, kind=None, dataset=None):
self._dataset = dataset
self._pb = datastore_pb.Query()
self._cursor = None

if kind:
self._pb.kind.add().name = kind
Expand Down Expand Up @@ -303,12 +305,55 @@ def fetch(self, limit=None):
if limit:
clone = self.limit(limit)

entity_pbs = self.dataset().connection().run_query(
(entity_pbs,
end_cursor,
more_results,
skipped_results) = self.dataset().connection().run_query(
query_pb=clone.to_protobuf(), dataset_id=self.dataset().id())

self._cursor = end_cursor
return [Entity.from_protobuf(entity, dataset=self.dataset())
for entity in entity_pbs]

def cursor(self):
    """Return the query's stored cursor, base64-encoded.

    .. Caution:: Invoking this method while no cursor is stored on the
       query (``self._cursor`` is falsy) raises a RuntimeError.

    :rtype: string
    :returns: base64-encoded form of ``self._cursor``, denoting the last
              position consumed in the query's result set.
    :raises: RuntimeError if no cursor is stored.
    """
    raw_cursor = self._cursor
    if raw_cursor:
        return base64.b64encode(raw_cursor)
    raise RuntimeError('No cursor')

def with_cursor(self, start_cursor=None, end_cursor=None):
    """Specifies the starting / ending positions in a query's result set.

    Both cursors are optional, so an end-only restriction can be written
    as ``query.with_cursor(end_cursor=...)``.

    :type start_cursor: bytes
    :param start_cursor: (Optional) Base64-encoded cursor string specifying
                         where to start reading query results.

    :type end_cursor: bytes
    :param end_cursor: (Optional) Base64-encoded cursor string specifying
                       where to stop reading query results.

    :rtype: :class:`Query`
    :returns: If neither cursor is passed, returns self; else, returns a
              clone of the :class:`Query`, with cursors updated.
    """
    # Nothing to apply: hand back the very same query, without cloning.
    if not (start_cursor or end_cursor):
        return self

    clone = self._clone()
    # Cursors arrive base64-encoded; the protobuf stores the decoded bytes.
    if start_cursor:
        clone._pb.start_cursor = base64.b64decode(start_cursor)
    if end_cursor:
        clone._pb.end_cursor = base64.b64decode(end_cursor)
    return clone

def order(self, *properties):
"""Adds a sort order to the query.
Expand Down
10 changes: 7 additions & 3 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,11 @@ def test_run_query_wo_namespace_empty_result(self):
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.run_query(DATASET_ID, q_pb), [])
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
self.assertEqual(pbs, [])
self.assertEqual(end, '')
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self.assertEqual(cw['uri'], URI)
self.assertEqual(cw['method'], 'POST')
Expand Down Expand Up @@ -357,8 +361,8 @@ def test_run_query_w_namespace_nonempty_result(self):
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
result = conn.run_query(DATASET_ID, q_pb, 'NS')
returned, = result # One entity.
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb, 'NS')
returned, = pbs, # One entity.
cw = http._called_with
self.assertEqual(cw['uri'], URI)
self.assertEqual(cw['method'], 'POST')
Expand Down
82 changes: 81 additions & 1 deletion gcloud/datastore/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def test_fetch_default_limit(self):

def test_fetch_explicit_limit(self):
from gcloud.datastore.datastore_v1_pb2 import Entity
_CURSOR = 'CURSOR'
_DATASET = 'DATASET'
_KIND = 'KIND'
_ID = 123
Expand All @@ -213,10 +214,12 @@ def test_fetch_explicit_limit(self):
prop.name = 'foo'
prop.value.string_value = 'Foo'
connection = _Connection(entity_pb)
connection._cursor = _CURSOR
dataset = _Dataset(_DATASET, connection)
query = self._makeOne(_KIND, dataset)
limited = query.limit(13)
entities = query.fetch(13)
self.assertEqual(query._cursor, _CURSOR)
self.assertEqual(len(entities), 1)
self.assertEqual(entities[0].key().path(),
[{'kind': _KIND, 'id': _ID}])
Expand All @@ -225,6 +228,80 @@ def test_fetch_explicit_limit(self):
'query_pb': limited.to_protobuf(),
})

def test_cursor_not_fetched(self):
    """``cursor()`` raises RuntimeError on a freshly built query."""
    dataset = _Dataset('DATASET', _Connection())
    query = self._makeOne('KIND', dataset)
    self.assertRaises(RuntimeError, query.cursor)

def test_cursor_fetched(self):
    """``cursor()`` returns the stored ``_cursor`` value, base64-encoded."""
    import base64
    raw_cursor = 'CURSOR'
    dataset = _Dataset('DATASET', _Connection())
    query = self._makeOne('KIND', dataset)
    # Set the cursor directly instead of running a fetch.
    query._cursor = raw_cursor
    encoded = query.cursor()
    self.assertEqual(encoded, base64.b64encode(raw_cursor))

def test_with_cursor_neither(self):
    """With no cursor supplied, ``with_cursor`` returns the same query."""
    dataset = _Dataset('DATASET', _Connection())
    query = self._makeOne('KIND', dataset)
    returned = query.with_cursor(None)
    self.assertTrue(returned is query)

def test_with_cursor_w_start(self):
    """A start cursor alone yields a clone with only ``start_cursor`` set."""
    import base64
    raw_start = 'CURSOR'
    encoded_start = base64.b64encode(raw_start)
    dataset = _Dataset('DATASET', _Connection())
    query = self._makeOne('KIND', dataset)
    derived = query.with_cursor(encoded_start)
    # The original query must be left alone; a distinct object comes back.
    self.assertFalse(derived is query)
    derived_pb = derived.to_protobuf()
    self.assertEqual(derived_pb.start_cursor, raw_start)
    self.assertEqual(derived_pb.end_cursor, '')

def test_with_cursor_w_end(self):
    """An end cursor alone yields a clone with only ``end_cursor`` set."""
    import base64
    raw_end = 'CURSOR'
    encoded_end = base64.b64encode(raw_end)
    dataset = _Dataset('DATASET', _Connection())
    query = self._makeOne('KIND', dataset)
    derived = query.with_cursor(None, encoded_end)
    # The original query must be left alone; a distinct object comes back.
    self.assertFalse(derived is query)
    derived_pb = derived.to_protobuf()
    self.assertEqual(derived_pb.start_cursor, '')
    self.assertEqual(derived_pb.end_cursor, raw_end)

def test_with_cursor_w_both(self):
    """Start and end cursors together are both decoded onto the clone."""
    import base64
    raw_start = 'START'
    raw_end = 'CURSOR'
    encoded_start = base64.b64encode(raw_start)
    encoded_end = base64.b64encode(raw_end)
    dataset = _Dataset('DATASET', _Connection())
    query = self._makeOne('KIND', dataset)
    derived = query.with_cursor(encoded_start, encoded_end)
    # The original query must be left alone; a distinct object comes back.
    self.assertFalse(derived is query)
    derived_pb = derived.to_protobuf()
    self.assertEqual(derived_pb.start_cursor, raw_start)
    self.assertEqual(derived_pb.end_cursor, raw_end)

def test_order_empty(self):
_KIND = 'KIND'
before = self._makeOne(_KIND)
Expand Down Expand Up @@ -285,10 +362,13 @@ def connection(self):

class _Connection(object):
    """Test stub standing in for a datastore connection.

    Positional constructor arguments become the entity list handed back by
    :meth:`run_query`; the class-level ``_cursor`` / ``_more`` / ``_skipped``
    values can be overridden per instance to vary the rest of the result.
    """
    # Keyword arguments of the most recent ``run_query`` call.
    _called_with = None
    # Canned values returned alongside the entity list by ``run_query``.
    _cursor = ''
    _more = True
    _skipped = 0

    def __init__(self, *result):
        self._result = list(result)

    def run_query(self, **kw):
        """Record ``kw`` and return ``(result, cursor, more, skipped)``.

        The result is a 4-tuple; the stale single-value
        ``return self._result`` has been removed so it is actually returned.
        """
        self._called_with = kw
        return self._result, self._cursor, self._more, self._skipped

0 comments on commit 97278d8

Please sign in to comment.