Skip to content

Commit

Permalink
pubsub: allow setting a pull HTTP timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Dec 3, 2015
1 parent 8ca58fe commit c540b31
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 14 deletions.
9 changes: 7 additions & 2 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,16 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
* @param {string} name - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled.
* it's pulled. (default: false)
* @param {string} options.encoding - When pulling for messages, this type is
* used when converting a message's data to a string. (default: 'utf-8')
* @param {number} options.interval - Interval in milliseconds to check for new
* messages.
* messages. (default: 10)
* @param {number} options.maxInProgress - Maximum messages to consume
* simultaneously.
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken.
* @return {module:pubsub/subscription}
*
* @example
Expand Down
31 changes: 21 additions & 10 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,18 @@ var util = require('../common/util.js');
*
* @param {module:pubsub} pubsub - PubSub object.
* @param {object} options - Configuration object.
* @param {boolean} options.autoAck - Automatically acknowledge the message
* once it's pulled. (default: false)
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
* @param {string} options.encoding - When pulling for messages, this type is
* used when converting a message's data to a string. (default: 'utf-8')
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string} options.name - Name of the subscription.
* @param {number} options.maxInProgress - Maximum messages to consume
* simultaneously.
* @param {number} options.timeout - Set a maximum amount of time in
* milliseconds on an HTTP request to pull new messages to wait for a
* response before the connection is broken. (default: 90000)
*/
/**
* A Subscription object will give you access to your Google Cloud Pub/Sub
Expand Down Expand Up @@ -269,13 +272,16 @@ function Subscription(pubsub, options) {

this.autoAck = is.boolean(options.autoAck) ? options.autoAck : false;
this.closed = true;
this.interval = is.number(options.interval) ? options.interval : 10;
this.encoding = options.encoding || 'utf-8';
this.inProgressAckIds = {};
this.interval = is.number(options.interval) ? options.interval : 10;
this.maxInProgress =
is.number(options.maxInProgress) ? options.maxInProgress : Infinity;
this.messageListeners = 0;
this.paused = false;
this.encoding = options.encoding || 'utf-8';
// The default timeout set used in this library is 60s, but a pull request
// times out around 90 seconds.
this.timeout = is.number(options.timeout) ? options.timeout : 90000;

this.listenForEvents_();
}
Expand Down Expand Up @@ -447,10 +453,10 @@ Subscription.prototype.delete = function(callback) {
* @resource [Subscriptions: pull API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/pull}
*
* @param {object=} options - Configuration object.
* @param {number} options.maxResults - Limit the amount of messages pulled.
* @param {boolean} options.returnImmediately - If set, the system will respond
* immediately. Otherwise, wait until new messages are available. Returns if
* timeout is reached.
* @param {number} options.maxResults - Limit the amount of messages pulled.
* @param {function} callback - The callback function.
*
* @example
Expand Down Expand Up @@ -496,9 +502,7 @@ Subscription.prototype.pull = function(options, callback) {
}

this.request({
// The default timeout set used in this library is 60s, but a pull request
// times out around 90 seconds.
timeout: 90000,
timeout: this.timeout,
method: 'POST',
uri: ':pull',
json: {
Expand All @@ -507,8 +511,15 @@ Subscription.prototype.pull = function(options, callback) {
}
}, function(err, response) {
if (err) {
callback(err, null, response);
return;
if (err.code === 'ETIMEDOUT' && !err.connect) {
// Simulate a server timeout where no messages were received.
response = {
receivedMessages: []
};
} else {
callback(err, null, response);
return;
}
}

var messages = arrify(response.receivedMessages)
Expand Down
41 changes: 39 additions & 2 deletions test/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,15 @@ describe('Subscription', function() {
autoAck: true,
interval: 100,
maxInProgress: 3,
encoding: 'binary'
encoding: 'binary',
timeout: 30000
};
var sub = new Subscription(PUBSUB, CONFIG);
assert.strictEqual(sub.autoAck, CONFIG.autoAck);
assert.strictEqual(sub.interval, CONFIG.interval);
assert.strictEqual(sub.maxInProgress, 3);
assert.strictEqual(sub.encoding, CONFIG.encoding);
assert.strictEqual(sub.maxInProgress, CONFIG.maxInProgress);
assert.strictEqual(sub.timeout, CONFIG.timeout);
});

it('should be closed', function() {
Expand Down Expand Up @@ -157,6 +159,10 @@ describe('Subscription', function() {
assert.strictEqual(subscription.encoding, 'utf-8');
});

it('should default timeout to 90 seconds', function() {
assert.strictEqual(subscription.timeout, 90000);
});

it('should create an iam object', function() {
assert.deepEqual(subscription.iam.calledWith_, [
PUBSUB,
Expand Down Expand Up @@ -475,6 +481,22 @@ describe('Subscription', function() {
subscription.pull({ maxResults: 1 }, assert.ifError);
});

it('should pass a timeout if specified', function(done) {
var timeout = 90000;

var subscription = new Subscription(PUBSUB, {
name: SUB_NAME,
timeout: timeout
});

subscription.request = function(reqOpts) {
assert.strictEqual(reqOpts.timeout, timeout);
done();
};

subscription.pull(assert.ifError);
});

it('should pass error to callback', function(done) {
var error = new Error('Error.');
subscription.request = function(reqOpts, callback) {
Expand All @@ -486,6 +508,21 @@ describe('Subscription', function() {
});
});

it('should not return messages if request timed out', function(done) {
subscription.request = function(reqOpts, callback) {
var error = new Error();
error.code = 'ETIMEDOUT';
error.connect = false;
callback(error);
};

subscription.pull({}, function(err, messages) {
assert.ifError(err);
assert.deepEqual(messages, []);
done();
});
});

it('should call formatMessage_ with encoding', function(done) {
subscription.encoding = 'encoding-value';

Expand Down

0 comments on commit c540b31

Please sign in to comment.