diff --git a/lib/common/util.js b/lib/common/util.js index 84a4f0d73cb..f454afc3f21 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -124,7 +124,7 @@ util.arrayize = arrayize; */ function format(template, args) { return template.replace(/{([^}]*)}/g, function(match, key) { - return args[key] || match; + return key in args ? args[key] : match; }); } diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 0f15df8e8e5..58faa8072c2 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -162,6 +162,124 @@ PubSub.prototype.createTopic = function(name, callback) { }); }; +/** + * Create a subscription to this topic. You may optionally provide an object to + * customize the subscription. + * + * Your provided callback will either be invoked with an error object, if an API + * error occurred, or a {@linkcode module:pubsub/subscription} object. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - The name of the subscription. + * @param {object=} options - Configuration object. + * @param {number=} options.ackDeadlineSeconds - The maximum time after + * receiving a message that you must ack a message before it is redelivered. + * @param {boolean=} options.autoAck - Automatically acknowledge the message + * once it's pulled. (default: false) + * @param {number=} options.interval - Interval in milliseconds to check for new + * messages. (default: 10) + * @param {string=} options.projectId - The projectId where the topic resource + * exists. + * @param {boolean=} options.reuseExisting - If the subscription already exists, + * reuse it. The options of the existing subscription are not changed. If + * false, attempting to create a subscription that already exists will fail. + * (default: false) + * @param {function} callback - The callback function. + * + * @example + * // Without specifying any options. + * var topic = 'messageTopic'; + * var sub = 'messageSubscription'; + * + * pubsub.subscribe(topic, sub, function(err, subscription, apiResponse) {}); + * + * // With options. + * pubsub.subscribe(topic, sub, { + * ackDeadlineSeconds: 90, + * autoAck: true, + * interval: 30 + * }, function(err, subscription, apiResponse) {}); + */ +PubSub.prototype.subscribe = function(topicName, subName, options, callback) { + var self = this; + + if (!topicName) { + throw new Error('A Topic name is required for a new subscription.'); + } + + if (!subName) { + throw new Error('A Subscription name is required for a new subscription.'); + } + + if (!callback) { + callback = options; + options = {}; + } + + var body = { + topic: topicName + }; + + if (options.ackDeadlineSeconds) { + body.ackDeadlineSeconds = options.ackDeadlineSeconds; + } + + var projectId = options.projectId || this.projectId; + + var path = Subscription.formatName_(projectId, subName); + + this.makeReq_('PUT', path, null, body, function(err, result) { + if (options.reuseExisting && err && err.code === 409) { + callback(null, self.subscription(subName, options), result); + } else if (err) { + callback(err, null, result); + } else { + callback(null, self.subscription(subName, options), result); + } + }); +}; + +/** + * Create a Subscription object in reference to an existing subscription. This + * command by itself will not run any API requests. You will receive a + * {@linkcode module:pubsub/subscription} object, which will allow you to + * interact with your subscription. + * + * @throws {Error} If a name is not provided. + * + * @param {string} name - The name of the subscription. + * @param {object=} options - Configuration object. + * @param {boolean=} options.autoAck - Automatically acknowledge the message + * once it's pulled. + * @param {number=} options.interval - Interval in milliseconds to check for new + * messages. + * @param {string=} options.projectId - The projectId where the subscription + * exists. + * @return {module:pubsub/subscription} + * + * @example + * var subscription = pubsub.subscription('my-existing-subscription'); + * + * // Register a listener for `message` events. + * subscription.on('message', function(message) { + * // Called every time a message is received. + * // message.id = ID used to acknowledge its receival. + * // message.data = Contents of the message. + * // message.attributes = Attributes of the message. + * }); + */ +PubSub.prototype.subscription = function(name, options) { + if (!name) { + throw new Error('The name of a subscription is required.'); + } + + options = options || {}; + options.name = name; + + return new Subscription(this, options); +}; + /** * Create a Topic object to reference an existing topic. * @@ -195,9 +313,13 @@ PubSub.prototype.topic = function(name, options) { * You may optionally provide a query object as the first argument to customize * the response. * - * @param {object=} query - Query object. - * @param {string=} query.pageToken - Page token. - * @param {number=} query.pageSize - Maximum number of results to return. + * @param {object=} options - Query object. + * @param {string=} options.projectId - The projectId where the topic + * resource exists. + * @param {string=} options.topic - The name of the topic to list subscriptions + * from. + * @param {number=} options.pageSize - Maximum number of results to return. + * @param {string=} options.pageToken - Page token. * @param {function} callback - The callback function. * * @example @@ -214,30 +336,58 @@ PubSub.prototype.topic = function(name, options) { * pageSize: 10 * }, callback); */ -PubSub.prototype.getSubscriptions = function(query, callback) { +PubSub.prototype.getSubscriptions = function(options, callback) { var self = this; + if (!callback) { - callback = query; - query = {}; + callback = options; + options = {}; } - var path = this.projectName + '/subscriptions'; - this.makeReq_('GET', path, query, true, function(err, result) { + options = options || {}; + + var query = {}; + + if (options.pageSize) { + query.pageSize = options.pageSize; + } + + if (options.pageToken) { + query.pageToken = options.pageToken; + } + + var projectId = options.projectId || this.projectId; + + var apiPath = util.format('{projectPath}{topicPath}/subscriptions', { + projectPath: 'projects/' + projectId, + topicPath: options.topic ? '/topics/' + options.topic : '' + }); + + this.makeReq_('GET', apiPath, query, null, function(err, result) { if (err) { callback(err, null, null, result); return; } - var subscriptions = (result.subscriptions || []).map(function(item) { + var subscriptions = (result.subscriptions || []).map(function(sub) { + // Depending on if we're using a subscriptions.list or + // topics.subscriptions.list API endpoint, we will get back a Subscription + // resource or just the name of the subscription. + var subName = sub.name || sub; + return new Subscription(self, { - name: item.name + projectId: projectId, + name: subName }); }); + var nextQuery = null; + if (result.nextPageToken) { - nextQuery = query; + nextQuery = options; nextQuery.pageToken = result.nextPageToken; } + callback(null, subscriptions, nextQuery, result); }); }; diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index 0c91730a584..e87e3077edc 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -114,7 +114,10 @@ var util = require('../common/util.js'); function Subscription(pubsub, options) { events.EventEmitter.call(this); - this.name = Subscription.formatName_(pubsub.projectId, options.name); + var projectId = options.projectId || pubsub.projectId; + + this.name = Subscription.formatName_(projectId, options.name); + this.makeReq_ = pubsub.makeReq_.bind(pubsub); this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index ee913b23b4d..a6289b23fb5 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -26,12 +26,6 @@ */ var util = require('../common/util.js'); -/** - * @type {module:pubsub/subscription} - * @private - */ -var Subscription = require('./subscription.js'); - /*! Developer Documentation * * @param {module:pubsub} pubsub - PubSub object. @@ -60,9 +54,9 @@ function Topic(pubsub, options) { this.name = Topic.formatName_(pubsub.projectId, options.name); this.projectId = pubsub.projectId; this.pubsub = pubsub; + this.unformattedName = options.name; if (options.autoCreate) { - this.unformattedName = options.name; this.origMakeReq_ = this.makeReq_; this.makeReq_ = this.autoCreateWrapper_; } @@ -220,9 +214,9 @@ Topic.prototype.delete = function(callback) { * error occurred, or an array of {@linkcode module:pubsub/subscription} * objects. * - * @param {object=} query - Query object. - * @param {string=} query.pageToken - Page token. - * @param {number=} query.pageSize - Maximum number of results to return. + * @param {object=} options - Configuration object. + * @param {number=} options.pageSize - Maximum number of results to return. + * @param {string=} options.pageToken - Page token. * @param {function} callback - The callback function. * * @example @@ -237,32 +231,16 @@ Topic.prototype.delete = function(callback) { * pageSize: 3 * }, function(err, subscriptions, nextQuery, apiResponse) {}); */ -Topic.prototype.getSubscriptions = function(query, callback) { - var self = this; - if (util.is(query, 'function')) { - callback = query; - query = {}; +Topic.prototype.getSubscriptions = function(options, callback) { + if (!callback) { + callback = options; + options = {}; } - var path = this.name + '/subscriptions'; - this.makeReq_('GET', path, query, true, function(err, result) { - if (err) { - callback(err, null, null, result); - return; - } + options.projectId = this.pubsub.projectId; + options.topic = this.unformattedName; - var subscriptions = (result.subscriptions || []).map(function(name) { - return new Subscription(self, { - name: name - }); - }); - var nextQuery = null; - if (result.nextPageToken) { - nextQuery = query; - nextQuery.pageToken = result.nextPageToken; - } - callback(null, subscriptions, nextQuery, result); - }); + this.pubsub.getSubscriptions(options, callback); }; /** @@ -300,34 +278,17 @@ Topic.prototype.getSubscriptions = function(query, callback) { * }, function(err, subscription, apiResponse) {}); */ Topic.prototype.subscribe = function(name, options, callback) { - var self = this; - if (!name) { - throw new Error('A name is required for a new subscription.'); - } if (!callback) { callback = options; options = {}; } - var body = { - topic: this.name - }; - - if (options.ackDeadlineSeconds) { - body.ackDeadlineSeconds = options.ackDeadlineSeconds; - } + var topicName = this.name; - var path = Subscription.formatName_(this.projectId, name); + options = options || {}; + options.projectId = this.pubsub.projectId; - this.makeReq_('PUT', path, null, body, function(err, result) { - if (options.reuseExisting && err && err.code === 409) { - callback(null, self.subscription(name, options), result); - } else if (err) { - callback(err, null, result); - } else { - callback(null, self.subscription(name, options), result); - } - }); + return this.pubsub.subscribe(topicName, name, options, callback); }; /** @@ -358,12 +319,9 @@ Topic.prototype.subscribe = function(name, options, callback) { * }); */ Topic.prototype.subscription = function(name, options) { - if (!name) { - throw new Error('The name of a subscription is required.'); - } options = options || {}; - options.name = name; - return new Subscription(this.pubsub, options); + options.projectId = this.pubsub.projectId; + return this.pubsub.subscription(name, options); }; module.exports = Topic;