Skip to content

Commit

Permalink
move subscription methods up a level
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed May 20, 2015
1 parent 5dbb86a commit 1a12ede
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 72 deletions.
2 changes: 1 addition & 1 deletion lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ util.arrayize = arrayize;
*/
function format(template, args) {
return template.replace(/{([^}]*)}/g, function(match, key) {
return args[key] || match;
return key in args ? args[key] : match;
});
}

Expand Down
172 changes: 161 additions & 11 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,124 @@ PubSub.prototype.createTopic = function(name, callback) {
});
};

/**
* Create a subscription to this topic. You may optionally provide an object to
* customize the subscription.
*
* Your provided callback will either be invoked with an error object, if an API
* error occurred, or a {@linkcode module:pubsub/subscription} object.
*
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {number=} options.ackDeadlineSeconds - The maximum time after
* receiving a message that you must ack a message before it is redelivered.
* @param {boolean=} options.autoAck - Automatically acknowledge the message
* once it's pulled. (default: false)
* @param {number=} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string=} options.projectId - The projectId where the topic resource
* exists.
* @param {boolean=} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {function} callback - The callback function.
*
* @example
* // Without specifying any options.
* var topic = 'messageTopic';
* var sub = 'messageSubscription';
*
* pubsub.subscribe(topic, sub, function(err, subscription, apiResponse) {});
*
* // With options.
* pubsub.subscribe(topic, sub, {
* ackDeadlineSeconds: 90,
* autoAck: true,
* interval: 30
* }, function(err, subscription, apiResponse) {});
*/
PubSub.prototype.subscribe = function(topicName, subName, options, callback) {
var self = this;

if (!topicName) {
throw new Error('A Topic name is required for a new subscription.');
}

if (!subName) {
throw new Error('A Subscription name is required for a new subscription.');
}

if (!callback) {
callback = options;
options = {};
}

var body = {
topic: topicName
};

if (options.ackDeadlineSeconds) {
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
}

var projectId = options.projectId || this.projectId;

var path = Subscription.formatName_(projectId, subName);

this.makeReq_('PUT', path, null, body, function(err, result) {
if (options.reuseExisting && err && err.code === 409) {
callback(null, self.subscription(subName, options), result);
} else if (err) {
callback(err, null, result);
} else {
callback(null, self.subscription(subName, options), result);
}
});
};

/**
* Create a Subscription object in reference to an existing subscription. This
* command by itself will not run any API requests. You will receive a
* {@linkcode module:pubsub/subscription} object, which will allow you to
* interact with your subscription.
*
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {boolean=} options.autoAck - Automatically acknowledge the message
* once it's pulled.
* @param {number=} options.interval - Interval in milliseconds to check for new
* messages.
* @param {string=} options.projectId - The projectId where the subscription
* exists.
* @return {module:pubsub/subscription}
*
* @example
* var subscription = pubsub.subscription('my-existing-subscription');
*
* // Register a listener for `message` events.
* subscription.on('message', function(message) {
* // Called every time a message is received.
* // message.id = ID used to acknowledge its receival.
* // message.data = Contents of the message.
* // message.attributes = Attributes of the message.
* });
*/
PubSub.prototype.subscription = function(name, options) {
if (!name) {
throw new Error('The name of a subscription is required.');
}

options = options || {};
options.name = name;

return new Subscription(this, options);
};

/**
* Create a Topic object to reference an existing topic.
*
Expand Down Expand Up @@ -195,9 +313,13 @@ PubSub.prototype.topic = function(name, options) {
* You may optionally provide a query object as the first argument to customize
* the response.
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.pageSize - Maximum number of results to return.
* @param {object=} options - Query object.
* @param {string=} options.projectId - The projectId where the topic
* resource exists.
* @param {string=} options.topic - The name of the topic to list subscriptions
* from.
* @param {number=} options.pageSize - Maximum number of results to return.
* @param {string=} options.pageToken - Page token.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -214,30 +336,58 @@ PubSub.prototype.topic = function(name, options) {
* pageSize: 10
* }, callback);
*/
PubSub.prototype.getSubscriptions = function(query, callback) {
PubSub.prototype.getSubscriptions = function(options, callback) {
var self = this;

if (!callback) {
callback = query;
query = {};
callback = options;
options = {};
}

var path = this.projectName + '/subscriptions';
this.makeReq_('GET', path, query, true, function(err, result) {
options = options || {};

var query = {};

if (options.pageSize) {
query.pageSize = options.pageSize;
}

if (options.pageToken) {
query.pageToken = options.pageToken;
}

var projectId = options.projectId || this.projectId;

var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
projectPath: 'projects/' + projectId,
topicPath: options.topic ? '/topics/' + options.topic : ''
});

this.makeReq_('GET', apiPath, query, null, function(err, result) {
if (err) {
callback(err, null, null, result);
return;
}

var subscriptions = (result.subscriptions || []).map(function(item) {
var subscriptions = (result.subscriptions || []).map(function(sub) {
// Depending on if we're using a subscriptions.list or
// topics.subscriptions.list API endpoint, we will get back a Subscription
// resource or just the name of the subscription.
var subName = sub.name || sub;

return new Subscription(self, {
name: item.name
projectId: projectId,
name: subName
});
});

var nextQuery = null;

if (result.nextPageToken) {
nextQuery = query;
nextQuery = options;
nextQuery.pageToken = result.nextPageToken;
}

callback(null, subscriptions, nextQuery, result);
});
};
Expand Down
5 changes: 4 additions & 1 deletion lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ var util = require('../common/util.js');
function Subscription(pubsub, options) {
events.EventEmitter.call(this);

this.name = Subscription.formatName_(pubsub.projectId, options.name);
var projectId = options.projectId || pubsub.projectId;

this.name = Subscription.formatName_(projectId, options.name);

this.makeReq_ = pubsub.makeReq_.bind(pubsub);

this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
Expand Down
76 changes: 17 additions & 59 deletions lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
*/
var util = require('../common/util.js');

/**
* @type {module:pubsub/subscription}
* @private
*/
var Subscription = require('./subscription.js');

/*! Developer Documentation
*
* @param {module:pubsub} pubsub - PubSub object.
Expand Down Expand Up @@ -60,9 +54,9 @@ function Topic(pubsub, options) {
this.name = Topic.formatName_(pubsub.projectId, options.name);
this.projectId = pubsub.projectId;
this.pubsub = pubsub;
this.unformattedName = options.name;

if (options.autoCreate) {
this.unformattedName = options.name;
this.origMakeReq_ = this.makeReq_;
this.makeReq_ = this.autoCreateWrapper_;
}
Expand Down Expand Up @@ -220,9 +214,9 @@ Topic.prototype.delete = function(callback) {
* error occurred, or an array of {@linkcode module:pubsub/subscription}
* objects.
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.pageSize - Maximum number of results to return.
* @param {object=} options - Configuration object.
* @param {number=} options.pageSize - Maximum number of results to return.
* @param {string=} options.pageToken - Page token.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -237,32 +231,16 @@ Topic.prototype.delete = function(callback) {
* pageSize: 3
* }, function(err, subscriptions, nextQuery, apiResponse) {});
*/
Topic.prototype.getSubscriptions = function(query, callback) {
var self = this;
if (util.is(query, 'function')) {
callback = query;
query = {};
Topic.prototype.getSubscriptions = function(options, callback) {
if (!callback) {
callback = options;
options = {};
}

var path = this.name + '/subscriptions';
this.makeReq_('GET', path, query, true, function(err, result) {
if (err) {
callback(err, null, null, result);
return;
}
options.projectId = this.pubsub.projectId;
options.topic = this.unformattedName;

var subscriptions = (result.subscriptions || []).map(function(name) {
return new Subscription(self, {
name: name
});
});
var nextQuery = null;
if (result.nextPageToken) {
nextQuery = query;
nextQuery.pageToken = result.nextPageToken;
}
callback(null, subscriptions, nextQuery, result);
});
this.pubsub.getSubscriptions(options, callback);
};

/**
Expand Down Expand Up @@ -300,34 +278,17 @@ Topic.prototype.getSubscriptions = function(query, callback) {
* }, function(err, subscription, apiResponse) {});
*/
Topic.prototype.subscribe = function(name, options, callback) {
var self = this;
if (!name) {
throw new Error('A name is required for a new subscription.');
}
if (!callback) {
callback = options;
options = {};
}

var body = {
topic: this.name
};

if (options.ackDeadlineSeconds) {
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
}
var topicName = this.name;

var path = Subscription.formatName_(this.projectId, name);
options = options || {};
options.projectId = this.pubsub.projectId;

this.makeReq_('PUT', path, null, body, function(err, result) {
if (options.reuseExisting && err && err.code === 409) {
callback(null, self.subscription(name, options), result);
} else if (err) {
callback(err, null, result);
} else {
callback(null, self.subscription(name, options), result);
}
});
return this.pubsub.subscribe(topicName, name, options, callback);
};

/**
Expand Down Expand Up @@ -358,12 +319,9 @@ Topic.prototype.subscribe = function(name, options, callback) {
* });
*/
Topic.prototype.subscription = function(name, options) {
if (!name) {
throw new Error('The name of a subscription is required.');
}
options = options || {};
options.name = name;
return new Subscription(this.pubsub, options);
options.projectId = this.pubsub.projectId;
return this.pubsub.subscription(name, options);
};

module.exports = Topic;

0 comments on commit 1a12ede

Please sign in to comment.