Skip to content

Commit

Permalink
pubsub: privatize createTopic + default autoCreate:true
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jun 24, 2015
1 parent 9a49b79 commit c43f35f
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 157 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,7 @@ var pubsub = gcloud.pubsub({
keyFilename: '/path/to/keyfile.json'
});

// Create a new topic.
pubsub.createTopic('my-new-topic', function(err, topic) {});

// Reference an existing topic.
var topic = pubsub.topic('my-existing-topic');
var topic = pubsub.topic('my-topic');

// Publish a message to the topic.
topic.publish({
Expand Down
65 changes: 33 additions & 32 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,34 +148,6 @@ PubSub.prototype.getTopics = function(query, callback) {
});
};

/**
* Create a topic with the given name.
*
* @param {string} name - Name of the topic.
* @param {function=} callback - The callback function.
* @param {?error} callback.err - An error from the API call, may be null.
* @param {module:pubsub/topic} callback.topic - The newly created topic.
* @param {object} callback.apiResponse - The full API response from the
* service.
*
* @example
* pubsub.createTopic('my-new-topic', function(err, topic, apiResponse) {
* topic.publish('New message!', function(err) {});
* });
*/
PubSub.prototype.createTopic = function(name, callback) {
callback = callback || util.noop;
var topic = this.topic(name);
var path = this.projectName + '/topics/' + name;
this.makeReq_('PUT', path, null, null, function(err, result) {
if (err) {
callback(err, null, result);
return;
}
callback(null, topic, result);
});
};

/**
* Create a subscription to a topic. You may optionally provide an object to
* customize the subscription.
Expand Down Expand Up @@ -315,14 +287,13 @@ PubSub.prototype.subscription = function(name, options) {
*
* @param {string} name - The name of the topic.
* @param {object=} options - Configuration object.
* @param {boolean=} options.autoCreate - Automatically create topic if it
* @param {boolean} options.autoCreate - Automatically create topic if it
* doesn't exist. Note that messages published to a topic with no
* subscribers will not be delivered.
* subscribers will not be delivered. Default: true.
* @return {module:pubsub/topic}
*
* @example
* var topic = pubsub.topic('my-existing-topic');
* var topic = pubsub.topic('topic-that-maybe-exists', { autoCreate: true });
* var topic = pubsub.topic('my-topic');
*/
PubSub.prototype.topic = function(name, options) {
if (!name) {
Expand Down Expand Up @@ -436,6 +407,36 @@ PubSub.prototype.getSubscriptions = function(options, callback) {
});
};

/**
* Create a topic with the given name.
*
* @private
*
* @param {string} name - Name of the topic.
* @param {function=} callback - The callback function.
* @param {?error} callback.err - An error from the API call, may be null.
* @param {module:pubsub/topic} callback.topic - The newly created topic.
* @param {object} callback.apiResponse - The full API response from the
* service.
*
* @example
* pubsub.createTopic_('my-new-topic', function(err, topic, apiResponse) {
* topic.publish('New message!', function(err) {});
* });
*/
PubSub.prototype.createTopic_ = function(name, callback) {
callback = callback || util.noop;
var topic = this.topic(name);
var path = this.projectName + '/topics/' + name;
this.makeReq_('PUT', path, null, null, function(err, result) {
if (err) {
callback(err, null, result);
return;
}
callback(null, topic, result);
});
};

/**
* Make a new request object from the provided arguments and wrap the callback
* to intercept non-successful responses.
Expand Down
6 changes: 3 additions & 3 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,23 @@ var util = require('../common/util.js');
* //-
* // From {@linkcode module:pubsub/topic#getSubscriptions}:
* //-
* var topic = pubsub.topic('my-existing-topic');
* var topic = pubsub.topic('my-topic');
* topic.getSubscriptions(function(err, subscriptions) {
* // `subscriptions` is an array of Subscription objects.
* });
*
* //-
* // From {@linkcode module:pubsub/topic#subscribe}:
* //-
* var topic = pubsub.topic('my-existing-topic');
* var topic = pubsub.topic('my-topic');
* topic.subscribe('new-subscription', function(err, subscription) {
* // `subscription` is a Subscription object.
* });
*
* //-
* // From {@linkcode module:pubsub/topic#subscription}:
* //-
* var topic = pubsub.topic('my-existing-topic');
* var topic = pubsub.topic('my-topic');
* var subscription = topic.subscription('my-existing-subscription');
* // `subscription` is a Subscription object.
*
Expand Down
91 changes: 45 additions & 46 deletions lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ var util = require('../common/util.js');
*
* @param {module:pubsub} pubsub - PubSub object.
* @param {object} options - Configuration object.
* @param {boolean=} options.autoCreate - Automatically create topic if it
* doesn't exist. Note that messages published to a topic with no
* subscribers will not be delivered. Default: true.
* @param {string} options.name - Name of the topic.
*/
/**
* A Topic object allows you to interact with a Google Cloud Pub/Sub topic. To
* get this object, you will use the methods on the `pubsub` object,
* {@linkcode module:pubsub#topic} and {@linkcode module:pubsub#createTopic}.
* A Topic object allows you to interact with a Google Cloud Pub/Sub topic.
*
* @constructor
* @alias module:pubsub/topic
Expand All @@ -45,25 +46,15 @@ var util = require('../common/util.js');
* projectId: 'grape-spaceship-123'
* });
*
* // From pubsub.topic:
* var topic = pubsub.topic('my-existing-topic');
*
* // From pubsub.createTopic:
* pubsub.createTopic('my-new-topic', function(err, topic) {
* // `topic` is a Topic object.
* });
* var topic = pubsub.topic('my-topic');
*/
function Topic(pubsub, options) {
this.makeReq_ = pubsub.makeReq_.bind(pubsub);
this.autoCreate = options.autoCreate !== false;
this.name = Topic.formatName_(pubsub.projectId, options.name);

this.projectId = pubsub.projectId;
this.pubsub = pubsub;
this.unformattedName = options.name;

if (options.autoCreate) {
this.origMakeReq_ = this.makeReq_;
this.makeReq_ = this.autoCreateWrapper_;
}
}

/**
Expand Down Expand Up @@ -99,34 +90,6 @@ Topic.formatName_ = function(projectId, name) {
return 'projects/' + projectId + '/topics/' + name;
};

/**
* Wrapper for makeReq_ that automatically attempts to create a topic if it does
* not yet exist.
*
* @private
*/
Topic.prototype.autoCreateWrapper_ = function(method, path, q, body, callback) {
var self = this;

function createAndRetry() {
self.pubsub.createTopic(self.unformattedName, function(err) {
if (err) {
callback(err);
return;
}
self.origMakeReq_(method, path, q, body, callback);
});
}

this.origMakeReq_(method, path, q, body, function(err, res) {
if (err && err.code === 404 && method !== 'DELETE') {
createAndRetry();
} else {
callback(err, res);
}
});
};

/**
* Publish the provided message or array of messages. On success, an array of
* messageIds is returned in the response.
Expand All @@ -144,7 +107,7 @@ Topic.prototype.autoCreateWrapper_ = function(method, path, q, body, callback) {
* topic.publish({
* data: 'Hello, world!'
* }, function(err, messageIds, apiResponse) {});
*
*
* //-
* // The data property can be a JSON object as well.
* //-
Expand All @@ -159,7 +122,7 @@ Topic.prototype.autoCreateWrapper_ = function(method, path, q, body, callback) {
* hello: 'world'
* }
* };
*
*
* topic.publish(registerMessage, function(err, messageIds, apiResponse) {});
*
* //-
Expand Down Expand Up @@ -321,4 +284,40 @@ Topic.prototype.subscription = function(name, options) {
return this.pubsub.subscription(name, options);
};

/**
* Make an API request using the parent PubSub object's `makeReq_`. If the Topic
* instance has `autoCreate: true` set, this method will first try to create the
* Topic in the event of a 404.
*
* @private
*
* @param {string} method - Action.
* @param {string} path - Request path.
* @param {*} query - Request query object.
* @param {*} body - Request body contents.
* @param {function} callback - The callback function.
*/
Topic.prototype.makeReq_ = function(method, path, query, body, callback) {
var self = this;

function createTopicThenRetryRequest() {
self.pubsub.createTopic_(self.unformattedName, function(err, topic, res) {
if (err) {
callback(err, null, res);
return;
}

self.pubsub.makeReq_(method, path, query, body, callback);
});
}

this.pubsub.makeReq_(method, path, query, body, function(err, res) {
if (self.autoCreate && err && err.code === 404 && method !== 'DELETE') {
createTopicThenRetryRequest();
} else {
callback(err, res);
}
});
};

module.exports = Topic;
26 changes: 22 additions & 4 deletions system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('pubsub', function() {
before(function(done) {
// create all needed topics
async.each(TOPIC_NAMES, function(name, cb) {
pubsub.createTopic(name, cb);
pubsub.createTopic_(name, cb);
}, done);
});

Expand All @@ -63,7 +63,6 @@ describe('pubsub', function() {
});

describe('Topic', function() {

it('should be listed', function(done) {
pubsub.getTopics(function(err, topics) {
assert.ifError(err);
Expand Down Expand Up @@ -92,12 +91,31 @@ describe('pubsub', function() {

it('should be created and deleted', function(done) {
var TOPIC_NAME = generateTopicName();
pubsub.createTopic(TOPIC_NAME, function(err) {
pubsub.createTopic_(TOPIC_NAME, function(err) {
assert.ifError(err);
pubsub.topic(TOPIC_NAME).delete(done);
});
});

it('should lazily create by default', function(done) {
var newTopicName = generateTopicName();
var newTopic = pubsub.topic(newTopicName);

newTopic.publish({ data: 'message from me' }, function(err) {
assert.ifError(err);

pubsub.getTopics(function(err, topics) {
assert.ifError(err);

assert(topics.some(function(topic) {
return topic.name.indexOf(newTopicName) > -1;
}));

newTopic.delete(done);
});
});
});

it('should publish a message', function(done) {
var topic = pubsub.topic(TOPIC_NAMES[0]);
topic.publish({ data: 'message from me' }, function(err, messageIds) {
Expand Down Expand Up @@ -131,7 +149,7 @@ describe('pubsub', function() {

before(function(done) {
// Create a new test topic.
pubsub.createTopic(TOPIC_NAME, function(err, newTopic) {
pubsub.createTopic_(TOPIC_NAME, function(err, newTopic) {
assert.ifError(err);
topic = newTopic;

Expand Down
Loading

0 comments on commit c43f35f

Please sign in to comment.