Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add maxInProgress option #650

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 109 additions & 7 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var util = require('../common/util.js');
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string} options.name - Name of the subscription.
* @param {number} options.maxInProgress - Maximum messages to consume
* simultaneously.
*/
/**
* A Subscription object will give you access to your Google Cloud Pub/Sub
Expand Down Expand Up @@ -106,9 +108,18 @@ var util = require('../common/util.js');
* // Register a listener for `message` events.
* function onMessage(message) {
* // Called every time a message is received.
*
* // message.id = ID of the message.
* // message.ackId = ID used to acknowledge the message receival.
* // message.data = Contents of the message.
*
* // Ack the message:
* // message.ack(callback);
*
* // Skip the message. This is useful with `maxInProgress` option when
* // creating your subscription. This doesn't ack the message, but allows
* // more messages to be retrieved if your limit was hit.
* // message.skip();
* }
* subscription.on('message', onMessage);
*
Expand All @@ -125,6 +136,11 @@ function Subscription(pubsub, options) {
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
this.closed = false;
this.interval = util.is(options.interval, 'number') ? options.interval : 10;
this.inProgressAckIds = {};
this.maxInProgress =
util.is(options.maxInProgress, 'number') ? options.maxInProgress : Infinity;
this.messageListeners = 0;
this.paused = false;

this.listenForEvents_();
}
Expand Down Expand Up @@ -191,11 +207,10 @@ Subscription.formatMessage_ = function(msg) {
*/
Subscription.prototype.listenForEvents_ = function() {
var self = this;
var messageListeners = 0;

this.on('newListener', function(event) {
if (event === 'message') {
messageListeners++;
this.messageListeners++;
if (self.closed) {
self.closed = false;
}
Expand All @@ -204,7 +219,7 @@ Subscription.prototype.listenForEvents_ = function() {
});

this.on('removeListener', function(event) {
if (event === 'message' && --messageListeners === 0) {
if (event === 'message' && --this.messageListeners === 0) {
self.closed = true;
}
});
Expand All @@ -229,20 +244,31 @@ Subscription.prototype.listenForEvents_ = function() {
*/
Subscription.prototype.startPulling_ = function() {
var self = this;
if (this.closed) {

if (this.closed || this.paused) {
return;
}

var maxResults;

if (this.maxInProgress < Infinity) {
maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length;
}

this.pull({
returnImmediately: false
returnImmediately: false,
maxResults: maxResults
}, function(err, messages, apiResponse) {
if (err) {
self.emit('error', err, apiResponse);
}

if (messages) {
messages.forEach(function(message) {
self.emit('message', message, apiResponse);
});
}

setTimeout(self.startPulling_.bind(self), self.interval);
});
};
Expand All @@ -260,16 +286,33 @@ Subscription.prototype.startPulling_ = function() {
* subscription.ack('ePHEESyhuE8e...', function(err, apiResponse) {});
*/
Subscription.prototype.ack = function(ackIds, callback) {
var self = this;

if (!ackIds || ackIds.length === 0) {
throw new Error(
'At least one ID must be specified before it can be acknowledged.');
}

ackIds = util.arrayize(ackIds);

var body = {
ackIds: ackIds
};

callback = callback || util.noop;

var path = this.name + ':acknowledge';
this.makeReq_('POST', path, null, body, callback || util.noop);

this.makeReq_('POST', path, null, body, function(err, resp) {
if (!err) {
ackIds.forEach(function(ackId) {
delete self.inProgressAckIds[ackId];
});
self.refreshPausedStatus_();
}

callback(err, resp);
});
};

/**
Expand Down Expand Up @@ -324,6 +367,10 @@ Subscription.prototype.delete = function(callback) {
* // id: '', // Unique message ID.
* // data: '', // Contents of the message.
* // attributes: {} // Attributes of the message.
* //
* // Helper functions:
* // ack(callback): // Ack the message.
* // skip(): // Free up 1 slot on the sub's maxInProgress value.
* // },
* // // ...
* // ]
Expand Down Expand Up @@ -364,7 +411,11 @@ Subscription.prototype.pull = function(options, callback) {
}

var messages = response.receivedMessages || [];
messages = messages.map(Subscription.formatMessage_);
messages = messages
.map(Subscription.formatMessage_)
.map(self.decorateMessage_.bind(self));

self.refreshPausedStatus_();

if (self.autoAck && messages.length !== 0) {
var ackIds = messages.map(function(message) {
Expand Down Expand Up @@ -411,4 +462,55 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
this.makeReq_('POST', path, null, body, callback);
};

/**
* Add functionality on top of a message returned from the API, including the
* ability to `ack` and `skip` the message.
*
* This also records the message as being "in progress". See
* {module:subscription#refreshPausedStatus_}.
*
* @private
*
* @param {object} message - A message object.
* @return {object} message - The original message after being decorated.
* @param {function} message.ack - Ack the message.
* @param {function} message.skip - Increate the number of available messages to
* simultaneously receive.
*/
Subscription.prototype.decorateMessage_ = function(message) {
var self = this;

this.inProgressAckIds[message.ackId] = true;

message.ack = self.ack.bind(self, message.ackId);

message.skip = function() {
delete self.inProgressAckIds[message.ackId];
self.refreshPausedStatus_();
};

return message;
};

/**
* Update the status of `maxInProgress`. Å subscription becomes "paused" (not
* pulling) when the number of messages that have yet to be ack'd or skipped
* exceeds the user's specified `maxInProgress` value.
*
* This will start pulling when that event reverses: we were paused, but one or
* more messages were just ack'd or skipped, freeing up room for more messages
* to be consumed.
*
* @private
*/
Subscription.prototype.refreshPausedStatus_ = function() {
var isCurrentlyPaused = this.paused;
var inProgress = Object.keys(this.inProgressAckIds).length;
this.paused = inProgress >= this.maxInProgress;

if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) {
this.startPulling_();
}
};

module.exports = Subscription;
Loading