Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport v6.x] fix: use fasttimers for all connection timeouts #3675

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,6 @@ undici-fetch.js
.npmrc

.tap

# File generated by /test/request-timeout.js
test/request-timeout.10mb.bin
3 changes: 3 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ lib/llhttp/llhttp.wasm
!index.d.ts
!docs/docs/**/*
!scripts/strip-comments.js

# File generated by /test/request-timeout.js
test/request-timeout.10mb.bin
69 changes: 48 additions & 21 deletions lib/core/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const util = require('./util')
const { InvalidArgumentError, ConnectTimeoutError } = require('./errors')
const timers = require('../util/timers')

function noop () {}

let tls // include tls conditionally since it is not always available

// TODO: session re-use does not wait for the first
Expand Down Expand Up @@ -96,6 +98,8 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess

const session = customSession || sessionCache.get(sessionKey) || null

port = port || 443

socket = tls.connect({
highWaterMark: 16384, // TLS in node can't have bigger HWM anyway...
...options,
Expand All @@ -105,7 +109,7 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
// TODO(HTTP/2): Add support for h2c
ALPNProtocols: allowH2 ? ['http/1.1', 'h2'] : ['http/1.1'],
socket: httpSocket, // upgrade socket connection
port: port || 443,
port,
host: hostname
})

Expand All @@ -116,11 +120,14 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
})
} else {
assert(!httpSocket, 'httpSocket can only be sent on TLS update')

port = port || 80

socket = net.connect({
highWaterMark: 64 * 1024, // Same as nodejs fs streams.
...options,
localAddress,
port: port || 80,
port,
host: hostname
})
}
Expand All @@ -131,12 +138,12 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
socket.setKeepAlive(true, keepAliveInitialDelay)
}

const cancelConnectTimeout = setupConnectTimeout(new WeakRef(socket), timeout)
const clearConnectTimeout = setupConnectTimeout(new WeakRef(socket), { timeout, hostname, port })

socket
.setNoDelay(true)
.once(protocol === 'https:' ? 'secureConnect' : 'connect', function () {
cancelConnectTimeout()
queueMicrotask(clearConnectTimeout)

if (callback) {
const cb = callback
Expand All @@ -145,7 +152,7 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
}
})
.on('error', function (err) {
cancelConnectTimeout()
queueMicrotask(clearConnectTimeout)

if (callback) {
const cb = callback
Expand All @@ -158,50 +165,70 @@ function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, sess
}
}

/**
* @param {WeakRef<net.Socket>} socketWeakRef
* @param {object} opts
* @param {number} opts.timeout
* @param {string} opts.hostname
* @param {number} opts.port
* @returns {() => void}
*/
const setupConnectTimeout = process.platform === 'win32'
? (socket, timeout) => {
if (!timeout) {
return () => { }
? (socketWeakRef, opts) => {
if (!opts.timeout) {
return noop
}

let s1 = null
let s2 = null
const timer = timers.setTimeout(() => {
const fastTimer = timers.setFastTimeout(() => {
// setImmediate is added to make sure that we prioritize socket error events over timeouts
s1 = setImmediate(() => {
// Windows needs an extra setImmediate probably due to implementation differences in the socket logic
s2 = setImmediate(() => onConnectTimeout(socket.deref()))
s2 = setImmediate(() => onConnectTimeout(socketWeakRef.deref(), opts))
})
}, timeout)
}, opts.timeout)
return () => {
timers.clearTimeout(timer)
timers.clearFastTimeout(fastTimer)
clearImmediate(s1)
clearImmediate(s2)
}
}
: (socket, timeout) => {
if (!timeout) {
return () => { }
: (socketWeakRef, opts) => {
if (!opts.timeout) {
return noop
}

let s1 = null
const timer = timers.setTimeout(() => {
const fastTimer = timers.setFastTimeout(() => {
// setImmediate is added to make sure that we prioritize socket error events over timeouts
s1 = setImmediate(() => {
onConnectTimeout(socket.deref())
onConnectTimeout(socketWeakRef.deref(), opts)
})
}, timeout)
}, opts.timeout)
return () => {
timers.clearTimeout(timer)
timers.clearFastTimeout(fastTimer)
clearImmediate(s1)
}
}

function onConnectTimeout (socket) {
/**
* @param {net.Socket} socket
* @param {object} opts
* @param {number} opts.timeout
* @param {string} opts.hostname
* @param {number} opts.port
*/
function onConnectTimeout (socket, opts) {
let message = 'Connect Timeout Error'
if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) {
message += ` (attempted addresses: ${socket.autoSelectFamilyAttemptedAddresses.join(', ')})`
message += ` (attempted addresses: ${socket.autoSelectFamilyAttemptedAddresses.join(', ')},`
} else {
message += ` (attempted address: ${opts.hostname}:${opts.port},`
}

message += ` timeout: ${opts.timeout}ms)`

util.destroy(socket, new ConnectTimeoutError(message))
}

Expand Down
49 changes: 35 additions & 14 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,17 @@ let currentBufferRef = null
let currentBufferSize = 0
let currentBufferPtr = null

const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
const USE_NATIVE_TIMER = 0
const USE_FAST_TIMER = 1

// Use fast timers for headers and body to take eventual event loop
// latency into account.
const TIMEOUT_HEADERS = 2 | USE_FAST_TIMER
const TIMEOUT_BODY = 4 | USE_FAST_TIMER

// Use native timers to ignore event loop latency for keep-alive
// handling.
const TIMEOUT_KEEP_ALIVE = 8 | USE_NATIVE_TIMER

class Parser {
constructor (client, socket, { exports }) {
Expand Down Expand Up @@ -165,25 +173,38 @@ class Parser {
}

setTimeout (delay, type) {
this.timeoutType = type
if (delay !== this.timeoutValue) {
this.timeout && timers.clearTimeout(this.timeout)
// If the existing timer and the new timer are of different timer type
// (fast or native) or have different delay, we need to clear the existing
// timer and set a new one.
if (
delay !== this.timeoutValue ||
(type & USE_FAST_TIMER) ^ (this.timeoutType & USE_FAST_TIMER)
) {
// If a timeout is already set, clear it with clearTimeout of the fast
// timer implementation, as it can clear fast and native timers.
if (this.timeout) {
timers.clearTimeout(this.timeout)
this.timeout = null
}

if (delay) {
this.timeout = timers.setTimeout(onParserTimeout, delay, new WeakRef(this))
// istanbul ignore else: only for jest
if (this.timeout.unref) {
if (type & USE_FAST_TIMER) {
this.timeout = timers.setFastTimeout(onParserTimeout, delay, new WeakRef(this))
} else {
this.timeout = setTimeout(onParserTimeout, delay, new WeakRef(this))
this.timeout.unref()
}
} else {
this.timeout = null
}

this.timeoutValue = delay
} else if (this.timeout) {
// istanbul ignore else: only for jest
if (this.timeout.refresh) {
this.timeout.refresh()
}
}

this.timeoutType = type
}

resume () {
Expand Down Expand Up @@ -624,7 +645,7 @@ function onParserTimeout (parser) {
if (!paused) {
util.destroy(socket, new BodyTimeoutError())
}
} else if (timeoutType === TIMEOUT_IDLE) {
} else if (timeoutType === TIMEOUT_KEEP_ALIVE) {
assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
util.destroy(socket, new InformationalError('socket idle timeout'))
}
Expand Down Expand Up @@ -802,8 +823,8 @@ function resumeH1 (client) {
}

if (client[kSize] === 0) {
if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
if (socket[kParser].timeoutType !== TIMEOUT_KEEP_ALIVE) {
socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_KEEP_ALIVE)
}
} else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
Expand Down
49 changes: 48 additions & 1 deletion lib/util/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ module.exports = {
* The clearTimeout method cancels an instantiated Timer previously created
* by calling setTimeout.
*
* @param {FastTimer} timeout
* @param {NodeJS.Timeout|FastTimer} timeout
*/
clearTimeout (timeout) {
// If the timeout is a FastTimer, call its own clear method.
Expand All @@ -362,6 +362,29 @@ module.exports = {
nativeClearTimeout(timeout)
}
},
/**
* The setFastTimeout() method sets a fastTimer which executes a function once
* the timer expires.
* @param {Function} callback A function to be executed after the timer
* expires.
* @param {number} delay The time, in milliseconds that the timer should
* wait before the specified function or code is executed.
* @param {*} [arg] An optional argument to be passed to the callback function
* when the timer expires.
* @returns {FastTimer}
*/
setFastTimeout (callback, delay, arg) {
return new FastTimer(callback, delay, arg)
},
/**
* The clearTimeout method cancels an instantiated FastTimer previously
* created by calling setFastTimeout.
*
* @param {FastTimer} timeout
*/
clearFastTimeout (timeout) {
timeout.clear()
},
/**
* The now method returns the value of the internal fast timer clock.
*
Expand All @@ -370,6 +393,30 @@ module.exports = {
now () {
return fastNow
},
/**
* Trigger the onTick function to process the fastTimers array.
* Exported for testing purposes only.
* Marking as deprecated to discourage any use outside of testing.
* @deprecated
* @param {number} [delay=0] The delay in milliseconds to add to the now value.
*/
tick (delay = 0) {
fastNow += delay - RESOLUTION_MS + 1
onTick()
onTick()
},
/**
* Reset FastTimers.
* Exported for testing purposes only.
* Marking as deprecated to discourage any use outside of testing.
* @deprecated
*/
reset () {
fastNow = 0
fastTimers.length = 0
clearTimeout(fastNowTimeout)
fastNowTimeout = null
},
/**
* Exporting for testing purposes only.
* Marking as deprecated to discourage any use outside of testing.
Expand Down
11 changes: 8 additions & 3 deletions test/connect-timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ const skip = !!process.env.CITGM

// Using describe instead of test to avoid the timeout
describe('prioritize socket errors over timeouts', { skip }, async () => {
const t = tspl({ ...assert, after: () => {} }, { plan: 1 })
const t = tspl({ ...assert, after: () => {} }, { plan: 2 })
const client = new Pool('http://foorbar.invalid:1234', { connectTimeout: 1 })

client.request({ method: 'GET', path: '/foobar' })
.then(() => t.fail())
.catch((err) => {
t.strictEqual(err.code, 'ENOTFOUND')
t.strictEqual(err.code !== 'UND_ERR_CONNECT_TIMEOUT', true)
})

Expand All @@ -32,7 +33,7 @@ net.connect = function (options) {
}

test('connect-timeout', { skip }, async t => {
t = tspl(t, { plan: 1 })
t = tspl(t, { plan: 3 })

const client = new Client('http://localhost:9000', {
connectTimeout: 1e3
Expand All @@ -48,14 +49,16 @@ test('connect-timeout', { skip }, async t => {
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ConnectTimeoutError)
t.strictEqual(err.code, 'UND_ERR_CONNECT_TIMEOUT')
t.strictEqual(err.message, 'Connect Timeout Error (attempted address: localhost:9000, timeout: 1000ms)')
clearTimeout(timeout)
})

await t.completed
})

test('connect-timeout', { skip }, async t => {
t = tspl(t, { plan: 1 })
t = tspl(t, { plan: 3 })

const client = new Pool('http://localhost:9000', {
connectTimeout: 1e3
Expand All @@ -71,6 +74,8 @@ test('connect-timeout', { skip }, async t => {
method: 'GET'
}, (err) => {
t.ok(err instanceof errors.ConnectTimeoutError)
t.strictEqual(err.code, 'UND_ERR_CONNECT_TIMEOUT')
t.strictEqual(err.message, 'Connect Timeout Error (attempted address: localhost:9000, timeout: 1000ms)')
clearTimeout(timeout)
})

Expand Down
5 changes: 4 additions & 1 deletion test/issue-3356.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { tspl } = require('@matteo.collina/tspl')
const { test, after } = require('node:test')
const { createServer } = require('node:http')
const { once } = require('node:events')

const { tick: fastTimersTick } = require('../lib/util/timers')
const { fetch, Agent, RetryAgent } = require('..')

test('https://github.com/nodejs/undici/issues/3356', async (t) => {
Expand Down Expand Up @@ -42,6 +42,9 @@ test('https://github.com/nodejs/undici/issues/3356', async (t) => {
const response = await fetch(`http://localhost:${server.address().port}`, {
dispatcher: agent
})

fastTimersTick()

setTimeout(async () => {
try {
t.equal(response.status, 200)
Expand Down
Loading
Loading