diff --git a/README.md b/README.md index f7581ad98a0..1fb00aea644 100644 --- a/README.md +++ b/README.md @@ -246,7 +246,9 @@ pubsub.createTopic('my-new-topic', function(err, topic) {}); var topic = pubsub.topic('my-existing-topic'); // Publish a message to the topic. -topic.publish('New message!', function(err) {}); +topic.publish({ + data: 'New message!' +}, function(err) {}); // Subscribe to the topic. topic.subscribe('new-subscription', function(err, subscription) { diff --git a/lib/common/util.js b/lib/common/util.js index f7c11ffbe7d..b3ccf208a8c 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -92,9 +92,14 @@ module.exports.extendGlobalConfig = extendGlobalConfig; * // [ 'Hi' ] */ function arrayize(input) { + if (input === null || input === undefined) { + return []; + } + if (!Array.isArray(input)) { return [input]; } + return input; } diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index 133f86ae39d..1f31fc35280 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -146,16 +146,24 @@ Subscription.formatName_ = function(projectId, name) { * @private */ Subscription.formatMessage_ = function(msg) { + var event = msg.pubsubEvent; + var message = { - id: msg.ackId + ackId: msg.ackId }; - var evt = msg.pubsubEvent; - if (evt && evt.message && evt.message.data) { - message.data = new Buffer(evt.message.data, 'base64').toString('utf-8'); - try { - message.data = JSON.parse(message.data); - } catch(e) {} + + if (event && event.message) { + message.id = event.message.messageId; + + if (event.message.data) { + message.data = new Buffer(event.message.data, 'base64').toString('utf-8'); + + try { + message.data = JSON.parse(message.data); + } catch(e) {} + } } + return message; }; @@ -216,6 +224,7 @@ Subscription.prototype.startPulling_ = function() { return; } this.pull({ + maxResults: 1, returnImmediately: false }, function(err, message) { if (err) { @@ -243,7 +252,7 @@ Subscription.prototype.startPulling_ = function() { Subscription.prototype.ack = function(ids, callback) { if (!ids || ids.length === 0) { throw new Error( - 'At least one ID must be specified before it can be acknowledged'); + 'At least one ID must be specified before it can be acknowledged.'); } ids = util.arrayize(ids); var body = { @@ -278,50 +287,86 @@ Subscription.prototype.delete = function(callback) { /** * Pull messages from the subscribed topic. If messages were found, your - * callback is executed with the message object. + * callback is executed with an array of message objects. * * Note that messages are pulled automatically once you register your first * event listener to the subscription, thus the call to `pull` is handled for * you. If you don't want to start pulling, simply don't register a * `subscription.on('message', function() {})` event handler. * + * @todo Should not be racing with other pull. + * @todo Fix API to return a list of messages. + * * @param {object=} options - Configuration object. - * @param {boolean=} options.returnImmediately - If set, the system will respond + * @param {boolean} options.returnImmediately - If set, the system will respond * immediately. Otherwise, wait until new messages are available. Returns if * timeout is reached. + * @param {number} options.maxResults - Limit the amount of messages pulled. * @param {function} callback - The callback function. * * @example - * subscription.pull(function(err, message) { - * // message.id = ID used to acknowledge its receival. - * // message.data = Contents of the message. + * //- + * // Pull all available messages. + * //- + * subscription.pull(function(err, messages) { + * // messages = [ + * // { + * // ackId: '', // ID used to acknowledge its receival. + * // id: '', // Unique message ID. + * // data: '' // Contents of the message. + * // }, + * // // ... + * // ] * }); + * + * //- + * // Pull a single message. + * //- + * var opts = { + * maxResults: 1 + * }; + * + * subscription.pull(opts, function(err, messages) {}); */ Subscription.prototype.pull = function(options, callback) { var that = this; - // TODO(jbd): Should not be racing with other pull. + var MAX_EVENTS_LIMIT = 1000; + var apiEndpoint = 'subscriptions/pullBatch'; + if (!callback) { callback = options; options = {}; } + + if (!util.is(options.maxResults, 'number')) { + options.maxResults = MAX_EVENTS_LIMIT; + } + var body = { subscription: this.name, - returnImmediately: !!options.returnImmediately + returnImmediately: !!options.returnImmediately, + maxEvents: options.maxResults }; - this.makeReq_( - 'POST', 'subscriptions/pull', null, body, function(err, message) { - // TODO(jbd): Fix API to return a list of messages. + + this.makeReq_('POST', apiEndpoint, null, body, function(err, response) { if (err) { callback(err); return; } - message = Subscription.formatMessage_(message); + + var messages = response.pullResponses || [response]; + messages = messages.map(Subscription.formatMessage_); + if (that.autoAck) { - that.ack(message.id, function(err) { - callback(err, message); + var ackIds = messages.map(function(message) { + return message.ackId; + }); + + that.ack(ackIds, function(err) { + callback(err, messages); }); } else { - callback(null, message); + callback(null, messages); } }); }; diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js index 4eca36ca769..fcc6f323040 100644 --- a/lib/pubsub/topic.js +++ b/lib/pubsub/topic.js @@ -62,6 +62,23 @@ function Topic(pubsub, options) { this.pubsub = pubsub; } +/** + * Format a message object as the upstream API expects it. + * + * @private + * + * @return {object} + */ +Topic.formatMessage_ = function(message) { + if (!util.is(message.data, 'buffer')) { + message.data = new Buffer(JSON.stringify(message.data)); + } + + message.data = message.data.toString('base64'); + + return message; +}; + /** * Format the name of a topic. A Topic's full name is in the format of * /topics/{projectId}/{name}. @@ -83,59 +100,55 @@ Topic.formatName_ = function(projectId, name) { * * @throws {Error} If no message is provided. * - * @param {*} message - The message to publish. + * @param {object|object[]} message - The message(s) to publish. + * @param {*} message.data - The contents of the message. + * @param {array=} message.labels - Labels to apply to the message. * @param {function=} callback - The callback function. * * @example - * topic.publish('New message!', function(err) {}); - * - * topic.publish({ - * user_id: 3, - * name: 'Stephen', - * message: 'Hello from me!' - * }, function(err) {}); - */ -Topic.prototype.publish = function(message, callback) { - if (!message) { - throw new Error('Cannot publish an empty message.'); - } - callback = callback || util.noop; - if (!util.is(message, 'string') && !util.is(message, 'buffer')) { - message = JSON.stringify(message); - } - this.publishRaw({ - data: new Buffer(message).toString('base64') - }, callback); -}; - -/** - * Publish a raw message. + * var registrationMessage = { + * data: { + * userId: 3, + * name: 'Stephen', + * event: 'new user' + * }, + * labels: [ + * 'registration' + * ] + * }; + * topic.publish(registrationMessage, function(err) {}); * - * @throws {Error} If no message is provided. - * - * @param {object} message - Raw message to publish. - * @param {array=} message.label - List of labels for the message. - * @param {string} message.data - The base64-encoded contents of the message. - * @param {function=} callback - The callback function. + * //- + * // You can publish a batch of messages at once by supplying an array. + * //- + * var purchaseMessage = { + * data: { + * userId: 3, + * product: 'computer', + * event: 'purchase' + * } + * }; * - * @example - * topic.publishRaw({ - * data: new Buffer('New message!').toString('base64') - * }, function(err) {}); + * topic.publish([ + * registrationMessage, + * purchaseMessage + * ], function(err) {}); */ -Topic.prototype.publishRaw = function(message, callback) { - if (!message) { - throw new Error('Cannot publish an empty message.'); +Topic.prototype.publish = function(messages, callback) { + messages = util.arrayize(messages); + + if (messages.length === 0) { + throw new Error('Cannot publish without a message.'); } + callback = callback || util.noop; - if (!util.is(message.data, 'string') && !util.is(message.data, 'buffer')) { - message.data = new Buffer(JSON.stringify(message.data)).toString('base64'); - } + var body = { - message: message, - topic: this.name + topic: this.name, + messages: messages.map(Topic.formatMessage_) }; - this.makeReq_('POST', 'topics/publish', null, body, callback); + + this.makeReq_('POST', 'topics/publishBatch', null, body, callback); }; /** diff --git a/regression/pubsub.js b/regression/pubsub.js index f92a16137cf..9afcfe84c5b 100644 --- a/regression/pubsub.js +++ b/regression/pubsub.js @@ -81,7 +81,7 @@ describe('pubsub', function() { it('should publish a message', function(done) { pubsub.topic(topicNames[0]) - .publish('message from me', done); + .publish({ data: 'message from me' }, done); }); it('should be deleted', function(done) { @@ -166,6 +166,7 @@ describe('pubsub', function() { it('should error when using a non-existent subscription', function(done) { var subscription = topic.subscription('non-existent-subscription'); + subscription.pull(function(err) { assert.equal(err.code, 404); done(); @@ -174,33 +175,106 @@ describe('pubsub', function() { it('should be able to pull and ack', function(done) { var subscription = topic.subscription(subscriptions[0].name); - subscription.pull({ returnImmediately: true }, function(err, msg) { + + topic.publish({ data: 'hello' }, function(err) { assert.ifError(err); - subscription.ack(msg.id, done); + + subscription.pull({ + returnImmediately: true, + maxResults: 1 + }, function(err, msgs) { + assert.ifError(err); + subscription.ack(msgs[0].ackId, done); + }); }); - topic.publish('hello', assert.ifError); }); it('should receive the published message', function(done) { var subscription = topic.subscription(subscriptions[0].name); - subscription.pull({ returnImmediately: true }, function(err, msg) { + + topic.publish([ + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' } + ], function(err) { assert.ifError(err); - assert.equal(msg.data, 'hello'); - subscription.ack(msg.id, done); + + subscription.pull({ + returnImmediately: true, + maxResults: 1 + }, function(err, msgs) { + assert.ifError(err); + assert.equal(msgs[0].data, 'hello'); + subscription.ack(msgs[0].ackId, done); + }); }); - topic.publish('hello', assert.ifError); }); it('should receive a raw published message', function(done) { var subscription = topic.subscription(subscriptions[0].name); - subscription.pull({ returnImmediately: true }, function(err, msg) { + + topic.publish([ + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' } + ], function(err) { assert.ifError(err); - assert.equal(msg.data, 'hello'); - subscription.ack(msg.id, done); + + subscription.pull({ + returnImmediately: true, + maxResults: 1 + }, function(err, msgs) { + assert.ifError(err); + assert.equal(msgs[0].data, 'hello'); + subscription.ack(msgs[0].ackId, done); + }); + }); + }); + + it('should receive the chosen amount of results', function(done) { + var subscription = topic.subscription(subscriptions[0].name); + var opts = { returnImmediately: true, maxResults: 3 }; + + topic.publish([ + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' }, + { data: 'hello' } + ], function(err) { + assert.ifError(err); + + subscription.pull(opts, function(err, messages) { + assert.ifError(err); + + assert.equal(messages.length, opts.maxResults); + + var ackIds = messages.map(function(message) { + return message.ackId; + }); + + subscription.ack(ackIds, done); + }); }); - topic.publishRaw({ - data: new Buffer('hello').toString('base64') - }, assert.ifError); }); }); }); diff --git a/test/common/util.js b/test/common/util.js index b648a3cfdfd..02d49d7c044 100644 --- a/test/common/util.js +++ b/test/common/util.js @@ -62,10 +62,21 @@ describe('common/util', function() { }); describe('arrayize', function() { - it('should arrayize if the input is not an array', function(done) { - var o = util.arrayize('text'); - assert.deepEqual(o, ['text']); - done(); + it('should arrayize if the input is not an array', function() { + assert.deepEqual(util.arrayize('text'), ['text']); + }); + + it('should return the same array if given an array', function() { + var arr = [1, 2, 3]; + assert.deepEqual(util.arrayize(arr), arr); + }); + + it('should return an empty array in correct circumstance', function() { + assert.deepEqual(util.arrayize(undefined), []); + assert.deepEqual(util.arrayize(null), []); + + assert.deepEqual(util.arrayize(false), [false]); + assert.deepEqual(util.arrayize(0), [0]); }); }); diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index 2d1ed3206e7..dd2427ee2cf 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -36,11 +36,16 @@ describe('Subscription', function() { ackId: 3, pubsubEvent: { message: { - data: messageBuffer + data: messageBuffer, + messageId: 7 } } }; - var expectedMessage = { id: 3, data: message }; + var expectedMessage = { + ackId: 3, + data: message, + id: 7 + }; var subscription; beforeEach(function() { @@ -180,17 +185,6 @@ describe('Subscription', function() { }; }); - it('should make correct api request', function(done) { - subscription.makeReq_ = function(method, path, qs, body) { - assert.equal(method, 'POST'); - assert.equal(path, 'subscriptions/pull'); - assert.equal(body.subscription, SUB_FULL_NAME); - assert.strictEqual(body.returnImmediately, false); - done(); - }; - subscription.pull({}, assert.ifError); - }); - it('should not require configuration options', function(done) { subscription.pull(done); }); @@ -211,6 +205,119 @@ describe('Subscription', function() { subscription.pull({ returnImmediately: true }, assert.ifError); }); + it('should default to batching', function(done) { + subscription.makeReq_ = function(method, path, query, body) { + assert.equal(path, 'subscriptions/pullBatch'); + assert.equal(body.maxEvents, 1000); + done(); + }; + + subscription.pull(assert.ifError); + }); + + describe('single pull', function() { + it('should make correct api request', function(done) { + subscription.makeReq_ = function(method, path, qs, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'subscriptions/pullBatch'); + assert.equal(body.subscription, SUB_FULL_NAME); + assert.strictEqual(body.returnImmediately, false); + assert.equal(body.maxEvents, 1); + done(); + }; + + subscription.pull({ maxResults: 1 }, assert.ifError); + }); + + it('should execute callback with a message', function(done) { + var apiResponse = { + ackId: 1, + pubsubEvent: { + message: { + data: new Buffer('message').toString('base64') + } + } + }; + + subscription.makeReq_ = function(method, path, query, body, callback) { + callback(null, apiResponse); + }; + + subscription.pull(function(err, msgs) { + assert.ifError(err); + + assert.deepEqual(msgs, [Subscription.formatMessage_(apiResponse)]); + + done(); + }); + }); + }); + + describe('batching', function() { + it('should make correct api request', function(done) { + subscription.makeReq_ = function(method, path, query, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'subscriptions/pullBatch'); + assert.strictEqual(query, null); + assert.deepEqual(body, { + subscription: subscription.name, + returnImmediately: false, + maxEvents: 3 + }); + + done(); + }; + + subscription.pull({ maxResults: 3 }, assert.ifError); + }); + + it('should execute callback with the messages', function(done) { + var apiResponse = { + pullResponses: [ + { + ackId: 1, + pubsubEvent: { + message: { + data: new Buffer('message').toString('base64') + } + } + }, + { + ackId: 2, + pubsubEvent: { + message: { + data: new Buffer('message').toString('base64') + } + } + }, + { + ackId: 3, + pubsubEvent: { + message: { + data: new Buffer('message').toString('base64') + } + } + } + ] + }; + + subscription.makeReq_ = function(method, path, query, body, callback) { + callback(null, apiResponse); + }; + + subscription.pull({ maxResults: 3 }, function(err, messages) { + assert.ifError(err); + + assert.deepEqual( + messages, + apiResponse.pullResponses.map(Subscription.formatMessage_) + ); + + done(); + }); + }); + }); + it('should pass error to callback', function(done) { var error = new Error('Error.'); subscription.makeReq_ = function(method, path, qs, body, callback) { @@ -235,8 +342,8 @@ describe('Subscription', function() { }); it('should execute callback with message', function(done) { - subscription.pull({}, function(err, msg) { - assert.deepEqual(msg, expectedMessage); + subscription.pull({}, function(err, msgs) { + assert.deepEqual(msgs, [expectedMessage]); done(); }); }); @@ -259,7 +366,7 @@ describe('Subscription', function() { it('should pass id to ack', function(done) { subscription.ack = function(id) { - assert.equal(id, expectedMessage.id); + assert.equal(id, expectedMessage.ackId); done(); }; subscription.pull({}, assert.ifError); @@ -294,6 +401,8 @@ describe('Subscription', function() { it('should pull at specified interval', function(done) { var INTERVAL = 5; subscription.pull = function(options, callback) { + assert.equal(options.maxResults, 1); + assert.strictEqual(options.returnImmediately, false); // After pull is called once, overwrite with `done`. // This is to override the function passed to `setTimeout`, so we are // sure it's the same pull function when we execute it. @@ -415,11 +524,22 @@ describe('Subscription', function() { it('should decode stringified JSON to object', function() { var obj = { hi: 'there' }; var stringified = new Buffer(JSON.stringify(obj)).toString('base64'); + var msg = Subscription.formatMessage_({ ackId: 3, - pubsubEvent: { message: { data: stringified } } + pubsubEvent: { + message: { + data: stringified, + messageId: 7 + } + } + }); + + assert.deepEqual(msg, { + ackId: 3, + id: 7, + data: obj }); - assert.deepEqual(msg, { id: 3, data: obj }); }); it('should decode buffer to string', function() { diff --git a/test/pubsub/topic.js b/test/pubsub/topic.js index 2425b67c804..b6ad21d2db3 100644 --- a/test/pubsub/topic.js +++ b/test/pubsub/topic.js @@ -86,6 +86,28 @@ describe('Topic', function() { }); }); + describe('formatMessage_', function() { + var messageString = 'string'; + var messageBuffer = new Buffer(messageString); + + var messageObjectWithString = { data: messageString }; + var messageObjectWithBuffer = { data: messageBuffer }; + + it('should handle string data', function() { + assert.deepEqual( + Topic.formatMessage_(messageObjectWithString), + { data: new Buffer(JSON.stringify(messageString)).toString('base64') } + ); + }); + + it('should handle buffer data', function() { + assert.deepEqual( + Topic.formatMessage_(messageObjectWithBuffer), + { data: messageBuffer.toString('base64') } + ); + }); + }); + describe('formatName_', function() { var fullName = '/topics/' + PROJECT_ID + '/' + TOPIC_NAME; @@ -100,85 +122,43 @@ describe('Topic', function() { }); }); - describe('publishing', function() { + describe('publish', function() { var message = 'howdy'; - var messageBuffer = new Buffer(message); - var messageRaw = { data: messageBuffer.toString('base64') }; - var messageObj = { test: 'object' }; - var messageObjDecoded = - new Buffer(JSON.stringify(messageObj)).toString('base64'); - - describe('publish', function() { - it('should throw if no message is provided', function() { - assert.throws(function() { - topic.publish(); - }, /empty message/); - }); - - it('should convert string to raw message', function(done) { - topic.publishRaw = function(msg) { - assert.deepEqual(msg, messageRaw); - done(); - }; - topic.publish(message, assert.ifError); - }); + var messageObject = { data: message }; - it('should convert buffer to raw message', function(done) { - topic.publishRaw = function(msg) { - assert.deepEqual(msg, messageRaw); - done(); - }; - topic.publish(messageBuffer, assert.ifError); - }); + it('should throw if no message is provided', function() { + assert.throws(function() { + topic.publish(); + }, /Cannot publish/); - it('should stringify non-strings & non-buffers', function(done) { - topic.publishRaw = function(msg) { - assert.deepEqual(msg.data, messageObjDecoded); - done(); - }; - topic.publish(messageObj, assert.ifError); - }); - - it('should pass callback', function(done) { - topic.publishRaw = function(msg, callback) { - callback(); - }; - topic.publish(message, done); - }); + assert.throws(function() { + topic.publish([]); + }, /Cannot publish/); }); - describe('publishRaw', function() { - it('should throw if no message is provided', function() { - assert.throws(function() { - topic.publishRaw(); - }, /empty message/); - }); + it('should send correct api request', function(done) { + topic.makeReq_ = function(method, path, query, body) { + assert.equal(method, 'POST'); + assert.equal(path, 'topics/publishBatch'); + assert.strictEqual(query, null); + assert.deepEqual(body, { + topic: topic.name, + messages: [ + { data: new Buffer(JSON.stringify(message)).toString('base64') } + ] + }); + done(); + }; - it('should stringify non-strings & non-buffers', function(done) { - topic.makeReq_ = function(method, path, qs, body) { - assert.deepEqual(body.message.data, messageObjDecoded); - done(); - }; - topic.publishRaw({ data: messageObj }, assert.ifError); - }); + topic.publish(messageObject, assert.ifError); + }); - it('should post raw messages to the api', function(done) { - topic.makeReq_ = function(method, path, qs, body) { - assert.equal(method, 'POST'); - assert.equal(path, 'topics/publish'); - assert.deepEqual(body.message.data, messageRaw.data); - done(); - }; - topic.publishRaw(messageRaw, assert.ifError); - }); + it('should execute callback', function(done) { + topic.makeReq_ = function(method, path, query, body, callback) { + callback(); + }; - it('should attach topic name to the request', function(done) { - topic.makeReq_ = function(method, path, qs, body) { - assert.equal(body.topic, topic.name); - done(); - }; - topic.publishRaw(messageRaw, assert.ifError); - }); + topic.publish(messageObject, done); }); });