Skip to content

Commit

Permalink
fix: interceptor back-pressure
Browse files Browse the repository at this point in the history
Refs: #3368
Refs: #3370
  • Loading branch information
ronag committed Jun 25, 2024
1 parent dd98299 commit 7f7e3a0
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 44 deletions.
12 changes: 3 additions & 9 deletions lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,24 @@ class Agent extends DispatcherBase {
return ret
}

[kDispatch] (opts, handler) {
[kDispatch] (opts, handler, onDrain) {
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

let dispatcher = this[kClients].get(key)
const dispatcher = this[kClients].get(key)

if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

// This introduces a tiny memory leak, as dispatchers are never removed from the map.
// TODO(mcollina): remove te timer when the client/pool do not have any more
// active connections.
this[kClients].set(key, dispatcher)
}

return dispatcher.dispatch(opts, handler)
return dispatcher.dispatch(opts, handler, onDrain)
}

async [kClose] () {
Expand Down
37 changes: 27 additions & 10 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const connectH2 = require('./client-h2.js')
let deprecatedInterceptorWarned = false

const kClosedResolve = Symbol('kClosedResolve')
const kDrainQueue = Symbol('kDrainQueue')

function getPipelining (client) {
return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
Expand Down Expand Up @@ -243,6 +244,13 @@ class Client extends DispatcherBase {
this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
this[kHTTPContext] = null

this[kDrainQueue] = []
this.on('drain', () => {
for (const callback of this[kDrainQueue].splice(0)) {
callback(null)
}
})

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
// | complete | running | pending |
Expand Down Expand Up @@ -299,26 +307,31 @@ class Client extends DispatcherBase {
this.once('connect', cb)
}

[kDispatch] (opts, handler) {
[kDispatch] (opts, handler, onDrain) {
const origin = opts.origin || this[kUrl].origin
const request = new Request(origin, opts, handler)

this[kQueue].push(request)
if (this[kResuming]) {
// Do nothing.
} else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
// Wait a tick in case stream/iterator is ended in the same tick.
this[kResuming] = 1
queueMicrotask(() => resume(this))
if (this[kBusy] && onDrain) {
this[kDrainQueue].push(onDrain)
return false
} else {
this[kResume](true)
this[kQueue].push(request)
if (this[kResuming]) {
// Do nothing.
} else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
// Wait a tick in case stream/iterator is ended in the same tick.
this[kResuming] = 1
queueMicrotask(() => resume(this))
} else {
this[kResume](true)
}
}

if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
this[kNeedDrain] = 2
}

return this[kNeedDrain] < 2
return onDrain ? true : this[kNeedDrain] < 2
}

async [kClose] () {
Expand All @@ -341,6 +354,10 @@ class Client extends DispatcherBase {
util.errorRequest(this, request, err)
}

for (const callback of this[kDrainQueue].splice(0)) {
callback(err)
}

const callback = () => {
if (this[kClosedResolve]) {
// TODO (fix): Should we error here with ClientDestroyedError?
Expand Down
10 changes: 5 additions & 5 deletions lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,21 @@ class DispatcherBase extends Dispatcher {
})
}

[kInterceptedDispatch] (opts, handler) {
[kInterceptedDispatch] (opts, handler, onDrain) {
if (!this[kInterceptors] || this[kInterceptors].length === 0) {
this[kInterceptedDispatch] = this[kDispatch]
return this[kDispatch](opts, handler)
return this[kDispatch](opts, handler, onDrain)
}

let dispatch = this[kDispatch].bind(this)
for (let i = this[kInterceptors].length - 1; i >= 0; i--) {
dispatch = this[kInterceptors][i](dispatch)
}
this[kInterceptedDispatch] = dispatch
return dispatch(opts, handler)
return dispatch(opts, handler, onDrain)
}

dispatch (opts, handler) {
dispatch (opts, handler, onDrain) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
}
Expand All @@ -174,7 +174,7 @@ class DispatcherBase extends Dispatcher {
throw new ClientClosedError()
}

return this[kInterceptedDispatch](opts, handler)
return this[kInterceptedDispatch](opts, handler, onDrain)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
Expand Down
12 changes: 10 additions & 2 deletions lib/dispatcher/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ class ComposedDispatcher extends Dispatcher {
this.#dispatch = dispatch
}

dispatch (...args) {
this.#dispatch(...args)
dispatch (opts, handler, onDrain) {
onDrain ??= (err) => {
if (err) {
handler.onError(err)
} else {
this.#dispatch(opts, handler, onDrain)
}
}

return this.#dispatch(opts, handler, onDrain)
}

close (...args) {
Expand Down
21 changes: 18 additions & 3 deletions lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const kGetDispatcher = Symbol('get dispatcher')
const kAddClient = Symbol('add client')
const kRemoveClient = Symbol('remove client')
const kStats = Symbol('stats')
const kDrainQueue = Symbol('kDrainQueue')

class PoolBase extends DispatcherBase {
constructor () {
Expand Down Expand Up @@ -69,6 +70,13 @@ class PoolBase extends DispatcherBase {
}

this[kStats] = new PoolStats(this)

this[kDrainQueue] = []
this.on('drain', () => {
for (const callback of this[kDrainQueue].splice(0)) {
callback(null)
}
})
}

get [kBusy] () {
Expand Down Expand Up @@ -122,6 +130,10 @@ class PoolBase extends DispatcherBase {
}

async [kDestroy] (err) {
for (const callback of this[kDrainQueue].splice(0)) {
callback(err)
}

while (true) {
const item = this[kQueue].shift()
if (!item) {
Expand All @@ -133,10 +145,13 @@ class PoolBase extends DispatcherBase {
return Promise.all(this[kClients].map(c => c.destroy(err)))
}

[kDispatch] (opts, handler) {
[kDispatch] (opts, handler, onDrain) {
const dispatcher = this[kGetDispatcher]()

if (!dispatcher) {
if (!dispatcher && onDrain) {
this[kDrainQueue].push(onDrain)
return false
} else if (!dispatcher) {
this[kNeedDrain] = true
this[kQueue].push({ opts, handler })
this[kQueued]++
Expand All @@ -145,7 +160,7 @@ class PoolBase extends DispatcherBase {
this[kNeedDrain] = !this[kGetDispatcher]()
}

return !this[kNeedDrain]
return onDrain ? true : !this[kNeedDrain]
}

[kAddClient] (client) {
Expand Down
5 changes: 3 additions & 2 deletions lib/dispatcher/proxy-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ProxyAgent extends DispatcherBase {
})
}

dispatch (opts, handler) {
dispatch (opts, handler, onDrain) {
const headers = buildHeaders(opts.headers)
throwIfProxyAuthIsSent(headers)

Expand All @@ -121,7 +121,8 @@ class ProxyAgent extends DispatcherBase {
...opts,
headers
},
handler
handler,
onDrain
)
}

Expand Down
9 changes: 5 additions & 4 deletions lib/dispatcher/retry-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ const Dispatcher = require('./dispatcher')
const RetryHandler = require('../handler/retry-handler')

class RetryAgent extends Dispatcher {
#agent = null
#options = null
#agent
#options

constructor (agent, options = {}) {
super(options)
this.#agent = agent
this.#options = options
}

dispatch (opts, handler) {
dispatch (opts, handler, onDrain) {
const retry = new RetryHandler({
...opts,
retryOptions: this.#options
}, {
dispatch: this.#agent.dispatch.bind(this.#agent),
handler
})
return this.#agent.dispatch(opts, retry)
return this.#agent.dispatch(opts, retry, onDrain)
}

close () {
Expand Down
4 changes: 2 additions & 2 deletions lib/interceptor/dump.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ function createDumpInterceptor (
}
) {
return dispatch => {
return function Intercept (opts, handler) {
return function Intercept (opts, handler, onDrain) {
const { dumpMaxSize = defaultMaxSize } =
opts

Expand All @@ -115,7 +115,7 @@ function createDumpInterceptor (
handler
)

return dispatch(opts, dumpHandler)
return dispatch(opts, dumpHandler, onDrain)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/interceptor/redirect-interceptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ const RedirectHandler = require('../handler/redirect-handler')

function createRedirectInterceptor ({ maxRedirections: defaultMaxRedirections }) {
return (dispatch) => {
return function Intercept (opts, handler) {
return function Intercept (opts, handler, onDrain) {
const { maxRedirections = defaultMaxRedirections } = opts

if (!maxRedirections) {
return dispatch(opts, handler)
return dispatch(opts, handler, onDrain)
}

const redirectHandler = new RedirectHandler(dispatch, maxRedirections, opts, handler)
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
return dispatch(opts, redirectHandler)
return dispatch(opts, redirectHandler, onDrain)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/interceptor/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const RedirectHandler = require('../handler/redirect-handler')
module.exports = opts => {
const globalMaxRedirections = opts?.maxRedirections
return dispatch => {
return function redirectInterceptor (opts, handler) {
return function redirectInterceptor (opts, handler, onDrain) {
const { maxRedirections = globalMaxRedirections, ...baseOpts } = opts

if (!maxRedirections) {
Expand All @@ -18,7 +18,7 @@ module.exports = opts => {
handler
)

return dispatch(baseOpts, redirectHandler)
return dispatch(baseOpts, redirectHandler, onDrain)
}
}
}
5 changes: 3 additions & 2 deletions lib/interceptor/retry.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const RetryHandler = require('../handler/retry-handler')

module.exports = globalOpts => {
return dispatch => {
return function retryInterceptor (opts, handler) {
return function retryInterceptor (opts, handler, onDrain) {
return dispatch(
opts,
new RetryHandler(
Expand All @@ -12,7 +12,8 @@ module.exports = globalOpts => {
handler,
dispatch
}
)
),
onDrain
)
}
}
Expand Down

0 comments on commit 7f7e3a0

Please sign in to comment.