Skip to content

Commit

Permalink
chore: abstract http message parser
Browse files Browse the repository at this point in the history
  • Loading branch information
kettanaito committed Mar 4, 2024
1 parent cff7820 commit 5d9a812
Showing 1 changed file with 91 additions and 84 deletions.
175 changes: 91 additions & 84 deletions src/interceptors/Socket/SocketInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ export class SocketInterceptor extends Interceptor<SocketEventMap> {
// Otherwise, listen to the original response
// and forward it to the interceptor.
controller.onResponse = (response, isMockedResponse) => {
console.log('onResponse callback')

self.emitter.emit('response', {
requestId,
request,
Expand Down Expand Up @@ -130,9 +128,7 @@ class SocketController {
private shouldSuppressEvents = false
private suppressedEvents: Array<[event: string, ...args: Array<unknown>]> = []
private request: Request
private requestParser: typeof HTTPParser
private requestStream?: Readable
private responseParser: typeof HTTPParser
private responseStream?: Readable

constructor(
Expand All @@ -142,58 +138,35 @@ class SocketController {
) {
this.url = parseSocketConnectionUrl(normalizedOptions)

// Create the parser later on because a single
// socket can be *reused* for multiple requests.
// The same way, don't free the parser.
this.requestParser = new HTTPParser()
this.requestParser[HTTPParser.kOnHeadersComplete] = (
verionMajor: number,
versionMinor: number,
headers: Array<string>,
idk: number,
path: string,
idk2: undefined,
idk3: undefined,
idk4: boolean
) => {
this.onRequestStart(path, headers)
}
this.requestParser[HTTPParser.kOnBody] = (chunk: Buffer) => {
this.onRequestData(chunk)
}
this.requestParser[HTTPParser.kOnMessageComplete] = () => {
this.onRequestEnd()
}
this.requestParser.initialize(HTTPParser.REQUEST, {})

this.responseParser = new HTTPParser()
this.responseParser[HTTPParser.kOnHeadersComplete] = (
verionMajor: number,
versionMinor: number,
headers: Array<string>,
method: string | undefined,
url: string | undefined,
status: number,
statusText: string,
upgrade: boolean,
shouldKeepAlive: boolean
) => {
this.onResponseStart(status, statusText, headers)
}
this.responseParser[HTTPParser.kOnBody] = (chunk: Buffer) => {
this.onResponseData(chunk)
}
this.responseParser[HTTPParser.kOnMessageComplete] = () => {
this.onResponseEnd()
}
this.responseParser.initialize(
HTTPParser.RESPONSE,
// Don't create any async resources here.
// This has to be "HTTPINCOMINGMESSAGE" in practice.
// @see https://github.com/nodejs/llhttp/issues/44#issuecomment-582499320
// new HTTPServerAsyncResource('INTERCEPTORINCOMINGMESSAGE', socket)
{}
)
const requestParser = new HttpMessageParser('request', {
onHeadersComplete: (major, minor, headers, _, path) => {
this.onRequestStart(path, headers)
},
onBody: (chunk) => {
this.onRequestData(chunk)
},
onMessageComplete: this.onRequestEnd.bind(this),
})

const responseParser = new HttpMessageParser('response', {
onHeadersComplete: (
versionMajor,
versionMinor,
headers,
method,
url,
status,
statusText,
upgrade,
keepalive
) => {
this.onResponseStart(status, statusText, headers)
},
onBody: (chunk) => {
this.onResponseData(chunk)
},
onMessageComplete: this.onResponseEnd.bind(this),
})

socket.emit = new Proxy(socket.emit, {
apply: (target, thisArg, args) => {
Expand All @@ -209,13 +182,13 @@ class SocketController {
if (this.shouldSuppressEvents) {
if (args[0] === 'error') {
Reflect.set(this.socket, '_hadError', false)
this.suppressedEvents.push(['error', args.slice(1)])
this.suppressedEvents.push(['error', ...args.slice(1)])
return true
}

// Suppress close events for errored mocked connections.
if (args[0] === 'close') {
this.suppressedEvents.push(['close', args.slice(1)])
this.suppressedEvents.push(['close', ...args.slice(1)])
return true
}
}
Expand All @@ -224,7 +197,7 @@ class SocketController {
},
})

socket.once('ready', () => {
socket.once('connect', () => {
// Notify the interceptor once the socket is ready.
// The HTTP parser triggers BEFORE that.
this.onRequest(this.request)
Expand All @@ -234,7 +207,7 @@ class SocketController {
socket.write = new Proxy(socket.write, {
apply: (target, thisArg, args) => {
if (args[0] !== null) {
this.requestParser.execute(
requestParser.push(
Buffer.isBuffer(args[0]) ? args[0] : Buffer.from(args[0])
)
}
Expand All @@ -246,7 +219,7 @@ class SocketController {
socket.push = new Proxy(socket.push, {
apply: (target, thisArg, args) => {
if (args[0] !== null) {
this.responseParser.execute(
responseParser.push(
Buffer.isBuffer(args[0]) ? args[0] : Buffer.from(args[0])
)
}
Expand Down Expand Up @@ -304,11 +277,15 @@ class SocketController {
}

private replayErrors() {
console.log('replay errors...', this.suppressedEvents)

if (this.suppressedEvents.length === 0) {
return
}

for (const [event, ...args] of this.suppressedEvents) {
console.log('replaying event', event, ...args)

if (event === 'error') {
Reflect.set(this.socket, '_hadError', true)
}
Expand Down Expand Up @@ -342,6 +319,7 @@ class SocketController {
method,
headers,
body: methodWithBody ? Readable.toWeb(this.requestStream) : null,
// @ts-expect-error Not documented fetch property.
duplex: methodWithBody ? 'half' : undefined,
credentials: 'same-origin',
})
Expand All @@ -356,8 +334,6 @@ class SocketController {
}

private onRequestEnd() {
this.requestParser.free()

invariant(
this.requestStream,
'Failed to handle the request end: request stream is missing'
Expand All @@ -376,7 +352,7 @@ class SocketController {
statusText,
headers: parseRawHeaders(rawHeaders),
})
this.onResponse(response)
this.onResponse(response, false)
}

private onResponseData(chunk: Buffer) {
Expand All @@ -388,8 +364,6 @@ class SocketController {
}

private onResponseEnd() {
this.responseParser.free()

invariant(
this.responseStream,
'Failed to handle the response end: response stream is missing'
Expand All @@ -398,6 +372,57 @@ class SocketController {
}
}

type HttpMessageParserMessageType = 'request' | 'response'
interface HttpMessageParserCallbacks<T extends HttpMessageParserMessageType> {
onHeadersComplete?: T extends 'request'
? (
versionMajor: number,
versionMinor: number,
headers: Array<string>,
idk: number,
path: string
) => void
: (
versionMajor: number,
versionMinor: number,
headers: Array<string>,
method: string | undefined,
url: string | undefined,
status: number,
statusText: string,
upgrade: boolean,
shouldKeepAlive: boolean
) => void
onBody?: (chunk: Buffer) => void
onMessageComplete?: () => void
}

class HttpMessageParser<T extends HttpMessageParserMessageType> {
private parser: HTTPParser

constructor(messageType: T, callbacks: HttpMessageParserCallbacks<T>) {
this.parser = new HTTPParser()
this.parser.initialize(
messageType === 'request' ? HTTPParser.REQUEST : HTTPParser.RESPONSE,
// Don't create any async resources here.
// This has to be "HTTPINCOMINGMESSAGE" in practice.
// @see https://github.com/nodejs/llhttp/issues/44#issuecomment-582499320
// new HTTPServerAsyncResource('INTERCEPTORINCOMINGMESSAGE', socket)
{}
)
this.parser[HTTPParser.kOnHeadersComplete] = callbacks.onHeadersComplete
this.parser[HTTPParser.kOnMessageComplete] = callbacks.onMessageComplete
}

public push(chunk: Buffer): void {
this.parser.execute(chunk)
}

public destroy(): void {
this.parser.free()
}
}

function parseSocketConnectionUrl(
options: NormalizedSocketConnectOptions
): URL {
Expand Down Expand Up @@ -429,21 +454,3 @@ function parseRawHeaders(rawHeaders: Array<string>): Headers {
}
return headers
}

// MOCKED REQUEST:
// 1. lookup // mock that's OK
// 2. connect
// 3. ready
// HAS MOCK?
// -> Y: data -> close
// -> N (no response, non-existing host):
// -> replayErrors()
// -> lookup (error), error, close

// BYPASSED REQUEST TO EXISTING HOST:
// 1. lookup (no errors)
// 2. (skip mockConnect), forward all socket events.
// 3. emit "request" on the interceptor.
// 4. HAS MOCK?
// -> Y: respondWith: data -> close
// -> N: do nothing

0 comments on commit 5d9a812

Please sign in to comment.