Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: port retry to new hooks #3883

Merged
merged 7 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class DispatcherBase extends Dispatcher {
throw new InvalidArgumentError('handler must be an object')
}

handler = UnwrapHandler.unwrap(handler)

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
Expand All @@ -143,10 +145,10 @@ class DispatcherBase extends Dispatcher {
throw new ClientClosedError()
}

return this[kDispatch](opts, UnwrapHandler.unwrap(handler))
return this[kDispatch](opts, handler)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
throw err
}

handler.onError(err)
Expand Down
165 changes: 65 additions & 100 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ const assert = require('node:assert')

const { kRetryHandlerDefaultRetry } = require('../core/symbols')
const { RequestRetryError } = require('../core/errors')
const WrapHandler = require('./wrap-handler')
const {
isDisturbed,
parseHeaders,
parseRangeHeader,
wrapRequestBody
} = require('../core/util')
Expand All @@ -16,7 +16,7 @@ function calculateRetryAfterHeader (retryAfter) {
}

class RetryHandler {
constructor (opts, handlers) {
constructor (opts, { dispatch, handler }) {
const { retryOptions, ...dispatchOpts } = opts
const {
// Retry scoped
Expand All @@ -32,12 +32,9 @@ class RetryHandler {
statusCodes
} = retryOptions ?? {}

this.dispatch = handlers.dispatch
this.handler = handlers.handler
this.dispatch = dispatch
this.handler = WrapHandler.wrap(handler)
this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) }
this.abort = null
this.aborted = false
this.connectCalled = false
this.retryOpts = {
retry: retryFn ?? RetryHandler[kRetryHandlerDefaultRetry],
retryAfter: retryAfter ?? true,
Expand Down Expand Up @@ -65,37 +62,20 @@ class RetryHandler {

this.retryCount = 0
this.retryCountCheckpoint = 0
this.headersSent = false
this.start = 0
this.end = null
this.etag = null
this.resume = null
}

onRequestSent () {
if (this.handler.onRequestSent) {
this.handler.onRequestSent()
onRequestStart (controller, context) {
if (!this.headersSent) {
this.handler.onRequestStart?.(controller, context)
}
}

onUpgrade (statusCode, headers, socket) {
if (this.handler.onUpgrade) {
this.handler.onUpgrade(statusCode, headers, socket)
}
}

onConnect (abort, context) {
this.abort = abort
if (!this.connectCalled) {
this.connectCalled = true
this.handler.onConnect(reason => {
this.aborted = true
this.abort(reason)
}, context)
}
}

onBodySent (chunk) {
if (this.handler.onBodySent) return this.handler.onBodySent(chunk)
onRequestUpgrade (controller, statusCode, headers, socket) {
this.handler.onRequestUpgrade?.(controller, statusCode, headers, socket)
}

static [kRetryHandlerDefaultRetry] (err, { state, opts }, cb) {
Expand Down Expand Up @@ -153,83 +133,68 @@ class RetryHandler {
? Math.min(retryAfterHeader, maxTimeout)
: Math.min(minTimeout * timeoutFactor ** (counter - 1), maxTimeout)

setTimeout(() => cb(null), retryTimeout)
setTimeout(() => cb(null), retryTimeout).unref()
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const headers = parseHeaders(rawHeaders)

onResponseStart (controller, statusCode, headers, statusMessage) {
this.retryCount += 1

if (statusCode >= 300) {
if (this.retryOpts.statusCodes.includes(statusCode) === false) {
return this.handler.onHeaders(
this.headersSent = true
this.handler.onResponseStart?.(
controller,
statusCode,
rawHeaders,
resume,
headers,
statusMessage
)
return
} else {
this.abort(
new RequestRetryError('Request failed', statusCode, {
headers,
data: {
count: this.retryCount
}
})
)
return false
throw new RequestRetryError('Request failed', statusCode, {
headers,
data: {
count: this.retryCount
}
})
}
}

// Checkpoint for resume from where we left it
if (this.resume != null) {
this.resume = null

if (this.headersSent) {
// Only Partial Content 206 supposed to provide Content-Range,
// any other status code that partially consumed the payload
// should not be retried because it would result in downstream
// wrongly concatenate multiple responses.
if (statusCode !== 206 && (this.start > 0 || statusCode !== 200)) {
this.abort(
new RequestRetryError('server does not support the range header and the payload was partially consumed', statusCode, {
headers,
data: { count: this.retryCount }
})
)
return false
throw new RequestRetryError('server does not support the range header and the payload was partially consumed', statusCode, {
headers,
data: { count: this.retryCount }
})
}

const contentRange = parseRangeHeader(headers['content-range'])
// If no content range
if (!contentRange) {
this.abort(
new RequestRetryError('Content-Range mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
)
return false
throw new RequestRetryError('Content-Range mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
}

// Let's start with a weak etag check
if (this.etag != null && this.etag !== headers.etag) {
this.abort(
new RequestRetryError('ETag mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
)
return false
throw new RequestRetryError('ETag mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
}

const { start, size, end = size - 1 } = contentRange
const { start, size, end = size ? size - 1 : null } = contentRange

assert(this.start === start, 'content-range mismatch')
assert(this.end == null || this.end === end, 'content-range mismatch')

this.resume = resume
return true
return
}

if (this.end == null) {
Expand All @@ -238,15 +203,17 @@ class RetryHandler {
const range = parseRangeHeader(headers['content-range'])

if (range == null) {
return this.handler.onHeaders(
this.headersSent = true
this.handler.onResponseStart?.(
controller,
statusCode,
rawHeaders,
resume,
headers,
statusMessage
)
return
}

const { start, size, end = size - 1 } = range
const { start, size, end = size ? size - 1 : null } = range
assert(
start != null && Number.isFinite(start),
'content-range mismatch'
Expand All @@ -269,7 +236,7 @@ class RetryHandler {
'invalid content-length'
)

this.resume = resume
this.resume = true
this.etag = headers.etag != null ? headers.etag : null

// Weak etags are not useful for comparison nor cache
Expand All @@ -283,38 +250,36 @@ class RetryHandler {
this.etag = null
}

return this.handler.onHeaders(
this.headersSent = true
this.handler.onResponseStart?.(
controller,
statusCode,
rawHeaders,
resume,
headers,
statusMessage
)
} else {
throw new RequestRetryError('Request failed', statusCode, {
headers,
data: { count: this.retryCount }
})
}

const err = new RequestRetryError('Request failed', statusCode, {
headers,
data: { count: this.retryCount }
})

this.abort(err)

return false
}

onData (chunk) {
onResponseData (controller, chunk) {
this.start += chunk.length

return this.handler.onData(chunk)
this.handler.onResponseData?.(controller, chunk)
}

onComplete (rawTrailers) {
onResponseEnd (controller, trailers) {
this.retryCount = 0
return this.handler.onComplete(rawTrailers)
return this.handler.onResponseEnd?.(controller, trailers)
}

onError (err) {
if (this.aborted || isDisturbed(this.opts.body)) {
return this.handler.onError(err)
onResponseError (controller, err) {
if (!controller || controller.aborted || isDisturbed(this.opts.body)) {
this.handler.onResponseError?.(controller, err)
return
}

// We reconcile in case of a mix between network errors
Expand Down Expand Up @@ -343,8 +308,8 @@ class RetryHandler {
* @returns
*/
function onRetry (err) {
if (err != null || this.aborted || isDisturbed(this.opts.body)) {
return this.handler.onError(err)
if (err != null || controller?.aborted || isDisturbed(this.opts.body)) {
return this.handler.onResponseError?.(controller, err)
}

if (this.start !== 0) {
Expand All @@ -368,7 +333,7 @@ class RetryHandler {
this.retryCountCheckpoint = this.retryCount
this.dispatch(this.opts, this)
} catch (err) {
this.handler.onError(err)
this.handler.onResponseError?.(controller, err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/handler/wrap-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module.exports = class WrapHandler {

onError (err) {
if (!this.#handler.onError) {
throw new InvalidArgumentError('invalid onError method')
throw err
}

return this.#handler.onError?.(err)
Expand Down
2 changes: 1 addition & 1 deletion test/node-test/client-dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ test('dispatch pool onError missing', async (t) => {
})
} catch (err) {
p.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
p.strictEqual(err.message, 'invalid onError method')
p.strictEqual(err.message, 'upgrade must be a string')
}
})

Expand Down
Loading
Loading