From e91499810b087cd0a21870c2a497b988a6e11eb0 Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Fri, 17 Jul 2015 12:36:17 -0400 Subject: [PATCH] pubsub: add maxInProgress option --- lib/pubsub/subscription.js | 116 +++++++++++++- test/pubsub/subscription.js | 311 ++++++++++++++++++++++++++++-------- 2 files changed, 350 insertions(+), 77 deletions(-) diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index ca6b8ea6b59..878d7069661 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -38,6 +38,8 @@ var util = require('../common/util.js'); * @param {number} options.interval - Interval in milliseconds to check for new * messages. (default: 10) * @param {string} options.name - Name of the subscription. + * @param {number} options.maxInProgress - Maximum messages to consume + * simultaneously. */ /** * A Subscription object will give you access to your Google Cloud Pub/Sub @@ -106,9 +108,18 @@ var util = require('../common/util.js'); * // Register a listener for `message` events. * function onMessage(message) { * // Called every time a message is received. + * * // message.id = ID of the message. * // message.ackId = ID used to acknowledge the message receival. * // message.data = Contents of the message. + * + * // Ack the message: + * // message.ack(callback); + * + * // Skip the message. This is useful with `maxInProgress` option when + * // creating your subscription. This doesn't ack the message, but allows + * // more messages to be retrieved if your limit was hit. + * // message.skip(); * } * subscription.on('message', onMessage); * @@ -125,6 +136,11 @@ function Subscription(pubsub, options) { this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false; this.closed = false; this.interval = util.is(options.interval, 'number') ? options.interval : 10; + this.inProgressAckIds = {}; + this.maxInProgress = + util.is(options.maxInProgress, 'number') ? options.maxInProgress : Infinity; + this.messageListeners = 0; + this.paused = false; this.listenForEvents_(); } @@ -191,11 +207,10 @@ Subscription.formatMessage_ = function(msg) { */ Subscription.prototype.listenForEvents_ = function() { var self = this; - var messageListeners = 0; this.on('newListener', function(event) { if (event === 'message') { - messageListeners++; + this.messageListeners++; if (self.closed) { self.closed = false; } @@ -204,7 +219,7 @@ Subscription.prototype.listenForEvents_ = function() { }); this.on('removeListener', function(event) { - if (event === 'message' && --messageListeners === 0) { + if (event === 'message' && --this.messageListeners === 0) { self.closed = true; } }); @@ -229,20 +244,31 @@ Subscription.prototype.listenForEvents_ = function() { */ Subscription.prototype.startPulling_ = function() { var self = this; - if (this.closed) { + + if (this.closed || this.paused) { return; } + + var maxResults; + + if (this.maxInProgress < Infinity) { + maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length; + } + this.pull({ - returnImmediately: false + returnImmediately: false, + maxResults: maxResults }, function(err, messages, apiResponse) { if (err) { self.emit('error', err, apiResponse); } + if (messages) { messages.forEach(function(message) { self.emit('message', message, apiResponse); }); } + setTimeout(self.startPulling_.bind(self), self.interval); }); }; @@ -260,16 +286,33 @@ Subscription.prototype.startPulling_ = function() { * subscription.ack('ePHEESyhuE8e...', function(err, apiResponse) {}); */ Subscription.prototype.ack = function(ackIds, callback) { + var self = this; + if (!ackIds || ackIds.length === 0) { throw new Error( 'At least one ID must be specified before it can be acknowledged.'); } + ackIds = util.arrayize(ackIds); + var body = { ackIds: ackIds }; + + callback = callback || util.noop; + var path = this.name + ':acknowledge'; - this.makeReq_('POST', path, null, body, callback || util.noop); + + this.makeReq_('POST', path, null, body, function(err, resp) { + if (!err) { + ackIds.forEach(function(ackId) { + delete self.inProgressAckIds[ackId]; + }); + self.refreshPausedStatus_(); + } + + callback(err, resp); + }); }; /** @@ -324,6 +367,10 @@ Subscription.prototype.delete = function(callback) { * // id: '', // Unique message ID. * // data: '', // Contents of the message. * // attributes: {} // Attributes of the message. + * // + * // Helper functions: + * // ack(callback): // Ack the message. + * // skip(): // Free up 1 slot on the sub's maxInProgress value. * // }, * // // ... * // ] @@ -364,7 +411,11 @@ Subscription.prototype.pull = function(options, callback) { } var messages = response.receivedMessages || []; - messages = messages.map(Subscription.formatMessage_); + messages = messages + .map(Subscription.formatMessage_) + .map(self.decorateMessage_.bind(self)); + + self.refreshPausedStatus_(); if (self.autoAck && messages.length !== 0) { var ackIds = messages.map(function(message) { @@ -411,4 +462,55 @@ Subscription.prototype.setAckDeadline = function(options, callback) { this.makeReq_('POST', path, null, body, callback); }; +/** + * Add functionality on top of a message returned from the API, including the + * ability to `ack` and `skip` the message. + * + * This also records the message as being "in progress". See + * {module:subscription#refreshPausedStatus_}. + * + * @private + * + * @param {object} message - A message object. + * @return {object} message - The original message after being decorated. + * @param {function} message.ack - Ack the message. + * @param {function} message.skip - Increate the number of available messages to + * simultaneously receive. + */ +Subscription.prototype.decorateMessage_ = function(message) { + var self = this; + + this.inProgressAckIds[message.ackId] = true; + + message.ack = self.ack.bind(self, message.ackId); + + message.skip = function() { + delete self.inProgressAckIds[message.ackId]; + self.refreshPausedStatus_(); + }; + + return message; +}; + +/** + * Update the status of `maxInProgress`. Å subscription becomes "paused" (not + * pulling) when the number of messages that have yet to be ack'd or skipped + * exceeds the user's specified `maxInProgress` value. + * + * This will start pulling when that event reverses: we were paused, but one or + * more messages were just ack'd or skipped, freeing up room for more messages + * to be consumed. + * + * @private + */ +Subscription.prototype.refreshPausedStatus_ = function() { + var isCurrentlyPaused = this.paused; + var inProgress = Object.keys(this.inProgressAckIds).length; + this.paused = inProgress >= this.maxInProgress; + + if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) { + this.startPulling_(); + } +}; + module.exports = Subscription; diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index b5b710abf93..6f91a52cf40 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -66,11 +66,13 @@ describe('Subscription', function() { var CONFIG = { name: SUB_NAME, autoAck: true, - interval: 100 + interval: 100, + maxInProgress: 3 }; var sub = new Subscription(pubsubMock, CONFIG); assert.strictEqual(sub.autoAck, CONFIG.autoAck); assert.strictEqual(sub.interval, CONFIG.interval); + assert.strictEqual(sub.maxInProgress, 3); }); it('should not be closed', function() { @@ -86,6 +88,25 @@ describe('Subscription', function() { var sub = new Subscription(pubsubMock, { name: SUB_NAME }); assert.equal(sub.interval, 10); }); + + it('should start inProgressAckIds as an empty object', function() { + var sub = new Subscription(pubsubMock, { name: SUB_NAME }); + assert.equal(Object.keys(sub.inProgressAckIds).length, 0); + }); + + it('should default maxInProgress to Infinity if not specified', function() { + var sub = new Subscription(pubsubMock, { name: SUB_NAME }); + assert.strictEqual(sub.maxInProgress, Infinity); + }); + + it('should set messageListeners to 0', function() { + var sub = new Subscription(pubsubMock, { name: SUB_NAME }); + assert.strictEqual(sub.messageListeners, 0); + }); + + it('should not be paused', function() { + assert.strictEqual(subscription.paused, false); + }); }); describe('formatName_', function() { @@ -112,6 +133,18 @@ describe('Subscription', function() { subscription.on('message', util.noop); }); + it('should track the number of listeners', function() { + subscription.startPulling_ = util.noop; + + assert.strictEqual(subscription.messageListeners, 0); + + subscription.on('message', util.noop); + assert.strictEqual(subscription.messageListeners, 1); + + subscription.removeListener('message', util.noop); + assert.strictEqual(subscription.messageListeners, 0); + }); + it('should close when no more message listeners are bound', function() { subscription.startPulling_ = util.noop; subscription.on('message', util.noop); @@ -168,11 +201,63 @@ describe('Subscription', function() { subscription.ack(IDS, assert.ifError); }); - it('should pass callback to request', function(done) { - subscription.makeReq_ = function(method, path, qs, body, callback) { + it('should unmark the ack ids as being in progress', function(done) { + subscription.makeReq_ = function(method, path, query, body, callback) { + callback(); + }; + + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + + subscription.ack(['id1', 'id2'], function(err) { + assert.ifError(err); + + var inProgressAckIds = subscription.inProgressAckIds; + assert.strictEqual(inProgressAckIds.id1, undefined); + assert.strictEqual(inProgressAckIds.id2, undefined); + assert.strictEqual(inProgressAckIds.id3, true); + + done(); + }); + }); + + it('should not unmark if there was an error', function(done) { + subscription.makeReq_ = function(method, path, query, body, callback) { + callback(new Error('Error.')); + }; + + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + + subscription.ack(['id1', 'id2'], function() { + var inProgressAckIds = subscription.inProgressAckIds; + assert.strictEqual(inProgressAckIds.id1, true); + assert.strictEqual(inProgressAckIds.id2, true); + assert.strictEqual(inProgressAckIds.id3, true); + + done(); + }); + }); + + it('should refresh paused status', function(done) { + subscription.makeReq_ = function(method, path, query, body, callback) { callback(); }; - subscription.ack(1, done); + + subscription.refreshPausedStatus_ = done; + + subscription.ack(1, assert.ifError); + }); + + it('should pass error to callback', function(done) { + var error = new Error('Error.'); + + subscription.makeReq_ = function(method, path, query, body, callback) { + callback(error); + }; + + subscription.ack(1, function(err) { + assert.strictEqual(err, error); + done(); + }); }); it('should pass apiResponse to callback', function(done) { @@ -227,29 +312,6 @@ describe('Subscription', function() { subscription.pull({ maxResults: 1 }, assert.ifError); }); - it('should execute callback with a message', function(done) { - var apiResponse = { - receivedMessages: [{ - ackId: 1, - message: { - messageId: '123', - data: new Buffer('message').toString('base64') - } - }] - }; - - subscription.makeReq_ = function(method, path, query, body, callback) { - callback(null, apiResponse); - }; - - subscription.pull(function(err, msgs) { - assert.ifError(err); - var msg = Subscription.formatMessage_(apiResponse.receivedMessages[0]); - assert.deepEqual(msgs, [msg]); - done(); - }); - }); - it('should pass error to callback', function(done) { var error = new Error('Error.'); subscription.makeReq_ = function(method, path, qs, body, callback) { @@ -261,23 +323,20 @@ describe('Subscription', function() { }); }); - it('should pass apiResponse to callback', function(done) { - var resp = { - receivedMessages: [{ - ackId: 1, - message: { - messageId: '123', - data: new Buffer('message').toString('base64') - } - }] - }; - subscription.makeReq_ = function(method, path, qs, body, callback) { - callback(null, resp); + it('should decorate the message', function(done) { + subscription.decorateMessage_ = function() { + done(); }; - subscription.pull(function(err, msgs, apiResponse) { - assert.deepEqual(resp, apiResponse); + + subscription.pull({}, assert.ifError); + }); + + it('should refresh paused status', function(done) { + subscription.refreshPausedStatus_ = function() { done(); - }); + }; + + subscription.pull({}, assert.ifError); }); describe('autoAck false', function() { @@ -293,11 +352,21 @@ describe('Subscription', function() { }); it('should execute callback with message', function(done) { + subscription.decorateMessage_ = function(msg) { return msg; }; subscription.pull({}, function(err, msgs) { + assert.ifError(err); assert.deepEqual(msgs, [expectedMessage]); done(); }); }); + + it('should pass apiResponse to callback', function(done) { + subscription.pull(function(err, msgs, apiResponse) { + assert.ifError(err); + assert.strictEqual(apiResponse, messageObj); + done(); + }); + }); }); describe('autoAck true', function() { @@ -386,40 +455,21 @@ describe('Subscription', function() { subscription.pull = util.noop; }); - it('should pull at specified interval', function(done) { - var INTERVAL = 5; - subscription.pull = function(options, callback) { - assert.strictEqual(options.returnImmediately, false); - // After pull is called once, overwrite with `done`. - // This is to override the function passed to `setTimeout`, so we are - // sure it's the same pull function when we execute it. - subscription.pull = function() { - done(); - }; - callback(); - }; - var setTimeout = global.setTimeout; - global.setTimeout = function(fn, interval) { - global.setTimeout = setTimeout; - assert.equal(interval, INTERVAL); - // This should execute the `done` function from when we overrided it - // above. - fn(); + it('should not pull if subscription is closed', function() { + subscription.pull = function() { + throw new Error('Should not be called.'); }; - subscription.interval = INTERVAL; + + subscription.closed = true; subscription.startPulling_(); }); - it('should stop pulling if subscription is closed', function() { - var pulledCount = 0; + it('should not pull if subscription is paused', function() { subscription.pull = function() { - if (++pulledCount === 3) { - subscription.pull = function() { - throw Error('Should have stopped pulling.'); - }; - subscription.close(); - } + throw new Error('Should not be called.'); }; + + subscription.paused = true; subscription.startPulling_(); }); @@ -431,6 +481,24 @@ describe('Subscription', function() { subscription.startPulling_(); }); + it('should not set maxResults if no maxInProgress is set', function(done) { + subscription.pull = function(options) { + assert.strictEqual(options.maxResults, undefined); + done(); + }; + subscription.startPulling_(); + }); + + it('should set maxResults properly with maxInProgress', function(done) { + subscription.pull = function(options) { + assert.strictEqual(options.maxResults, 1); + done(); + }; + subscription.maxInProgress = 4; + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + subscription.startPulling_(); + }); + it('should emit an error event if one is encountered', function(done) { var error = new Error('Error.'); subscription.pull = function(options, callback) { @@ -487,6 +555,30 @@ describe('Subscription', function() { done(); }); }); + + it('should pull at specified interval', function(done) { + var INTERVAL = 5; + subscription.pull = function(options, callback) { + assert.strictEqual(options.returnImmediately, false); + // After pull is called once, overwrite with `done`. + // This is to override the function passed to `setTimeout`, so we are + // sure it's the same pull function when we execute it. + subscription.pull = function() { + done(); + }; + callback(); + }; + var setTimeout = global.setTimeout; + global.setTimeout = function(fn, interval) { + global.setTimeout = setTimeout; + assert.equal(interval, INTERVAL); + // This should execute the `done` function from when we overrided it + // above. + fn(); + }; + subscription.interval = INTERVAL; + subscription.startPulling_(); + }); }); describe('delete', function() { @@ -579,6 +671,85 @@ describe('Subscription', function() { }); }); + describe('decorateMessage_', function() { + var message = { + ackId: 'b' + }; + + it('should return the message', function() { + var decoratedMessage = subscription.decorateMessage_(message); + assert.strictEqual(decoratedMessage.ackId, message.ackId); + }); + + it('should mark the message as being in progress', function() { + subscription.decorateMessage_(message); + assert.strictEqual(subscription.inProgressAckIds[message.ackId], true); + }); + + describe('ack', function() { + it('should add an ack function to ack', function() { + var decoratedMessage = subscription.decorateMessage_(message); + assert.equal(typeof decoratedMessage.ack, 'function'); + }); + + it('should pass the ackId to subscription.ack', function(done) { + subscription.ack = function(ackId, callback) { + assert.strictEqual(ackId, message.ackId); + callback(); + }; + + subscription.decorateMessage_(message).ack(done); + }); + }); + + describe('skip', function() { + it('should add a skip function', function() { + var decoratedMessage = subscription.decorateMessage_(message); + assert.equal(typeof decoratedMessage.skip, 'function'); + }); + + it('should unmark the message as being in progress', function() { + subscription.decorateMessage_(message).skip(); + + var inProgressAckIds = subscription.inProgressAckIds; + assert.strictEqual(inProgressAckIds[message.ackId], undefined); + }); + + it('should refresh the paused status', function(done) { + subscription.refreshPausedStatus_ = done; + subscription.decorateMessage_(message).skip(); + }); + }); + }); + + describe('refreshPausedStatus_', function() { + it('should pause if the ackIds in progress is too high', function() { + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + + subscription.maxInProgress = 2; + subscription.refreshPausedStatus_(); + assert.strictEqual(subscription.paused, true); + + subscription.maxInProgress = 3; + subscription.refreshPausedStatus_(); + assert.strictEqual(subscription.paused, true); + + subscription.maxInProgress = Infinity; + subscription.refreshPausedStatus_(); + assert.strictEqual(subscription.paused, false); + }); + + it('should start pulling if paused and listeners exist', function(done) { + subscription.startPulling_ = done; + + subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; + subscription.paused = true; + subscription.maxInProgress = Infinity; + subscription.messageListeners = 1; + subscription.refreshPausedStatus_(); + }); + }); + describe('formatMessage_', function() { it('should decode stringified JSON to object', function() { var obj = { hi: 'there' };