Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move subscription methods up a level #618

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ util.arrayize = arrayize;
*/
function format(template, args) {
return template.replace(/{([^}]*)}/g, function(match, key) {
return args[key] || match;
return key in args ? args[key] : match;

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

});
}

Expand Down
200 changes: 187 additions & 13 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,138 @@ PubSub.prototype.createTopic = function(name, callback) {
});
};

/**
* Create a subscription to a topic. You may optionally provide an object to
* customize the subscription.
*
* Your provided callback will be invoked with an error object if an API error
* occurred or a {@linkcode module:pubsub/subscription} object.
*
* @throws {Error} If a Topic instance or topic name is not provided.
* @throws {Error} If a subName is not provided.
*
* @param {module:pubsub/topic|string} - topic - The Topic to create a
* subscription to.
* @param {string} subName - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
* a message that you must ack a message before it is redelivered.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {boolean} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
* (default: false)
* @param {function} callback - The callback function.
*
* @example
* //-
* // Subscribe to a topic. (Also see {module:pubsub/topic#subscribe}).
* //-
* var topic = 'messageCenter';
* var name = 'newMessages';
*
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
*
* //-
* // Customize the subscription.
* //-
* pubsub.subscribe(topic, name, {
* ackDeadlineSeconds: 90,
* autoAck: true,
* interval: 30
* }, function(err, subscription, apiResponse) {});
*
* //-
* // Create a subscription to a topic from another project.
* //-
* var anotherProject = gcloud.pubsub({
* projectId: 'another-project'
* });
*
* var topic = anotherProject.topic('messageCenter');
*
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
*/
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
if (!util.is(topic, 'string') && !(topic instanceof Topic)) {
throw new Error('A Topic is required for a new subscription.');
}

if (!util.is(subName, 'string')) {
throw new Error('A subscription name is required for a new subscription.');
}

if (!callback) {
callback = options;
options = {};
}

options = options || {};

if (util.is(topic, 'string')) {

This comment was marked as spam.

This comment was marked as spam.

topic = this.topic(topic);
}

var body = {
topic: topic.name
};

if (options.ackDeadlineSeconds) {
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
}

var subscription = this.subscription(subName, options);

this.makeReq_('PUT', subscription.name, null, body, function(err, result) {
if (err && !(err.code === 409 && options.reuseExisting)) {
callback(err, null, result);
return;
}

callback(null, subscription, result);
});
};

/**
* Create a Subscription object in reference to an existing subscription. This
* command by itself will not run any API requests. You will receive a
* {@linkcode module:pubsub/subscription} object, which will allow you to
* interact with your subscription.
*
* @throws {Error} If a name is not provided.
*
* @param {string} name - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled.
* @param {number} options.interval - Interval in milliseconds to check for new
* messages.
* @return {module:pubsub/subscription}
*
* @example
* var subscription = pubsub.subscription('my-existing-subscription');
*
* // Register a listener for `message` events.
* subscription.on('message', function(message) {
* // Called every time a message is received.
* // message.id = ID used to acknowledge its receival.
* // message.data = Contents of the message.
* // message.attributes = Attributes of the message.
* });
*/
PubSub.prototype.subscription = function(name, options) {
if (!name) {
throw new Error('The name of a subscription is required.');
}

options = options || {};
options.name = name;
return new Subscription(this, options);
};

/**
* Create a Topic object to reference an existing topic.
*
Expand Down Expand Up @@ -194,9 +326,16 @@ PubSub.prototype.topic = function(name, options) {
* You may optionally provide a query object as the first argument to customize
* the response.
*
* @param {object=} query - Query object.
* @param {string=} query.pageToken - Page token.
* @param {number=} query.pageSize - Maximum number of results to return.
* Your provided callback will be invoked with an error object if an API error
* occurred or an array of {@linkcode module:pubsub/subscription} objects.
*
* To get subscriptions for a topic, see {module:pubsub/topic}.
*
* @param {object=} options - Configuration object.
* @param {string|module:pubsub/topic} options.topic - The name of the topic to
* list subscriptions from.
* @param {number} options.pageSize - Maximum number of results to return.
* @param {string} options.pageToken - Page token.
* @param {function} callback - The callback function.
*
* @example
Expand All @@ -205,38 +344,73 @@ PubSub.prototype.topic = function(name, options) {
* // so, run `pubsub.getSubscriptions(nextQuery, callback);`.
* };
*
* // Get all subscriptions.
* //-
* // Get all subscriptions for this project.
* //-
* pubsub.getSubscriptions(callback);
*
* //-
* // Customize the query.
* //-
* pubsub.getSubscriptions({
* pageSize: 10
* pageSize: 3
* }, callback);
*/
PubSub.prototype.getSubscriptions = function(query, callback) {
PubSub.prototype.getSubscriptions = function(options, callback) {
var self = this;

if (!callback) {
callback = query;
query = {};
callback = options;
options = {};
}

var path = this.projectName + '/subscriptions';
this.makeReq_('GET', path, query, true, function(err, result) {
options = options || {};

var topicName;

if (util.is(options.topic, 'string')) {
topicName = options.topic;
} else if (options.topic instanceof Topic) {
topicName = options.topic.unformattedName;
}

var query = {};

if (options.pageSize) {
query.pageSize = options.pageSize;
}

if (options.pageToken) {
query.pageToken = options.pageToken;
}

var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
projectPath: 'projects/' + this.projectId,
topicPath: topicName ? '/topics/' + topicName : ''
});

this.makeReq_('GET', apiPath, query, null, function(err, result) {
if (err) {
callback(err, null, null, result);
return;
}

var subscriptions = (result.subscriptions || []).map(function(item) {
var subscriptions = (result.subscriptions || []).map(function(sub) {
return new Subscription(self, {
name: item.name
// Depending on if we're using a subscriptions.list or
// topics.subscriptions.list API endpoint, we will get back a
// Subscription resource or just the name of the subscription.
name: sub.name || sub
});
});

var nextQuery = null;

if (result.nextPageToken) {
nextQuery = query;
nextQuery = options;
nextQuery.pageToken = result.nextPageToken;
}

callback(null, subscriptions, nextQuery, result);
});
};
Expand Down
1 change: 1 addition & 0 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ function Subscription(pubsub, options) {
events.EventEmitter.call(this);

this.name = Subscription.formatName_(pubsub.projectId, options.name);

this.makeReq_ = pubsub.makeReq_.bind(pubsub);

this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
Expand Down
Loading