Skip to content

Commit

Permalink
feat: Support WebSocket API routes, Upgrade requests
Browse files Browse the repository at this point in the history
Enables route handlers to receive and act on `Connection: Upgrade` requests, such as WebSockets,
when using the Node runtime. To enable this, the base http server has the `on('upgrade')` handler
removed. In this author's opinion, that handler is an anti-pattern as it makes it much more
difficult to handle middleware and other request lifecycle behavior.

By passing the raw request to the route handler and implementing a `NextResponse.upgrade()` response
value to opt out of additional processing that would write to the socket, the route handler can
handle an upgrade request itself.

Fixes #58698 (feature request)
Fixes #56368 (caused by next-ws / websocket middleware)
  • Loading branch information
AaronFriel committed Nov 20, 2023
1 parent 43b075e commit 546ebdc
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 117 deletions.
10 changes: 5 additions & 5 deletions contributing/core/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ We recommend running the tests in headless mode (with the browser windows hidden

For example, running one test in the production test suite:

Running one test in the `test/integration/production` test suite:
Running one test in the `test/production/pages-dir/production/test` test suite:

```sh
pnpm testheadless test/integration/production/ -t "should allow etag header support"
pnpm testheadless test/production/pages-dir/production/test/ -t "should allow etag header support"
```

Running all tests in the `test/integration/production` test suite:
Running all tests in the `test/production/pages-dir/production/test` test suite:

```sh
pnpm testheadless test/integration/production/
pnpm testheadless test/production/pages-dir/production/test/
```

When you want to debug a particular test you can replace `pnpm testheadless` with `pnpm testonly` to opt out of the headless browser.
When the test runs it will open the browser that is in the background by default, allowing you to inspect what is on the screen.

```sh
pnpm testonly test/integration/production/ -t "should allow etag header support"
pnpm testonly test/production/pages-dir/production/test/ -t "should allow etag header support"
```

**End-to-end (e2e)** tests are run in complete isolation from the repository.
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
"@types/sharp": "0.29.3",
"@types/string-hash": "1.1.1",
"@types/trusted-types": "2.0.3",
"@types/ws": "8.5.9",
"@typescript-eslint/eslint-plugin": "6.1.0",
"@typescript-eslint/parser": "6.1.0",
"@vercel/fetch": "6.1.1",
Expand Down
5 changes: 5 additions & 0 deletions packages/next/src/server/base-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ import {
} from './future/route-modules/checks'
import { PrefetchRSCPathnameNormalizer } from './future/normalizers/request/prefetch-rsc'
import { NextDataPathnameNormalizer } from './future/normalizers/request/next-data'
import { NextResponse, UPGRADE_HEADER } from './web/spec-extension/response'

export type FindComponentsResult = {
components: LoadComponentsReturnType
Expand Down Expand Up @@ -2246,6 +2247,10 @@ export default abstract class Server<ServerOptions extends Options = Options> {

const response = await routeModule.handle(request, context)

if (response.headers.has(UPGRADE_HEADER)) {
return null
}

;(req as any).fetchMetrics = (
context.renderOpts as any
).fetchMetrics
Expand Down
69 changes: 22 additions & 47 deletions packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// this must come first as it includes require hooks
import type { WorkerRequestHandler, WorkerUpgradeHandler } from './types'
import type { WorkerRequestHandler } from './types'
import type { DevBundler } from './router-utils/setup-dev-bundler'
import type { NextUrlWithParsedQuery } from '../request-meta'
// This is required before other imports to ensure the require hook is setup.
Expand Down Expand Up @@ -62,7 +62,7 @@ export async function initialize(opts: {
customServer?: boolean
experimentalTestProxy?: boolean
experimentalHttpsServer?: boolean
}): Promise<[WorkerRequestHandler, WorkerUpgradeHandler]> {
}): Promise<WorkerRequestHandler> {
if (!process.env.NODE_ENV) {
// @ts-ignore not readonly
process.env.NODE_ENV = opts.dev ? 'development' : 'production'
Expand Down Expand Up @@ -171,7 +171,24 @@ export async function initialize(opts: {
)

const requestHandlerImpl: WorkerRequestHandler = async (req, res) => {
if (compress) {
const isUpgradeReq =
req?.method === 'GET' &&
req?.headers?.connection?.toLowerCase() === 'upgrade'

if (
isUpgradeReq &&
opts.dev &&
developmentBundler &&
req.url?.includes(`/_next/webpack-hmr`)
) {
return developmentBundler.hotReloader.onHMR(
req,
req.socket,
Buffer.alloc(0)
)
}

if (compress && !isUpgradeReq) {
// @ts-expect-error not express req/res
compress(req, res, () => {})
}
Expand Down Expand Up @@ -292,7 +309,7 @@ export async function initialize(opts: {
} = await resolveRoutes({
req,
res,
isUpgradeReq: false,
isUpgradeReq,
signal: signalFromNodeResponse(res),
invokedOutputs,
})
Expand Down Expand Up @@ -563,47 +580,5 @@ export async function initialize(opts: {
}
requestHandlers[opts.dir] = requestHandler

const upgradeHandler: WorkerUpgradeHandler = async (req, socket, head) => {
try {
req.on('error', (_err) => {
// TODO: log socket errors?
// console.error(_err);
})
socket.on('error', (_err) => {
// TODO: log socket errors?
// console.error(_err);
})

if (opts.dev && developmentBundler) {
if (req.url?.includes(`/_next/webpack-hmr`)) {
return developmentBundler.hotReloader.onHMR(req, socket, head)
}
}

const { matchedOutput, parsedUrl } = await resolveRoutes({
req,
res: socket as any,
isUpgradeReq: true,
signal: signalFromNodeResponse(socket),
})

// TODO: allow upgrade requests to pages/app paths?
// this was not previously supported
if (matchedOutput) {
return socket.end()
}

if (parsedUrl.protocol) {
return await proxyRequest(req, socket as any, parsedUrl, head)
}

// If there's no matched output, we don't handle the request as user's
// custom WS server may be listening on the same path.
} catch (err) {
console.error('Error handling upgrade request', err)
socket.end()
}
}

return [requestHandler, upgradeHandler]
return requestHandler
}
26 changes: 2 additions & 24 deletions packages/next/src/server/lib/start-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import '../require-hook'

import type { IncomingMessage, ServerResponse } from 'http'
import type { SelfSignedCertificate } from '../../lib/mkcert'
import type { WorkerRequestHandler, WorkerUpgradeHandler } from './types'
import type { WorkerRequestHandler } from './types'

import fs from 'fs'
import v8 from 'v8'
Expand Down Expand Up @@ -113,17 +113,6 @@ export async function startServer(
}
throw new Error('Invariant request handler was not setup')
}
let upgradeHandler: WorkerUpgradeHandler = async (
req,
socket,
head
): Promise<void> => {
if (handlersPromise) {
await handlersPromise
return upgradeHandler(req, socket, head)
}
throw new Error('Invariant upgrade handler was not setup')
}

// setup server listener as fast as possible
if (selfSignedCertificate && !isDev) {
Expand Down Expand Up @@ -172,15 +161,6 @@ export async function startServer(
if (keepAliveTimeout) {
server.keepAliveTimeout = keepAliveTimeout
}
server.on('upgrade', async (req, socket, head) => {
try {
await upgradeHandler(req, socket, head)
} catch (err) {
socket.destroy()
Log.error(`Failed to handle request for ${req.url}`)
console.error(err)
}
})

let portRetryCount = 0

Expand Down Expand Up @@ -281,7 +261,7 @@ export async function startServer(
process.on('uncaughtException', exception)
process.on('unhandledRejection', exception)

const initResult = await getRequestHandlers({
requestHandler = await getRequestHandlers({
dir,
port,
isDev,
Expand All @@ -293,8 +273,6 @@ export async function startServer(
experimentalTestProxy: !!isExperimentalTestProxy,
experimentalHttpsServer: !!selfSignedCertificate,
})
requestHandler = initResult[0]
upgradeHandler = initResult[1]

const startServerProcessDuration =
performance.mark('next-start-end') &&
Expand Down
8 changes: 0 additions & 8 deletions packages/next/src/server/lib/types.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import type { IncomingMessage, ServerResponse } from 'http'

import type { Duplex } from 'stream'

export type WorkerRequestHandler = (
req: IncomingMessage,
res: ServerResponse
) => Promise<any>

export type WorkerUpgradeHandler = (
req: IncomingMessage,
socket: Duplex,
head: Buffer
) => any
27 changes: 2 additions & 25 deletions packages/next/src/server/next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { UrlWithParsedQuery } from 'url'
import type { NextConfigComplete } from './config-shared'
import type { IncomingMessage, ServerResponse } from 'http'
import type { NextUrlWithParsedQuery } from './request-meta'
import type { WorkerRequestHandler, WorkerUpgradeHandler } from './lib/types'
import type { WorkerRequestHandler } from './lib/types'

import './require-hook'
import './node-polyfill-crypto'
Expand Down Expand Up @@ -268,41 +268,21 @@ class NextCustomServer extends NextServer {

// @ts-expect-error These are initialized in prepare()
protected requestHandler: WorkerRequestHandler
// @ts-expect-error These are initialized in prepare()
protected upgradeHandler: WorkerUpgradeHandler

async prepare() {
const { getRequestHandlers } =
require('./lib/start-server') as typeof import('./lib/start-server')

const isNodeDebugging = !!checkNodeDebugType()

const initResult = await getRequestHandlers({
this.requestHandler = await getRequestHandlers({
dir: this.options.dir!,
port: this.options.port || 3000,
isDev: !!this.options.dev,
hostname: this.options.hostname || 'localhost',
minimalMode: this.options.minimalMode,
isNodeDebugging: !!isNodeDebugging,
})
this.requestHandler = initResult[0]
this.upgradeHandler = initResult[1]
}

private setupWebSocketHandler(
customServer?: import('http').Server,
_req?: IncomingMessage
) {
if (!this.didWebSocketSetup) {
this.didWebSocketSetup = true
customServer = customServer || (_req?.socket as any)?.server

if (customServer) {
customServer.on('upgrade', async (req, socket, head) => {
this.upgradeHandler(req, socket, head)
})
}
}
}

getRequestHandler() {
Expand All @@ -311,8 +291,6 @@ class NextCustomServer extends NextServer {
res: ServerResponse,
parsedUrl?: UrlWithParsedQuery
) => {
this.setupWebSocketHandler(this.options.httpServer, req)

if (parsedUrl) {
req.url = formatUrl(parsedUrl)
}
Expand All @@ -323,7 +301,6 @@ class NextCustomServer extends NextServer {

async render(...args: Parameters<Server['render']>) {
let [req, res, pathname, query, parsedUrl] = args
this.setupWebSocketHandler(this.options.httpServer, req as any)

if (!pathname.startsWith('/')) {
console.error(`Cannot render page with path "${pathname}"`)
Expand Down
5 changes: 5 additions & 0 deletions packages/next/src/server/send-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { BaseNextRequest, BaseNextResponse } from './base-http'
import type { NodeNextResponse } from './base-http/node'

import { pipeToNodeResponse } from './pipe-readable'
import { UPGRADE_HEADER } from './web/spec-extension/response'
import { splitCookiesString } from './web/utils'

/**
Expand All @@ -19,6 +20,10 @@ export async function sendResponse(
): Promise<void> {
// Don't use in edge runtime
if (process.env.NEXT_RUNTIME !== 'edge') {
if (res.hasHeader(UPGRADE_HEADER)) {
// The route has been upgraded, so we don't want to send the response.
return
}
// Copy over the response status.
res.statusCode = response.status
res.statusMessage = response.statusText
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export class NextRequestAdapter {
// @ts-expect-error - see https://github.com/whatwg/fetch/pull/1457
duplex: 'half',
signal,
rawRequest: request.originalRequest,
// geo
// ip
// nextConfig
Expand Down
8 changes: 8 additions & 0 deletions packages/next/src/server/web/spec-extension/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { NextURL } from '../next-url'
import { toNodeOutgoingHttpHeaders, validateURL } from '../utils'
import { RemovedUAError, RemovedPageError } from '../error'
import { RequestCookies } from './cookies'
import type { IncomingMessage } from 'http'

export const INTERNALS = Symbol('internal request')

Expand All @@ -14,6 +15,7 @@ export class NextRequest extends Request {
ip?: string
url: string
nextUrl: NextURL
rawRequest?: IncomingMessage
}

constructor(input: URL | RequestInfo, init: RequestInit = {}) {
Expand All @@ -34,6 +36,7 @@ export class NextRequest extends Request {
url: process.env.__NEXT_NO_MIDDLEWARE_URL_NORMALIZE
? url
: nextUrl.toString(),
rawRequest: init.rawRequest,
}
}

Expand Down Expand Up @@ -98,6 +101,10 @@ export class NextRequest extends Request {
public get url() {
return this[INTERNALS].url
}

public get experimentalRawRequest() {
return this[INTERNALS].rawRequest
}
}

export interface RequestInit extends globalThis.RequestInit {
Expand All @@ -113,4 +120,5 @@ export interface RequestInit extends globalThis.RequestInit {
trailingSlash?: boolean
}
signal?: AbortSignal
rawRequest?: IncomingMessage
}
12 changes: 12 additions & 0 deletions packages/next/src/server/web/spec-extension/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { ResponseCookies } from './cookies'
const INTERNALS = Symbol('internal response')
const REDIRECTS = new Set([301, 302, 303, 307, 308])

export const UPGRADE_HEADER = 'x-route-upgraded'

function handleMiddlewareField(
init: MiddlewareResponseInit | undefined,
headers: Headers
Expand Down Expand Up @@ -111,6 +113,16 @@ export class NextResponse<Body = unknown> extends Response {
handleMiddlewareField(init, headers)
return new NextResponse(null, { ...init, headers })
}

static upgrade() {
const headers = new Headers()
headers.set(UPGRADE_HEADER, '1')

return new NextResponse(null, {
headers,
status: 200, // TODO: This is inaccurate, but undici doesn't allow a 101 here.
})
}
}

interface ResponseInit extends globalThis.ResponseInit {
Expand Down
Loading

0 comments on commit 546ebdc

Please sign in to comment.