Skip to content

Commit

Permalink
fix: ensure websockets are correctly passed (#65759)
Browse files Browse the repository at this point in the history
Further enhancing the typings across the codebase, this resolves some
errors discovered while running tests. During development, previously,
if the websocket request was forwarded down to the route resolver, it
would fail. This is because a `Duplex` stream is not a `ServerResponse`.

I opted to use the `MockedResponse` here to ensure the remaining code
didn't change, as we're only using the resolve routes code to identify a
match rather than actually sending the response on. The response data is
sent later with the `proxyRequest` which here does have support for
`Duplex` streams.
  • Loading branch information
wyattjoh authored May 15, 2024
1 parent 337666b commit 6c3577d
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 72 deletions.
12 changes: 10 additions & 2 deletions packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { ensureLeadingSlash } from '../../shared/lib/page-path/ensure-leading-sl
import { getNextPathnameInfo } from '../../shared/lib/router/utils/get-next-pathname-info'
import { getHostname } from '../../shared/lib/get-hostname'
import { detectDomainLocale } from '../../shared/lib/i18n/detect-domain-locale'
import { MockedResponse } from './mock-request'

const debug = setupDebug('next:router-server:main')
const isNextFont = (pathname: string | null) =>
Expand Down Expand Up @@ -660,9 +661,16 @@ export async function initialize(opts: {
}
}

const res = new MockedResponse({
resWriter: () => {
throw new Error(
'Invariant: did not expect response writer to be written to for upgrade request'
)
},
})
const { matchedOutput, parsedUrl } = await resolveRoutes({
req,
res: socket as any,
res,
isUpgradeReq: true,
signal: signalFromNodeResponse(socket),
})
Expand All @@ -674,7 +682,7 @@ export async function initialize(opts: {
}

if (parsedUrl.protocol) {
return await proxyRequest(req, socket as any, parsedUrl, head)
return await proxyRequest(req, socket, parsedUrl, head)
}

// If there's no matched output, we don't handle the request as user's
Expand Down
154 changes: 84 additions & 70 deletions packages/next/src/server/lib/router-utils/proxy-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import type { NextUrlWithParsedQuery } from '../../request-meta'

import url from 'url'
import { stringifyQuery } from '../../server-route-utils'
import { Duplex } from 'stream'
import { DetachedPromise } from '../../../lib/detached-promise'

export async function proxyRequest(
req: IncomingMessage,
res: ServerResponse,
res: ServerResponse | Duplex,
parsedUrl: NextUrlWithParsedQuery,
upgradeHead?: any,
upgradeHead?: Buffer,
reqBody?: any,
proxyTimeout?: number | null
) {
Expand All @@ -33,83 +35,95 @@ export async function proxyRequest(
},
})

await new Promise((proxyResolve, proxyReject) => {
let finished = false
let finished = false

// http-proxy does not properly detect a client disconnect in newer
// versions of Node.js. This is caused because it only listens for the
// `aborted` event on the our request object, but it also fully reads
// and closes the request object. Node **will not** fire `aborted` when
// the request is already closed. Listening for `close` on our response
// object will detect the disconnect, and we can abort the proxy's
// connection.
proxy.on('proxyReq', (proxyReq) => {
res.on('close', () => proxyReq.destroy())
})
proxy.on('proxyRes', (proxyRes) => {
if (res.destroyed) {
proxyRes.destroy()
} else {
res.on('close', () => proxyRes.destroy())
}
})
// http-proxy does not properly detect a client disconnect in newer
// versions of Node.js. This is caused because it only listens for the
// `aborted` event on the our request object, but it also fully reads
// and closes the request object. Node **will not** fire `aborted` when
// the request is already closed. Listening for `close` on our response
// object will detect the disconnect, and we can abort the proxy's
// connection.
proxy.on('proxyReq', (proxyReq) => {
res.on('close', () => proxyReq.destroy())
})

proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
const cleanup = (err: any) => {
// cleanup event listeners to allow clean garbage collection
proxyRes.removeListener('error', cleanup)
proxyRes.removeListener('close', cleanup)
innerRes.removeListener('error', cleanup)
innerRes.removeListener('close', cleanup)

// destroy all source streams to propagate the caught event backward
innerReq.destroy(err)
proxyRes.destroy(err)
}
proxy.on('proxyRes', (proxyRes) => {
if (res.destroyed) {
proxyRes.destroy()
} else {
res.on('close', () => proxyRes.destroy())
}
})

proxyRes.once('error', cleanup)
proxyRes.once('close', cleanup)
innerRes.once('error', cleanup)
innerRes.once('close', cleanup)
})
proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
const cleanup = (err: any) => {
// cleanup event listeners to allow clean garbage collection
proxyRes.removeListener('error', cleanup)
proxyRes.removeListener('close', cleanup)
innerRes.removeListener('error', cleanup)
innerRes.removeListener('close', cleanup)

// destroy all source streams to propagate the caught event backward
innerReq.destroy(err)
proxyRes.destroy(err)
}

proxyRes.once('error', cleanup)
proxyRes.once('close', cleanup)
innerRes.once('error', cleanup)
innerRes.once('close', cleanup)
})

const detached = new DetachedPromise<boolean>()

// When the proxy finishes proxying the request, shut down the proxy.
detached.promise.finally(() => {
proxy.close()
})

proxy.on('error', (err) => {
console.error(`Failed to proxy ${target}`, err)
if (!finished) {
finished = true
proxyReject(err)
proxy.on('error', (err) => {
console.error(`Failed to proxy ${target}`, err)
if (!finished) {
finished = true
detached.reject(err)

if (!res.destroyed) {
if (!res.destroyed) {
if (!(res instanceof Duplex)) {
res.statusCode = 500
res.end('Internal Server Error')
}

res.end('Internal Server Error')
}
})
}
})

// if upgrade head is present treat as WebSocket request
if (upgradeHead) {
proxy.on('proxyReqWs', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.ws(req as any as IncomingMessage, res, upgradeHead)
proxyResolve(true)
} else {
proxy.on('proxyReq', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
// If upgrade head is present or the response is a Duplex stream, treat as
// WebSocket request.
if (upgradeHead || res instanceof Duplex) {
proxy.on('proxyReqWs', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
detached.resolve(true)
}
})
proxy.web(req, res, {
buffer: reqBody,
})
proxy.ws(req, res, upgradeHead)
detached.resolve(true)
} else {
proxy.on('proxyReq', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
detached.resolve(true)
}
})
}
})
})
proxy.web(req, res, {
buffer: reqBody,
})
}

return detached.promise
}

0 comments on commit 6c3577d

Please sign in to comment.