diff --git a/lib/client.js b/lib/client.js index 13d4b84de2d..49682bc35e6 100644 --- a/lib/client.js +++ b/lib/client.js @@ -62,6 +62,7 @@ class Parser extends HTTPParser { [HTTPParser.kOnHeadersComplete] ({ statusCode, headers }) { const { client, resumeSocket } = this const request = client[kQueue][client[kComplete]] + const { callback, signal, opaque } = request const skipBody = request.method === 'HEAD' assert(!this.read) @@ -69,12 +70,14 @@ class Parser extends HTTPParser { let body - if (request.callback) { - body = request.callback(null, { + if (callback) { + body = callback(null, { statusCode, headers: parseHeaders(headers), - opaque: request.opaque + opaque }, resumeSocket) + clearTimeout(request.timeout) + request.timeout = null request.callback = null request.opaque = null } @@ -87,15 +90,8 @@ class Parser extends HTTPParser { if (body) { this.body = body - if (request.signal) { - const onAbort = () => { - body(new RequestAbortedError(), null) - } - if ('addEventListener' in request.signal) { - request.signal.addEventListener('abort', onAbort) - } else { - request.signal.once('abort', onAbort) - } + if (signal) { + signal.once('error', body) } } else { this.next() @@ -815,11 +811,7 @@ function resume (client) { } const onFinished = (err) => { if (signal) { - if ('removeEventListener' in signal) { - signal.removeEventListener('abort', onAbort) - } else { - signal.removeListener('abort', onAbort) - } + signal.removeListener('error', onFinished) } socket @@ -857,11 +849,7 @@ function resume (client) { } if (signal) { - if ('addEventListener' in signal) { - signal.addEventListener('abort', onAbort) - } else { - signal.on('abort', onAbort) - } + signal.on('error', onFinished) } body diff --git a/lib/request.js b/lib/request.js index 2c6638f012c..c57bfed7248 100644 --- a/lib/request.js +++ b/lib/request.js @@ -3,8 +3,11 @@ const { AsyncResource } = require('async_hooks') const { InvalidArgumentError, - RequestAbortedError + RequestAbortedError, + TimeoutError } = require('./errors') +const EE = require('events') +const assert = require('assert') const methods = [ 'ACL', @@ -61,7 +64,7 @@ class Request extends AsyncResource { throw new InvalidArgumentError('no options passed') } - const { path, method, body, headers, idempotent, opaque, signal } = opts + const { path, method, body, headers, idempotent, opaque, signal, timeout } = opts if (!(typeof path === 'string' && path[0] === '/')) { throw new InvalidArgumentError('path must be a valid path') @@ -71,6 +74,10 @@ class Request extends AsyncResource { throw new InvalidArgumentError('method must be a valid method') } + if (timeout != null && (!Number.isInteger(timeout) || timeout < 1)) { + throw new InvalidArgumentError('timeout must be a positive integer') + } + if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { throw new InvalidArgumentError('signal must implement .on(name, callback)') } @@ -79,7 +86,9 @@ class Request extends AsyncResource { throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream') } - this.signal = signal + this.timeout = null + + this.signal = null this.method = method @@ -101,33 +110,59 @@ class Request extends AsyncResource { this.rawHeaders = '' - if (this.signal) { + if (headers) { + const headerNames = Object.keys(headers) + for (let i = 0; i < headerNames.length; i++) { + const name = headerNames[i] + this.rawHeaders += name + ': ' + headers[name] + '\r\n' + } + } + + if (signal) { + if (!this.signal) { + this.signal = new EE() + } + const onAbort = () => { + this.signal.emit('error', new RequestAbortedError()) + } + + if ('addEventListener' in signal) { + signal.addEventListener('abort', onAbort) + } else { + signal.once('abort', onAbort) + } + } + + if (timeout) { + if (!this.signal) { + this.signal = new EE() + } + + const onTimeout = () => { + this.signal.emit('error', new TimeoutError()) + } + + this.timeout = setTimeout(onTimeout, timeout) + } + + if (this.signal) { + this.signal.on('error', (err) => { + assert(err) + const { callback } = this if (!callback) { return } + clearTimeout(this.timeout) + this.timeout = null this.callback = null this.opaque = null - callback(new RequestAbortedError(), null) - } - - if ('addEventListener' in this.signal) { - this.signal.addEventListener('abort', onAbort) - } else { - this.signal.once('abort', onAbort) - } - } - - if (headers) { - const headerNames = Object.keys(headers) - for (let i = 0; i < headerNames.length; i++) { - const name = headerNames[i] - this.rawHeaders += name + ': ' + headers[name] + '\r\n' - } + callback(err, null) + }) } }