diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js index 64d7b375c056..505be907c3d2 100644 --- a/lib/pubsub/index.js +++ b/lib/pubsub/index.js @@ -480,11 +480,16 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) { * @param {string} name - The name of the subscription. * @param {object=} options - Configuration object. * @param {boolean} options.autoAck - Automatically acknowledge the message once - * it's pulled. + * it's pulled. (default: false) * @param {string} options.encoding - When pulling for messages, this type is * used when converting a message's data to a string. (default: 'utf-8') * @param {number} options.interval - Interval in milliseconds to check for new - * messages. + * messages. (default: 10) + * @param {number} options.maxInProgress - Maximum messages to consume + * simultaneously. + * @param {number} options.timeout - Set a maximum amount of time in + * milliseconds on an HTTP request to pull new messages to wait for a + * response before the connection is broken. * @return {module:pubsub/subscription} * * @example diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index ec48d1a0b14a..46e4846b0db1 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -48,8 +48,8 @@ var util = require('../common/util.js'); * * @param {module:pubsub} pubsub - PubSub object. * @param {object} options - Configuration object. - * @param {boolean} options.autoAck - Automatically acknowledge the message - * once it's pulled. (default: false) + * @param {boolean} options.autoAck - Automatically acknowledge the message once + * it's pulled. (default: false) * @param {string} options.encoding - When pulling for messages, this type is * used when converting a message's data to a string. (default: 'utf-8') * @param {number} options.interval - Interval in milliseconds to check for new @@ -57,6 +57,9 @@ var util = require('../common/util.js'); * @param {string} options.name - Name of the subscription. * @param {number} options.maxInProgress - Maximum messages to consume * simultaneously. + * @param {number} options.timeout - Set a maximum amount of time in + * milliseconds on an HTTP request to pull new messages to wait for a + * response before the connection is broken. (default: 90000) */ /** * A Subscription object will give you access to your Google Cloud Pub/Sub @@ -269,13 +272,16 @@ function Subscription(pubsub, options) { this.autoAck = is.boolean(options.autoAck) ? options.autoAck : false; this.closed = true; - this.interval = is.number(options.interval) ? options.interval : 10; + this.encoding = options.encoding || 'utf-8'; this.inProgressAckIds = {}; + this.interval = is.number(options.interval) ? options.interval : 10; this.maxInProgress = is.number(options.maxInProgress) ? options.maxInProgress : Infinity; this.messageListeners = 0; this.paused = false; - this.encoding = options.encoding || 'utf-8'; + // The default timeout set used in this library is 60s, but a pull request + // times out around 90 seconds. + this.timeout = is.number(options.timeout) ? options.timeout : 90000; this.listenForEvents_(); } @@ -447,10 +453,10 @@ Subscription.prototype.delete = function(callback) { * @resource [Subscriptions: pull API Documentation]{@link https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/pull} * * @param {object=} options - Configuration object. + * @param {number} options.maxResults - Limit the amount of messages pulled. * @param {boolean} options.returnImmediately - If set, the system will respond * immediately. Otherwise, wait until new messages are available. Returns if * timeout is reached. - * @param {number} options.maxResults - Limit the amount of messages pulled. * @param {function} callback - The callback function. * * @example @@ -496,9 +502,7 @@ Subscription.prototype.pull = function(options, callback) { } this.request({ - // The default timeout set used in this library is 60s, but a pull request - // times out around 90 seconds. - timeout: 90000, + timeout: this.timeout, method: 'POST', uri: ':pull', json: { @@ -507,8 +511,15 @@ Subscription.prototype.pull = function(options, callback) { } }, function(err, response) { if (err) { - callback(err, null, response); - return; + if (err.code === 'ETIMEDOUT' && !err.connect) { + // Simulate a server timeout where no messages were received. + response = { + receivedMessages: [] + }; + } else { + callback(err, null, response); + return; + } } var messages = arrify(response.receivedMessages) diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index a953ce389ea9..f8700f4cdff4 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -116,13 +116,15 @@ describe('Subscription', function() { autoAck: true, interval: 100, maxInProgress: 3, - encoding: 'binary' + encoding: 'binary', + timeout: 30000 }; var sub = new Subscription(PUBSUB, CONFIG); assert.strictEqual(sub.autoAck, CONFIG.autoAck); assert.strictEqual(sub.interval, CONFIG.interval); - assert.strictEqual(sub.maxInProgress, 3); assert.strictEqual(sub.encoding, CONFIG.encoding); + assert.strictEqual(sub.maxInProgress, CONFIG.maxInProgress); + assert.strictEqual(sub.timeout, CONFIG.timeout); }); it('should be closed', function() { @@ -157,6 +159,10 @@ describe('Subscription', function() { assert.strictEqual(subscription.encoding, 'utf-8'); }); + it('should default timeout to 90 seconds', function() { + assert.strictEqual(subscription.timeout, 90000); + }); + it('should create an iam object', function() { assert.deepEqual(subscription.iam.calledWith_, [ PUBSUB, @@ -475,6 +481,22 @@ describe('Subscription', function() { subscription.pull({ maxResults: 1 }, assert.ifError); }); + it('should pass a timeout if specified', function(done) { + var timeout = 90000; + + var subscription = new Subscription(PUBSUB, { + name: SUB_NAME, + timeout: timeout + }); + + subscription.request = function(reqOpts) { + assert.strictEqual(reqOpts.timeout, timeout); + done(); + }; + + subscription.pull(assert.ifError); + }); + it('should pass error to callback', function(done) { var error = new Error('Error.'); subscription.request = function(reqOpts, callback) { @@ -486,6 +508,21 @@ describe('Subscription', function() { }); }); + it('should not return messages if request timed out', function(done) { + subscription.request = function(reqOpts, callback) { + var error = new Error(); + error.code = 'ETIMEDOUT'; + error.connect = false; + callback(error); + }; + + subscription.pull({}, function(err, messages) { + assert.ifError(err); + assert.deepEqual(messages, []); + done(); + }); + }); + it('should call formatMessage_ with encoding', function(done) { subscription.encoding = 'encoding-value';