From d006e9018e18bc83212224fdffa09281c7b1c1e6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 27 Nov 2024 12:36:07 +0100 Subject: [PATCH] fix: port retry to new hooks (#3883) * fix: port retry to new hooks * fixup * fixup * fixup * fixup: unref * fixup * fixup --- lib/dispatcher/dispatcher-base.js | 6 +- lib/handler/retry-handler.js | 165 ++++++++++++------------------ lib/handler/wrap-handler.js | 2 +- test/node-test/client-dispatch.js | 2 +- test/retry-handler.js | 86 +++------------- 5 files changed, 83 insertions(+), 178 deletions(-) diff --git a/lib/dispatcher/dispatcher-base.js b/lib/dispatcher/dispatcher-base.js index cb3f0e02a3c..615754d0fb5 100644 --- a/lib/dispatcher/dispatcher-base.js +++ b/lib/dispatcher/dispatcher-base.js @@ -130,6 +130,8 @@ class DispatcherBase extends Dispatcher { throw new InvalidArgumentError('handler must be an object') } + handler = UnwrapHandler.unwrap(handler) + try { if (!opts || typeof opts !== 'object') { throw new InvalidArgumentError('opts must be an object.') @@ -143,10 +145,10 @@ class DispatcherBase extends Dispatcher { throw new ClientClosedError() } - return this[kDispatch](opts, UnwrapHandler.unwrap(handler)) + return this[kDispatch](opts, handler) } catch (err) { if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') + throw err } handler.onError(err) diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index dc3449c8ebe..f469f9df343 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -3,9 +3,9 @@ const assert = require('node:assert') const { kRetryHandlerDefaultRetry } = require('../core/symbols') const { RequestRetryError } = require('../core/errors') +const WrapHandler = require('./wrap-handler') const { isDisturbed, - parseHeaders, parseRangeHeader, wrapRequestBody } = require('../core/util') @@ -16,7 +16,7 @@ function calculateRetryAfterHeader (retryAfter) { } class RetryHandler { - constructor (opts, handlers) { + constructor (opts, { dispatch, handler }) { const { retryOptions, ...dispatchOpts } = opts const { // Retry scoped @@ -32,12 +32,9 @@ class RetryHandler { statusCodes } = retryOptions ?? {} - this.dispatch = handlers.dispatch - this.handler = handlers.handler + this.dispatch = dispatch + this.handler = WrapHandler.wrap(handler) this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) } - this.abort = null - this.aborted = false - this.connectCalled = false this.retryOpts = { retry: retryFn ?? RetryHandler[kRetryHandlerDefaultRetry], retryAfter: retryAfter ?? true, @@ -65,37 +62,20 @@ class RetryHandler { this.retryCount = 0 this.retryCountCheckpoint = 0 + this.headersSent = false this.start = 0 this.end = null this.etag = null - this.resume = null } - onRequestSent () { - if (this.handler.onRequestSent) { - this.handler.onRequestSent() + onRequestStart (controller, context) { + if (!this.headersSent) { + this.handler.onRequestStart?.(controller, context) } } - onUpgrade (statusCode, headers, socket) { - if (this.handler.onUpgrade) { - this.handler.onUpgrade(statusCode, headers, socket) - } - } - - onConnect (abort, context) { - this.abort = abort - if (!this.connectCalled) { - this.connectCalled = true - this.handler.onConnect(reason => { - this.aborted = true - this.abort(reason) - }, context) - } - } - - onBodySent (chunk) { - if (this.handler.onBodySent) return this.handler.onBodySent(chunk) + onRequestUpgrade (controller, statusCode, headers, socket) { + this.handler.onRequestUpgrade?.(controller, statusCode, headers, socket) } static [kRetryHandlerDefaultRetry] (err, { state, opts }, cb) { @@ -153,83 +133,68 @@ class RetryHandler { ? Math.min(retryAfterHeader, maxTimeout) : Math.min(minTimeout * timeoutFactor ** (counter - 1), maxTimeout) - setTimeout(() => cb(null), retryTimeout) + setTimeout(() => cb(null), retryTimeout).unref() } - onHeaders (statusCode, rawHeaders, resume, statusMessage) { - const headers = parseHeaders(rawHeaders) - + onResponseStart (controller, statusCode, headers, statusMessage) { this.retryCount += 1 if (statusCode >= 300) { if (this.retryOpts.statusCodes.includes(statusCode) === false) { - return this.handler.onHeaders( + this.headersSent = true + this.handler.onResponseStart?.( + controller, statusCode, - rawHeaders, - resume, + headers, statusMessage ) + return } else { - this.abort( - new RequestRetryError('Request failed', statusCode, { - headers, - data: { - count: this.retryCount - } - }) - ) - return false + throw new RequestRetryError('Request failed', statusCode, { + headers, + data: { + count: this.retryCount + } + }) } } // Checkpoint for resume from where we left it - if (this.resume != null) { - this.resume = null - + if (this.headersSent) { // Only Partial Content 206 supposed to provide Content-Range, // any other status code that partially consumed the payload // should not be retried because it would result in downstream // wrongly concatenate multiple responses. if (statusCode !== 206 && (this.start > 0 || statusCode !== 200)) { - this.abort( - new RequestRetryError('server does not support the range header and the payload was partially consumed', statusCode, { - headers, - data: { count: this.retryCount } - }) - ) - return false + throw new RequestRetryError('server does not support the range header and the payload was partially consumed', statusCode, { + headers, + data: { count: this.retryCount } + }) } const contentRange = parseRangeHeader(headers['content-range']) // If no content range if (!contentRange) { - this.abort( - new RequestRetryError('Content-Range mismatch', statusCode, { - headers, - data: { count: this.retryCount } - }) - ) - return false + throw new RequestRetryError('Content-Range mismatch', statusCode, { + headers, + data: { count: this.retryCount } + }) } // Let's start with a weak etag check if (this.etag != null && this.etag !== headers.etag) { - this.abort( - new RequestRetryError('ETag mismatch', statusCode, { - headers, - data: { count: this.retryCount } - }) - ) - return false + throw new RequestRetryError('ETag mismatch', statusCode, { + headers, + data: { count: this.retryCount } + }) } - const { start, size, end = size - 1 } = contentRange + const { start, size, end = size ? size - 1 : null } = contentRange assert(this.start === start, 'content-range mismatch') assert(this.end == null || this.end === end, 'content-range mismatch') - this.resume = resume - return true + return } if (this.end == null) { @@ -238,15 +203,17 @@ class RetryHandler { const range = parseRangeHeader(headers['content-range']) if (range == null) { - return this.handler.onHeaders( + this.headersSent = true + this.handler.onResponseStart?.( + controller, statusCode, - rawHeaders, - resume, + headers, statusMessage ) + return } - const { start, size, end = size - 1 } = range + const { start, size, end = size ? size - 1 : null } = range assert( start != null && Number.isFinite(start), 'content-range mismatch' @@ -269,7 +236,7 @@ class RetryHandler { 'invalid content-length' ) - this.resume = resume + this.resume = true this.etag = headers.etag != null ? headers.etag : null // Weak etags are not useful for comparison nor cache @@ -283,38 +250,36 @@ class RetryHandler { this.etag = null } - return this.handler.onHeaders( + this.headersSent = true + this.handler.onResponseStart?.( + controller, statusCode, - rawHeaders, - resume, + headers, statusMessage ) + } else { + throw new RequestRetryError('Request failed', statusCode, { + headers, + data: { count: this.retryCount } + }) } - - const err = new RequestRetryError('Request failed', statusCode, { - headers, - data: { count: this.retryCount } - }) - - this.abort(err) - - return false } - onData (chunk) { + onResponseData (controller, chunk) { this.start += chunk.length - return this.handler.onData(chunk) + this.handler.onResponseData?.(controller, chunk) } - onComplete (rawTrailers) { + onResponseEnd (controller, trailers) { this.retryCount = 0 - return this.handler.onComplete(rawTrailers) + return this.handler.onResponseEnd?.(controller, trailers) } - onError (err) { - if (this.aborted || isDisturbed(this.opts.body)) { - return this.handler.onError(err) + onResponseError (controller, err) { + if (!controller || controller.aborted || isDisturbed(this.opts.body)) { + this.handler.onResponseError?.(controller, err) + return } // We reconcile in case of a mix between network errors @@ -343,8 +308,8 @@ class RetryHandler { * @returns */ function onRetry (err) { - if (err != null || this.aborted || isDisturbed(this.opts.body)) { - return this.handler.onError(err) + if (err != null || controller?.aborted || isDisturbed(this.opts.body)) { + return this.handler.onResponseError?.(controller, err) } if (this.start !== 0) { @@ -368,7 +333,7 @@ class RetryHandler { this.retryCountCheckpoint = this.retryCount this.dispatch(this.opts, this) } catch (err) { - this.handler.onError(err) + this.handler.onResponseError?.(controller, err) } } } diff --git a/lib/handler/wrap-handler.js b/lib/handler/wrap-handler.js index 271bfb8a177..9a0dee3d069 100644 --- a/lib/handler/wrap-handler.js +++ b/lib/handler/wrap-handler.js @@ -38,7 +38,7 @@ module.exports = class WrapHandler { onError (err) { if (!this.#handler.onError) { - throw new InvalidArgumentError('invalid onError method') + throw err } return this.#handler.onError?.(err) diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index 1a0680916a3..8b7991a3156 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -638,7 +638,7 @@ test('dispatch pool onError missing', async (t) => { }) } catch (err) { p.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - p.strictEqual(err.message, 'invalid onError method') + p.strictEqual(err.message, 'upgrade must be a string') } }) diff --git a/test/retry-handler.js b/test/retry-handler.js index 1f33fc532e1..a52324c086d 100644 --- a/test/retry-handler.js +++ b/test/retry-handler.js @@ -10,7 +10,7 @@ const { RetryHandler, Client } = require('..') const { RequestHandler } = require('../lib/api/api-request') test('Should retry status code', async t => { - t = tspl(t, { plan: 4 }) + t = tspl(t, { plan: 3 }) let counter = 0 const chunks = [] @@ -64,9 +64,6 @@ test('Should retry status code', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -108,7 +105,7 @@ test('Should retry status code', async t => { }) test('Should account for network and response errors', async t => { - t = tspl(t, { plan: 4 }) + t = tspl(t, { plan: 3 }) let counter = 0 const chunks = [] @@ -162,9 +159,6 @@ test('Should account for network and response errors', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -206,7 +200,7 @@ test('Should account for network and response errors', async t => { }) test('Issue #3288 - request with body (asynciterable)', async t => { - t = tspl(t, { plan: 6 }) + t = tspl(t, { plan: 4 }) const server = createServer() const dispatchOptions = { method: 'POST', @@ -236,11 +230,7 @@ test('Issue #3288 - request with body (asynciterable)', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { - t.strictEqual(status, 500) return true }, onData (chunk) { @@ -274,7 +264,7 @@ test('Issue #3288 - request with body (asynciterable)', async t => { }) test('Should use retry-after header for retries', async t => { - t = tspl(t, { plan: 4 }) + t = tspl(t, { plan: 3 }) let counter = 0 const chunks = [] @@ -317,9 +307,6 @@ test('Should use retry-after header for retries', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -360,7 +347,7 @@ test('Should use retry-after header for retries', async t => { }) test('Should use retry-after header for retries (date)', async t => { - t = tspl(t, { plan: 4 }) + t = tspl(t, { plan: 3 }) let counter = 0 const chunks = [] @@ -405,9 +392,6 @@ test('Should use retry-after header for retries (date)', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -490,9 +474,6 @@ test('Should retry with defaults', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -533,7 +514,7 @@ test('Should retry with defaults', async t => { }) test('Should handle 206 partial content', async t => { - t = tspl(t, { plan: 8 }) + t = tspl(t, { plan: 6 }) const chunks = [] let counter = 0 @@ -586,15 +567,9 @@ test('Should handle 206 partial content', async t => { return client.dispatch(...args) }, handler: { - onRequestSent () { - t.ok(true, 'pass') - }, onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, _resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -636,7 +611,7 @@ test('Should handle 206 partial content', async t => { }) test('Should handle 206 partial content - bad-etag', async t => { - t = tspl(t, { plan: 8 }) + t = tspl(t, { plan: 7 }) const chunks = [] @@ -680,11 +655,7 @@ test('Should handle 206 partial content - bad-etag', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (_status, _rawHeaders, _resume, _statusMessage) { - t.ok(true, 'pass') return true }, onData (chunk) { @@ -972,9 +943,6 @@ test('should not error if request is not meant to be retried', async t => { onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 400) return true @@ -1087,7 +1055,7 @@ test('Should be able to properly pass the minTimeout to the RetryContext when co }) test('Issue#2986 - Handle custom 206', async t => { - t = tspl(t, { plan: 8 }) + t = tspl(t, { plan: 6 }) const chunks = [] let counter = 0 @@ -1140,15 +1108,9 @@ test('Issue#2986 - Handle custom 206', async t => { return client.dispatch(...args) }, handler: { - onRequestSent () { - t.ok(true, 'pass') - }, onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -1191,7 +1153,7 @@ test('Issue#2986 - Handle custom 206', async t => { }) test('Issue#3128 - Support if-match', async t => { - t = tspl(t, { plan: 9 }) + t = tspl(t, { plan: 7 }) const chunks = [] let counter = 0 @@ -1246,15 +1208,9 @@ test('Issue#3128 - Support if-match', async t => { return client.dispatch(...args) }, handler: { - onRequestSent () { - t.ok(true, 'pass') - }, onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -1297,7 +1253,7 @@ test('Issue#3128 - Support if-match', async t => { }) test('Issue#3128 - Should ignore weak etags', async t => { - t = tspl(t, { plan: 9 }) + t = tspl(t, { plan: 7 }) const chunks = [] let counter = 0 @@ -1352,15 +1308,9 @@ test('Issue#3128 - Should ignore weak etags', async t => { return client.dispatch(...args) }, handler: { - onRequestSent () { - t.ok(true, 'pass') - }, onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -1403,7 +1353,7 @@ test('Issue#3128 - Should ignore weak etags', async t => { }) test('Weak etags are ignored on range-requests', async t => { - t = tspl(t, { plan: 9 }) + t = tspl(t, { plan: 7 }) const chunks = [] let counter = 0 @@ -1458,15 +1408,9 @@ test('Weak etags are ignored on range-requests', async t => { return client.dispatch(...args) }, handler: { - onRequestSent () { - t.ok(true, 'pass') - }, onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, resume, _statusMessage) { t.strictEqual(status, 200) return true @@ -1509,7 +1453,7 @@ test('Weak etags are ignored on range-requests', async t => { }) test('Should throw RequestRetryError when Content-Range mismatch', async t => { - t = tspl(t, { plan: 10 }) + t = tspl(t, { plan: 8 }) const chunks = [] @@ -1559,15 +1503,9 @@ test('Should throw RequestRetryError when Content-Range mismatch', async t => { return client.dispatch(...args) }, handler: { - onRequestSent () { - t.ok(true, 'pass') - }, onConnect () { t.ok(true, 'pass') }, - onBodySent () { - t.ok(true, 'pass') - }, onHeaders (status, _rawHeaders, _resume, _statusMessage) { t.strictEqual(status, 200) return true