Skip to content

Commit

Permalink
Dispatch compose (nodejs#2795)
Browse files Browse the repository at this point in the history
* feat: new interceptors API

* WIP: compose
  • Loading branch information
ronag authored Feb 23, 2024
1 parent cba8e0f commit 649185a
Show file tree
Hide file tree
Showing 18 changed files with 374 additions and 504 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,5 @@ fuzz-results-*.json
# Bundle output
undici-fetch.js
/test/imports/undici-import.js

.tap
15 changes: 5 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@ const MockClient = require('./lib/mock/mock-client')
const MockAgent = require('./lib/mock/mock-agent')
const MockPool = require('./lib/mock/mock-pool')
const mockErrors = require('./lib/mock/mock-errors')
const ProxyAgent = require('./lib/proxy-agent')
const RetryAgent = require('./lib/retry-agent')
const RetryHandler = require('./lib/handler/RetryHandler')
const { getGlobalDispatcher, setGlobalDispatcher } = require('./lib/global')
const DecoratorHandler = require('./lib/handler/DecoratorHandler')
const RedirectHandler = require('./lib/handler/RedirectHandler')

Object.assign(Dispatcher.prototype, api)

Expand All @@ -28,12 +23,12 @@ module.exports.Client = Client
module.exports.Pool = Pool
module.exports.BalancedPool = BalancedPool
module.exports.Agent = Agent
module.exports.ProxyAgent = ProxyAgent
module.exports.RetryAgent = RetryAgent
module.exports.RetryHandler = RetryHandler

module.exports.DecoratorHandler = DecoratorHandler
module.exports.RedirectHandler = RedirectHandler
module.exports.interceptor = {
redirect: require('./lib/interceptor/redirect'),
retry: require('./lib/interceptor/retry'),
proxy: require('./lib/interceptor/proxy')
}

module.exports.buildConnector = buildConnector
module.exports.errors = errors
Expand Down
18 changes: 4 additions & 14 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class ConnectHandler extends AsyncResource {
Expand Down Expand Up @@ -91,19 +91,9 @@ function connect (opts, callback) {
}

try {
const connectHandler = new ConnectHandler(opts, callback)
const connectOptions = { ...opts, method: 'CONNECT' }

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(connectOptions, connectHandler)
return
}

this.dispatch(connectOptions, connectHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'CONNECT' }, new ConnectHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
14 changes: 4 additions & 10 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

const kResume = Symbol('resume')
Expand Down Expand Up @@ -241,15 +241,9 @@ function pipeline (opts, handler) {
try {
const pipelineHandler = new PipelineHandler(opts, handler)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)({ ...opts, body: pipelineHandler.req }, pipelineHandler)
} else {
this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
}
this
.compose(redirect(opts))
.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)

return pipelineHandler.ret
} catch (err) {
Expand Down
17 changes: 4 additions & 13 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { addSignal, removeSignal } = require('./abort-signal')

Expand Down Expand Up @@ -167,18 +167,9 @@ function request (opts, callback) {
}

try {
const handler = new RequestHandler(opts, callback)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(opts, handler)
return
}

this.dispatch(opts, handler)
this
.compose(redirect(opts))
.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
17 changes: 4 additions & 13 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
Expand Down Expand Up @@ -208,18 +208,9 @@ function stream (opts, factory, callback) {
}

try {
const handler = new StreamHandler(opts, factory, callback)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(opts, handler)
return
}

this.dispatch(opts, handler)
this
.compose(redirect(opts))
.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
22 changes: 4 additions & 18 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const assert = require('node:assert')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class UpgradeHandler extends AsyncResource {
Expand Down Expand Up @@ -88,23 +88,9 @@ function upgrade (opts, callback) {
}

try {
const upgradeHandler = new UpgradeHandler(opts, callback)
const upgradeOpts = {
...opts,
method: opts.method || 'GET',
upgrade: opts.protocol || 'Websocket'
}

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(upgradeOpts, upgradeHandler)
return
}

this.dispatch(upgradeOpts, upgradeHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'GET', upgrade: opts?.protocol || 'Websocket' }, new UpgradeHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
24 changes: 24 additions & 0 deletions lib/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

const EventEmitter = require('node:events')

const kDispatcherVersion = Symbol.for('undici.dispatcher.version')

class Dispatcher extends EventEmitter {
[kDispatcherVersion] = 1

dispatch () {
throw new Error('not implemented')
}
Expand All @@ -14,6 +18,26 @@ class Dispatcher extends EventEmitter {
destroy () {
throw new Error('not implemented')
}

compose (...interceptors) {
let dispatcher = this
for (const interceptor of interceptors) {
if (interceptor == null) {
continue
}

if (typeof interceptor !== 'function') {
throw new Error('invalid interceptor')
}

dispatcher = interceptor(dispatcher) ?? dispatcher

if (dispatcher[kDispatcherVersion] !== 1) {
throw new Error('invalid dispatcher')
}
}
return dispatcher
}
}

module.exports = Dispatcher
31 changes: 0 additions & 31 deletions lib/handler/DecoratorHandler.js

This file was deleted.

38 changes: 29 additions & 9 deletions lib/proxy-agent.js → lib/interceptor/proxy.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
'use strict'

const { kProxy, kClose, kDestroy } = require('./core/symbols')
const { kProxy, kClose, kDestroy } = require('../core/symbols')
const { URL } = require('node:url')
const Agent = require('./agent')
const Pool = require('./pool')
const DispatcherBase = require('./dispatcher-base')
const { InvalidArgumentError, RequestAbortedError } = require('./core/errors')
const buildConnector = require('./core/connect')
const Agent = require('../agent')
const Pool = require('../pool')
const DispatcherBase = require('../dispatcher-base')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
const buildConnector = require('../core/connect')

const kAgent = Symbol('proxy agent')
const kClient = Symbol('proxy client')
Expand Down Expand Up @@ -39,10 +39,10 @@ function defaultFactory (origin, opts) {
}

class ProxyAgent extends DispatcherBase {
constructor (opts) {
constructor (dispatcher, opts) {
super(opts)
this[kProxy] = buildProxyOptions(opts)
this[kAgent] = new Agent(opts)
this[kAgent] = dispatcher

if (typeof opts === 'string') {
opts = { uri: opts }
Expand Down Expand Up @@ -183,4 +183,24 @@ function throwIfProxyAuthIsSent (headers) {
}
}

module.exports = ProxyAgent
module.exports = (opts) => {
if (typeof opts === 'string') {
opts = { uri: opts }
}

if (!opts || !opts.uri) {
throw new InvalidArgumentError('Proxy opts.uri is mandatory')
}

const { clientFactory = defaultFactory } = opts

if (typeof clientFactory !== 'function') {
throw new InvalidArgumentError('Proxy opts.clientFactory must be a function.')
}

if (opts.auth && opts.token) {
throw new InvalidArgumentError('opts.auth cannot be used in combination with opts.token')
}

return (dispatcher) => new ProxyAgent(dispatcher, opts)
}
Loading

0 comments on commit 649185a

Please sign in to comment.