Skip to content

Commit

Permalink
pubsub: add support for snapshots + seek (#2200)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and stephenplusplus committed Apr 14, 2017
1 parent 352aa03 commit 3498972
Show file tree
Hide file tree
Showing 8 changed files with 1,025 additions and 9 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
],
"dependencies": {
"@google-cloud/common": "^0.13.0",
"@google-cloud/common-grpc": "^0.3.0",
"@google-cloud/common-grpc": "^0.3.1",
"arrify": "^1.0.0",
"extend": "^3.0.0",
"google-gax": "^0.13.0",
Expand Down
152 changes: 148 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ var extend = require('extend');
var is = require('is');
var util = require('util');

/**
* @type {module:pubsub/snapshot}
* @private
*/
var Snapshot = require('./snapshot.js');

/**
* @type {module:pubsub/subscription}
* @private
Expand Down Expand Up @@ -136,6 +142,120 @@ PubSub.prototype.createTopic = function(name, callback) {
});
};

/**
* Get a list of snapshots.
*
* @param {object=} options - Configuration object.
* @param {boolean} options.autoPaginate - Have pagination handled
* automatically. Default: true.
* @param {number} options.maxApiCalls - Maximum number of API calls to make.
* @param {number} options.maxResults - Maximum number of results to return.
* @param {number} options.pageSize - Maximum number of results to return.
* @param {string} options.pageToken - Page token.
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error from the API call, may be null.
* @param {module:pubsub/snapshot[]} callback.snapshots - The list of snapshots
* in your project.
*
* @example
* pubsub.getSnapshots(function(err, snapshots) {
* if (!err) {
* // snapshots is an array of Snapshot objects.
* }
* });
*
* //-
* // To control how many API requests are made and page through the results
* // manually, set `autoPaginate` to `false`.
* //-
* var callback = function(err, snapshots, nextQuery, apiResponse) {
* if (nextQuery) {
* // More results exist.
* pubsub.getSnapshots(nextQuery, callback);
* }
* };
*
* pubsub.getSnapshots({
* autoPaginate: false
* }, callback);
*
* //-
* // If the callback is omitted, we'll return a Promise.
* //-
* pubsub.getSnapshots().then(function(data) {
* var snapshots = data[0];
* });
*/
PubSub.prototype.getSnapshots = function(options, callback) {
var self = this;

if (is.fn(options)) {
callback = options;
options = {};
}

var protoOpts = {
service: 'Subscriber',
method: 'listSnapshots'
};

var reqOpts = extend({}, options);

reqOpts.project = 'projects/' + this.projectId;

this.request(protoOpts, reqOpts, function(err, resp) {
if (err) {
callback(err, null, null, resp);
return;
}

var snapshots = arrify(resp.snapshots).map(function(snapshot) {
var snapshotInstance = self.snapshot(snapshot.name);
snapshotInstance.metadata = snapshot;
return snapshotInstance;
});

var nextQuery = null;

if (resp.nextPageToken) {
nextQuery = options;
nextQuery.pageToken = resp.nextPageToken;
}

callback(null, snapshots, nextQuery, resp);
});
};

/**
* Get a list of the {module:pubsub/snapshot} objects as a readable object
* stream.
*
* @param {object=} options - Configuration object. See
* {module:pubsub#getSnapshots} for a complete list of options.
* @return {stream}
*
* @example
* pubsub.getSnapshotsStream()
* .on('error', console.error)
* .on('data', function(snapshot) {
* // snapshot is a Snapshot object.
* })
* .on('end', function() {
* // All snapshots retrieved.
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* pubsub.getSnapshotsStream()
* .on('data', function(snapshot) {
* this.end();
* });
*/
PubSub.prototype.getSnapshotsStream =
common.paginator.streamify('getSnapshots');

/**
* Get a list of the subscriptions registered to all of your project's topics.
* You may optionally provide a query object as the first argument to customize
Expand Down Expand Up @@ -277,7 +397,7 @@ PubSub.prototype.getSubscriptions = function(options, callback) {
* // unnecessary processing and API requests.
* //-
* pubsub.getSubscriptionsStream()
* .on('data', function(topic) {
* .on('data', function(subscription) {
* this.end();
* });
*/
Expand Down Expand Up @@ -540,6 +660,26 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
});
};

/**
* Create a Snapshot object. See {module:pubsub/subscription#createSnapshot} to
* create a snapshot.
*
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the snapshot.
* @return {module:pubsub/snapshot}
*
* @example
* var snapshot = pubsub.snapshot('my-snapshot');
*/
PubSub.prototype.snapshot = function(name) {
if (!is.string(name)) {
throw new Error('You must supply a valid name for the snapshot.');
}

return new Snapshot(this, name);
};

/**
* Create a Subscription object. This command by itself will not run any API
* requests. You will receive a {module:pubsub/subscription} object,
Expand Down Expand Up @@ -589,7 +729,7 @@ PubSub.prototype.subscription = function(name, options) {
};

/**
* Create a Topic object. See {module:pubsub/createTopic} to create a topic.
* Create a Topic object. See {module:pubsub#createTopic} to create a topic.
*
* @throws {Error} If a name is not provided.
*
Expand Down Expand Up @@ -637,15 +777,19 @@ PubSub.prototype.determineBaseUrl_ = function() {
*
* These methods can be auto-paginated.
*/
common.paginator.extend(PubSub, ['getSubscriptions', 'getTopics']);
common.paginator.extend(PubSub, [
'getSnapshots',
'getSubscriptions',
'getTopics'
]);

/*! Developer Documentation
*
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
*/
common.util.promisifyAll(PubSub, {
exclude: ['subscription', 'topic']
exclude: ['snapshot', 'subscription', 'topic']
});

PubSub.Subscription = Subscription;
Expand Down
Loading

0 comments on commit 3498972

Please sign in to comment.