Skip to content

Commit

Permalink
streamrouter: implement across datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jun 26, 2015
1 parent 71bdb02 commit 0cd7f1c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 186 deletions.
127 changes: 52 additions & 75 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

'use strict';

var streamEvents = require('stream-events');
var request = require('request').defaults({
pool: {
maxSockets: Infinity
}
});
var through = require('through2');

/**
* @type {module:datastore/entity}
Expand All @@ -40,6 +38,12 @@ var entity = require('./entity.js');
*/
var pb = require('./pb.js');

/**
* @type {module:common/streamrouter}
* @private
*/
var streamRouter = require('../common/stream-router.js');

/**
* @type {module:common/util}
* @private
Expand Down Expand Up @@ -461,9 +465,10 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* supported.
*
* If you provide a callback, the query is run, and the results are returned as
* the second argument to your callback. A third argument will also exist, which
* is the `endCursor` of the previously-run query. You can use this to extend
* the query you just ran to see if more results exist.
* the second argument to your callback. A third argument may also exist, which
* is a query object that uses the end cursor from the previous query as the
* starting cursor for the next query. You can pass that object back to this
* method to see if more results exist.
*
* You may also omit the callback to this function to trigger streaming mode.
*
Expand All @@ -472,14 +477,14 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* @param {module:datastore/query} q - Query object.
* @param {function=} callback - The callback function. If omitted, a readable
* stream instance is returned.
* @param {?Error} callback.err - An error returned while making this request
* @param {?error} callback.err - An error returned while making this request
* (may be null).
* @param {Array} callback.entities - The list of entities returned by this
* @param {array} callback.entities - The list of entities returned by this
* query. Note that this is a single page of entities, not necessarily
* all of the entities.
* @param {String} callback.endCursor - The end cursor of this current query,
* which can be used as the starting cursor of the next query.
* @param {Object} callback.apiResponse - The full API response.
* @param {?module:datastore/query} callback.nextQuery - If present, run another
* query with this object to check for more results.
* @param {object} callback.apiResponse - The full API response.
*
* @example
* //-
Expand All @@ -488,90 +493,55 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* //-
* var query = dataset.createQuery('Lion');
*
* // Retrieve 5 companies.
* transaction.runQuery(query, function(err, entities, endCursor, apiResponse) {
* // Use `endCursor` as the starting cursor for your next query.
* var nextQuery = query.start(endCursor);
* var callback = function(err, entities, endCursor, apiResponse) {};
* transaction.runQuery(nextQuery, callback);
* });
* var callback = function(err, entities, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results might exist.
* transaction.runQuery(nextQuery, callback);
* }
* };
*
* transaction.runQuery(query, callback);
*
* //-
* // If you omit the callback, runQuery will automatically call subsequent
* // queries until no results remain. Entity objects will be pushed as they are
* // found.
* //-
* transaction.runQuery(query)
* .on('data', function (entity) {});
* .on('error', console.error)
* .on('data', function (entity) {})
* .on('end', function() {
* // All entities retrieved.
* });
*/
DatastoreRequest.prototype.runQuery = function(q, callback) {
var that = this;
var stream;
var resultsToSend = q.limitVal;

DatastoreRequest.prototype.runQuery = function(query, callback) {
var req = {
read_options: {},
query: entity.queryToQueryProto(q)
query: entity.queryToQueryProto(query)
};

if (q.namespace) {
if (query.namespace) {
req.partition_id = {
namespace: q.namespace
namespace: query.namespace
};
}

if (!util.is(callback, 'function')) {
stream = streamEvents(through.obj());
stream.once('reading', runQuery);
return stream;
} else {
runQuery();
}

function runQuery() {
that.makeReq_('runQuery', req, function(err, resp) {
if (err) {
if (stream) {
stream.emit('error', err, resp);
stream.end();
} else {
callback(err, null, null, resp);
}
return;
}

var entities = entity.formatArray(resp.batch.entity_result);

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}

if (!stream) {
callback(null, entities, cursor, resp);
return;
}

if (!cursor || entities.length === 0) {
stream.end();
return;
}
this.makeReq_('runQuery', req, function(err, resp) {
if (err) {
callback(err, null, null, resp);
return;
}

var result;
while ((result = entities.shift()) && resultsToSend !== 0) {
stream.push(result);
resultsToSend--;
}
var entities = entity.formatArray(resp.batch.entity_result);
var nextQuery = null;

if (resultsToSend === 0) {
stream.end();
return;
}
if (resp.batch.end_cursor && entities.length > 0) {
var endCursor = resp.batch.end_cursor.toBase64();
nextQuery = query.start(endCursor).offset(0);
}

req.query = entity.queryToQueryProto(q.start(cursor).offset(0));
runQuery();
});
}
callback(null, entities, nextQuery, resp);
});
};

/**
Expand Down Expand Up @@ -749,4 +719,11 @@ DatastoreRequest.prototype.makeReq_ = function(method, body, callback) {
});
};

/*! Developer Documentation
*
* This method can be used with either a callback or as a readable object
* stream. `streamRouter` is used to add this dual behavior.
*/
streamRouter.extend(DatastoreRequest, 'runQuery');

module.exports = DatastoreRequest;
52 changes: 28 additions & 24 deletions system-test/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,17 @@ describe('datastore', function() {
});

it('should limit queries', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.limit(5);
ds.runQuery(q, function(err, firstEntities, firstEndCursor) {
var q = ds.createQuery('Character').hasAncestor(ancestor).limit(5);

ds.runQuery(q, function(err, firstEntities, secondQuery) {
assert.ifError(err);
assert.equal(firstEntities.length, 5);
var secondQ = q.start(firstEndCursor);
ds.runQuery(secondQ, function(err, secondEntities, secondEndCursor) {

ds.runQuery(secondQuery, function(err, secondEntities, thirdQuery) {
assert.ifError(err);
assert.equal(secondEntities.length, 3);
var thirdQ = q.start(secondEndCursor);
ds.runQuery(thirdQ, function(err, thirdEntities) {

ds.runQuery(thirdQuery, function(err, thirdEntities) {
assert.ifError(err);
assert.equal(thirdEntities.length, 0);
done();
Expand Down Expand Up @@ -398,17 +398,22 @@ describe('datastore', function() {
});

it('should paginate with offset and limit', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.offset(2)
.limit(3)
.order('appearances');
ds.runQuery(q, function(err, entities, endCursor) {
var q = ds.createQuery('Character')
.hasAncestor(ancestor)
.offset(2)
.limit(3)
.order('appearances');

ds.runQuery(q, function(err, entities, secondQuery) {
assert.ifError(err);

assert.equal(entities.length, 3);
assert.equal(entities[0].data.name, 'Robb');
assert.equal(entities[2].data.name, 'Catelyn');
var secondQuery = q.start(endCursor).offset(0);
ds.runQuery(secondQuery, function(err, secondEntities) {

ds.runQuery(secondQuery.offset(0), function(err, secondEntities) {
assert.ifError(err);

assert.equal(secondEntities.length, 3);
assert.equal(secondEntities[0].data.name, 'Sansa');
assert.equal(secondEntities[2].data.name, 'Arya');
Expand All @@ -418,17 +423,16 @@ describe('datastore', function() {
});

it('should resume from a start cursor', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.offset(2)
.limit(2)
.order('appearances');
ds.runQuery(q, function(err, entities, endCursor) {
var q = ds.createQuery('Character')
.hasAncestor(ancestor)
.offset(2)
.limit(2)
.order('appearances');

ds.runQuery(q, function(err, entities, nextQuery) {
assert.ifError(err);
var cursorQuery =
ds.createQuery('Character').hasAncestor(ancestor)
.order('appearances')
.start(endCursor);
ds.runQuery(cursorQuery, function(err, secondEntities) {

ds.runQuery(nextQuery.limit(-1), function(err, secondEntities) {
assert.ifError(err);
assert.equal(secondEntities.length, 4);
assert.equal(secondEntities[0].data.name, 'Catelyn');
Expand Down
Loading

0 comments on commit 0cd7f1c

Please sign in to comment.