Skip to content

Commit

Permalink
move subscription methods up a level
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed May 29, 2015
1 parent 5aff671 commit 6c95ee7
Show file tree
Hide file tree
Showing 7 changed files with 518 additions and 303 deletions.
2 changes: 1 addition & 1 deletion lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ util.arrayize = arrayize;
*/
function format(template, args) {
return template.replace(/{([^}]*)}/g, function(match, key) {
return args[key] || match;
return key in args ? args[key] : match;
});
}

Expand Down
176 changes: 163 additions & 13 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,123 @@ PubSub.prototype.createTopic = function(name, callback) {
});
};

/**
* Create a subscription to a topic. You may optionally provide an object to
* customize the subscription.
*
* Your provided callback will either be invoked with an error object, if an API
* error occurred, or a {@linkcode module:pubsub/subscription} object.
*
* @throws {Error} If a Topic instance or topic name is not provided.
* @throws {Error} If a subName is not provided.
*
* @param {module:pubsub/topic|string} - topic - The Topic to create a
* subscription to.
* @param {string} subName - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
* a message that you must ack a message before it is redelivered.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {boolean} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {function} callback - The callback function.
*
* @example
* // Without specifying any options.
* var topic = pubsub.topic('messageCenter');
* var name = 'newMessages';
*
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
*
* // With options.
* pubsub.subscribe(topic, name, {
* ackDeadlineSeconds: 90,
* autoAck: true,
* interval: 30
* }, function(err, subscription, apiResponse) {});
*/
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
if (!util.is(topic, 'string') && !(topic instanceof Topic)) {
throw new Error('A Topic is required for a new subscription.');
}

if (!subName || !util.is(subName, 'string')) {
throw new Error('A subscription name is required for a new subscription.');
}

if (!callback) {
callback = options;
options = {};
}

options = options || {};

if (util.is(topic, 'string')) {
topic = this.topic(topic);
}

var body = {
topic: topic.name
};

if (options.ackDeadlineSeconds) {
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
}

var subscription = this.subscription(subName, options);

this.makeReq_('PUT', subscription.name, null, body, function(err, result) {
if (err && !(err.code === 409 && options.reuseExisting)) {
callback(err, null, result);
return;
}

callback(null, subscription, result);
});
};

/**
* Create a Subscription object in reference to an existing subscription. This
* command by itself will not run any API requests. You will receive a
* {@linkcode module:pubsub/subscription} object, which will allow you to
* interact with your subscription.
*
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled.
* @param {number} options.interval - Interval in milliseconds to check for new
* messages.
* @return {module:pubsub/subscription}
*
* @example
* var subscription = pubsub.subscription('my-existing-subscription');
*
* // Register a listener for `message` events.
* subscription.on('message', function(message) {
* // Called every time a message is received.
* // message.id = ID used to acknowledge its receival.
* // message.data = Contents of the message.
* // message.attributes = Attributes of the message.
* });
*/
PubSub.prototype.subscription = function(name, options) {
if (!name) {
throw new Error('The name of a subscription is required.');
}

options = options || {};
options.name = name;
return new Subscription(this, options);
};

/**
* Create a Topic object to reference an existing topic.
*
Expand Down Expand Up @@ -194,9 +311,11 @@ PubSub.prototype.topic = function(name, options) {
* You may optionally provide a query object as the first argument to customize
* the response.
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.pageSize - Maximum number of results to return.
* @param {object=} options - Configuration object.
* @param {string} options.topic - The name of the topic to list subscriptions
* from.
* @param {number} options.pageSize - Maximum number of results to return.
* @param {string} options.pageToken - Page token.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -205,38 +324,69 @@ PubSub.prototype.topic = function(name, options) {
* // so, run `pubsub.getSubscriptions(nextQuery, callback);`.
* };
*
* // Get all subscriptions.
* // Get all subscriptions for this project.
* pubsub.getSubscriptions(callback);
*
* // Customize the query.
* pubsub.getSubscriptions({
* pageSize: 10
* pageSize: 3
* }, callback);
*/
PubSub.prototype.getSubscriptions = function(query, callback) {
PubSub.prototype.getSubscriptions = function(options, callback) {
var self = this;

if (!callback) {
callback = query;
query = {};
callback = options;
options = {};
}

var path = this.projectName + '/subscriptions';
this.makeReq_('GET', path, query, true, function(err, result) {
options = options || {};

var topicName;

if (util.is(options.topic, 'string')) {
topicName = options.topic;
} else if (options.topic instanceof Topic) {
topicName = options.topic.unformattedName;
}

var query = {};

if (options.pageSize) {
query.pageSize = options.pageSize;
}

if (options.pageToken) {
query.pageToken = options.pageToken;
}

var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
projectPath: 'projects/' + this.projectId,
topicPath: topicName ? '/topics/' + topicName : ''
});

this.makeReq_('GET', apiPath, query, null, function(err, result) {
if (err) {
callback(err, null, null, result);
return;
}

var subscriptions = (result.subscriptions || []).map(function(item) {
var subscriptions = (result.subscriptions || []).map(function(sub) {
return new Subscription(self, {
name: item.name
// Depending on if we're using a subscriptions.list or
// topics.subscriptions.list API endpoint, we will get back a
// Subscription resource or just the name of the subscription.
name: sub.name || sub
});
});

var nextQuery = null;

if (result.nextPageToken) {
nextQuery = query;
nextQuery = options;
nextQuery.pageToken = result.nextPageToken;
}

callback(null, subscriptions, nextQuery, result);
});
};
Expand Down
1 change: 1 addition & 0 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ function Subscription(pubsub, options) {
events.EventEmitter.call(this);

this.name = Subscription.formatName_(pubsub.projectId, options.name);

this.makeReq_ = pubsub.makeReq_.bind(pubsub);

this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
Expand Down
Loading

0 comments on commit 6c95ee7

Please sign in to comment.