diff --git a/gcloud/datastore/connection.py b/gcloud/datastore/connection.py index 4574ba836ee9..5961ed00b29a 100644 --- a/gcloud/datastore/connection.py +++ b/gcloud/datastore/connection.py @@ -186,7 +186,7 @@ def run_query(self, dataset_id, query_pb, namespace=None): Under the hood this is doing... >>> connection.run_query('dataset-id', query.to_protobuf()) - [] + [], cursor, more_results, skipped_results :type dataset_id: string :param dataset_id: The ID of the dataset over which to run the query. @@ -205,7 +205,11 @@ def run_query(self, dataset_id, query_pb, namespace=None): request.query.CopyFrom(query_pb) response = self._rpc(dataset_id, 'runQuery', request, datastore_pb.RunQueryResponse) - return [e.entity for e in response.batch.entity_result] + return ([e.entity for e in response.batch.entity_result], + response.batch.end_cursor, + response.batch.more_results, + response.batch.skipped_results, + ) def lookup(self, dataset_id, key_pbs): """Lookup keys from a dataset in the Cloud Datastore. diff --git a/gcloud/datastore/query.py b/gcloud/datastore/query.py index 5cbed650a0f2..6f1a49472996 100644 --- a/gcloud/datastore/query.py +++ b/gcloud/datastore/query.py @@ -4,6 +4,7 @@ from gcloud.datastore import helpers from gcloud.datastore.entity import Entity from gcloud.datastore.key import Key +import base64 class Query(object): @@ -53,6 +54,7 @@ class Query(object): def __init__(self, kind=None, dataset=None): self._dataset = dataset self._pb = datastore_pb.Query() + self._cursor = None if kind: self._pb.kind.add().name = kind @@ -303,12 +305,55 @@ def fetch(self, limit=None): if limit: clone = self.limit(limit) - entity_pbs = self.dataset().connection().run_query( + (entity_pbs, + end_cursor, + more_results, + skipped_results) = self.dataset().connection().run_query( query_pb=clone.to_protobuf(), dataset_id=self.dataset().id()) + self._cursor = end_cursor return [Entity.from_protobuf(entity, dataset=self.dataset()) for entity in entity_pbs] + def cursor(self): + """Returns cursor ID + + .. Caution:: Invoking this method on a query that has not yet been + executed will raise a RuntimeError. + + :rtype: string + :returns: base64-encoded cursor ID string denoting the last position + consumed in the query's result set. + """ + if not self._cursor: + raise RuntimeError('No cursor') + return base64.b64encode(self._cursor) + + def with_cursor(self, start_cursor, end_cursor=None): + """Specifies the starting / ending positions in a query's result set. + + :type start_cursor: bytes + :param start_cursor: Base64-encoded cursor string specifying where to + start reading query results. + + :type end_cursor: bytes + :param end_cursor: Base64-encoded cursor string specifying where to stop + reading query results. + + :rtype: :class:`Query` + :returns: If neither cursor is passed, returns self; else, returns a + clone of the :class:`Query`, with cursors updated. + + """ + clone = self + if start_cursor or end_cursor: + clone = self._clone() + if start_cursor: + clone._pb.start_cursor = base64.b64decode(start_cursor) + if end_cursor: + clone._pb.end_cursor = base64.b64decode(end_cursor) + return clone + def order(self, *properties): """Adds a sort order to the query. diff --git a/gcloud/datastore/test_connection.py b/gcloud/datastore/test_connection.py index 51bd41e7f63c..b243e52cb2a4 100644 --- a/gcloud/datastore/test_connection.py +++ b/gcloud/datastore/test_connection.py @@ -322,7 +322,11 @@ def test_run_query_wo_namespace_empty_result(self): 'runQuery', ]) http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) - self.assertEqual(conn.run_query(DATASET_ID, q_pb), []) + pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb) + self.assertEqual(pbs, []) + self.assertEqual(end, '') + self.assertTrue(more) + self.assertEqual(skipped, 0) cw = http._called_with self.assertEqual(cw['uri'], URI) self.assertEqual(cw['method'], 'POST') @@ -357,8 +361,8 @@ def test_run_query_w_namespace_nonempty_result(self): 'runQuery', ]) http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) - result = conn.run_query(DATASET_ID, q_pb, 'NS') - returned, = result # One entity. + pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb, 'NS') + returned, = pbs, # One entity. cw = http._called_with self.assertEqual(cw['uri'], URI) self.assertEqual(cw['method'], 'POST') diff --git a/gcloud/datastore/test_query.py b/gcloud/datastore/test_query.py index b3f853b32569..9219bf58d6de 100644 --- a/gcloud/datastore/test_query.py +++ b/gcloud/datastore/test_query.py @@ -202,6 +202,7 @@ def test_fetch_default_limit(self): def test_fetch_explicit_limit(self): from gcloud.datastore.datastore_v1_pb2 import Entity + _CURSOR = 'CURSOR' _DATASET = 'DATASET' _KIND = 'KIND' _ID = 123 @@ -213,10 +214,12 @@ def test_fetch_explicit_limit(self): prop.name = 'foo' prop.value.string_value = 'Foo' connection = _Connection(entity_pb) + connection._cursor = _CURSOR dataset = _Dataset(_DATASET, connection) query = self._makeOne(_KIND, dataset) limited = query.limit(13) entities = query.fetch(13) + self.assertEqual(query._cursor, _CURSOR) self.assertEqual(len(entities), 1) self.assertEqual(entities[0].key().path(), [{'kind': _KIND, 'id': _ID}]) @@ -225,6 +228,80 @@ def test_fetch_explicit_limit(self): 'query_pb': limited.to_protobuf(), }) + def test_cursor_not_fetched(self): + _DATASET = 'DATASET' + _KIND = 'KIND' + connection = _Connection() + dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset) + self.assertRaises(RuntimeError, query.cursor) + + def test_cursor_fetched(self): + import base64 + _CURSOR = 'CURSOR' + _DATASET = 'DATASET' + _KIND = 'KIND' + connection = _Connection() + dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset) + query._cursor = _CURSOR + self.assertEqual(query.cursor(), base64.b64encode(_CURSOR)) + + def test_with_cursor_neither(self): + _DATASET = 'DATASET' + _KIND = 'KIND' + connection = _Connection() + dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset) + self.assertTrue(query.with_cursor(None) is query) + + def test_with_cursor_w_start(self): + import base64 + _CURSOR = 'CURSOR' + _CURSOR_B64 = base64.b64encode(_CURSOR) + _DATASET = 'DATASET' + _KIND = 'KIND' + connection = _Connection() + dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset) + after = query.with_cursor(_CURSOR_B64) + self.assertFalse(after is query) + q_pb = after.to_protobuf() + self.assertEqual(q_pb.start_cursor, _CURSOR) + self.assertEqual(q_pb.end_cursor, '') + + def test_with_cursor_w_end(self): + import base64 + _CURSOR = 'CURSOR' + _CURSOR_B64 = base64.b64encode(_CURSOR) + _DATASET = 'DATASET' + _KIND = 'KIND' + connection = _Connection() + dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset) + after = query.with_cursor(None, _CURSOR_B64) + self.assertFalse(after is query) + q_pb = after.to_protobuf() + self.assertEqual(q_pb.start_cursor, '') + self.assertEqual(q_pb.end_cursor, _CURSOR) + + def test_with_cursor_w_both(self): + import base64 + _START = 'START' + _START_B64 = base64.b64encode(_START) + _END = 'CURSOR' + _END_B64 = base64.b64encode(_END) + _DATASET = 'DATASET' + _KIND = 'KIND' + connection = _Connection() + dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset) + after = query.with_cursor(_START_B64, _END_B64) + self.assertFalse(after is query) + q_pb = after.to_protobuf() + self.assertEqual(q_pb.start_cursor, _START) + self.assertEqual(q_pb.end_cursor, _END) + def test_order_empty(self): _KIND = 'KIND' before = self._makeOne(_KIND) @@ -285,10 +362,13 @@ def connection(self): class _Connection(object): _called_with = None + _cursor = '' + _more = True + _skipped = 0 def __init__(self, *result): self._result = list(result) def run_query(self, **kw): self._called_with = kw - return self._result + return self._result, self._cursor, self._more, self._skipped