Skip to content

Commit

Permalink
feat: Support WebSocket API routes, Upgrade requests
Browse files Browse the repository at this point in the history
Enables route handlers to receive and act on `Connection: Upgrade` requests, such as WebSockets,
when using the Node runtime. To enable this, the base http server has the `on('upgrade')` handler
removed. In this author's opinion, that handler is an anti-pattern as it makes it much more
difficult to handle middleware and other request lifecycle behavior.

By passing the raw request to the route handler and implementing a `NextResponse.upgrade()` response
value to opt out of additional processing that would write to the socket, the route handler can
handle an upgrade request itself.

Fixes #58698 (feature request)
Fixes #56368 (caused by next-ws / websocket middleware)
  • Loading branch information
AaronFriel committed Dec 1, 2023
1 parent 43b075e commit 6bbe950
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 106 deletions.
6 changes: 6 additions & 0 deletions packages/next/src/export/routes/app-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ export async function exportAppRoute(
const module = await RouteModuleLoader.load<AppRouteRouteModule>(filename)
const response = await module.handle(request, context)

if (response === 'Upgraded') {
return {
revalidate: 0,
}
}

const isValidStatus = response.status < 400 || response.status === 404
if (!isValidStatus) {
return { revalidate: 0 }
Expand Down
3 changes: 3 additions & 0 deletions packages/next/src/server/base-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2245,6 +2245,9 @@ export default abstract class Server<ServerOptions extends Options = Options> {
)

const response = await routeModule.handle(request, context)
if (response === 'Upgraded') {
return null
}

;(req as any).fetchMetrics = (
context.renderOpts as any
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { NextConfig } from '../../../config-shared'
import type { AppRouteRouteDefinition } from '../../route-definitions/app-route-route-definition'
import type { AppConfig } from '../../../../build/utils'
import type { NextRequest } from '../../../web/spec-extension/request'
import type { PrerenderManifest } from '../../../../build'
import {
isUpgradeError,
type NextRequest,
} from '../../../web/spec-extension/request'

import {
RouteModule,
Expand Down Expand Up @@ -449,14 +452,18 @@ export class AppRouteRouteModule extends RouteModule<
public async handle(
request: NextRequest,
context: AppRouteRouteHandlerContext
): Promise<Response> {
): Promise<Response | 'Upgraded'> {
try {
// Execute the route to get the response.
const response = await this.execute(request, context)

// The response was handled, return it.
return response
} catch (err) {
if (isUpgradeError(err)) {
return 'Upgraded'
}

// Try to resolve the error to a response, else throw it again.
const response = resolveHandlerError(err)
if (!response) throw err
Expand Down
69 changes: 22 additions & 47 deletions packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// this must come first as it includes require hooks
import type { WorkerRequestHandler, WorkerUpgradeHandler } from './types'
import type { WorkerRequestHandler } from './types'
import type { DevBundler } from './router-utils/setup-dev-bundler'
import type { NextUrlWithParsedQuery } from '../request-meta'
// This is required before other imports to ensure the require hook is setup.
Expand Down Expand Up @@ -62,7 +62,7 @@ export async function initialize(opts: {
customServer?: boolean
experimentalTestProxy?: boolean
experimentalHttpsServer?: boolean
}): Promise<[WorkerRequestHandler, WorkerUpgradeHandler]> {
}): Promise<WorkerRequestHandler> {
if (!process.env.NODE_ENV) {
// @ts-ignore not readonly
process.env.NODE_ENV = opts.dev ? 'development' : 'production'
Expand Down Expand Up @@ -171,7 +171,24 @@ export async function initialize(opts: {
)

const requestHandlerImpl: WorkerRequestHandler = async (req, res) => {
if (compress) {
const isUpgradeReq =
req?.method === 'GET' &&
req?.headers?.connection?.toLowerCase() === 'upgrade'

if (
isUpgradeReq &&
opts.dev &&
developmentBundler &&
req.url?.includes(`/_next/webpack-hmr`)
) {
return developmentBundler.hotReloader.onHMR(
req,
req.socket,
Buffer.alloc(0)
)
}

if (compress && !isUpgradeReq) {
// @ts-expect-error not express req/res
compress(req, res, () => {})
}
Expand Down Expand Up @@ -292,7 +309,7 @@ export async function initialize(opts: {
} = await resolveRoutes({
req,
res,
isUpgradeReq: false,
isUpgradeReq,
signal: signalFromNodeResponse(res),
invokedOutputs,
})
Expand Down Expand Up @@ -563,47 +580,5 @@ export async function initialize(opts: {
}
requestHandlers[opts.dir] = requestHandler

const upgradeHandler: WorkerUpgradeHandler = async (req, socket, head) => {
try {
req.on('error', (_err) => {
// TODO: log socket errors?
// console.error(_err);
})
socket.on('error', (_err) => {
// TODO: log socket errors?
// console.error(_err);
})

if (opts.dev && developmentBundler) {
if (req.url?.includes(`/_next/webpack-hmr`)) {
return developmentBundler.hotReloader.onHMR(req, socket, head)
}
}

const { matchedOutput, parsedUrl } = await resolveRoutes({
req,
res: socket as any,
isUpgradeReq: true,
signal: signalFromNodeResponse(socket),
})

// TODO: allow upgrade requests to pages/app paths?
// this was not previously supported
if (matchedOutput) {
return socket.end()
}

if (parsedUrl.protocol) {
return await proxyRequest(req, socket as any, parsedUrl, head)
}

// If there's no matched output, we don't handle the request as user's
// custom WS server may be listening on the same path.
} catch (err) {
console.error('Error handling upgrade request', err)
socket.end()
}
}

return [requestHandler, upgradeHandler]
return requestHandler
}
26 changes: 2 additions & 24 deletions packages/next/src/server/lib/start-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import '../require-hook'

import type { IncomingMessage, ServerResponse } from 'http'
import type { SelfSignedCertificate } from '../../lib/mkcert'
import type { WorkerRequestHandler, WorkerUpgradeHandler } from './types'
import type { WorkerRequestHandler } from './types'

import fs from 'fs'
import v8 from 'v8'
Expand Down Expand Up @@ -113,17 +113,6 @@ export async function startServer(
}
throw new Error('Invariant request handler was not setup')
}
let upgradeHandler: WorkerUpgradeHandler = async (
req,
socket,
head
): Promise<void> => {
if (handlersPromise) {
await handlersPromise
return upgradeHandler(req, socket, head)
}
throw new Error('Invariant upgrade handler was not setup')
}

// setup server listener as fast as possible
if (selfSignedCertificate && !isDev) {
Expand Down Expand Up @@ -172,15 +161,6 @@ export async function startServer(
if (keepAliveTimeout) {
server.keepAliveTimeout = keepAliveTimeout
}
server.on('upgrade', async (req, socket, head) => {
try {
await upgradeHandler(req, socket, head)
} catch (err) {
socket.destroy()
Log.error(`Failed to handle request for ${req.url}`)
console.error(err)
}
})

let portRetryCount = 0

Expand Down Expand Up @@ -281,7 +261,7 @@ export async function startServer(
process.on('uncaughtException', exception)
process.on('unhandledRejection', exception)

const initResult = await getRequestHandlers({
requestHandler = await getRequestHandlers({
dir,
port,
isDev,
Expand All @@ -293,8 +273,6 @@ export async function startServer(
experimentalTestProxy: !!isExperimentalTestProxy,
experimentalHttpsServer: !!selfSignedCertificate,
})
requestHandler = initResult[0]
upgradeHandler = initResult[1]

const startServerProcessDuration =
performance.mark('next-start-end') &&
Expand Down
8 changes: 0 additions & 8 deletions packages/next/src/server/lib/types.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import type { IncomingMessage, ServerResponse } from 'http'

import type { Duplex } from 'stream'

export type WorkerRequestHandler = (
req: IncomingMessage,
res: ServerResponse
) => Promise<any>

export type WorkerUpgradeHandler = (
req: IncomingMessage,
socket: Duplex,
head: Buffer
) => any
27 changes: 2 additions & 25 deletions packages/next/src/server/next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { UrlWithParsedQuery } from 'url'
import type { NextConfigComplete } from './config-shared'
import type { IncomingMessage, ServerResponse } from 'http'
import type { NextUrlWithParsedQuery } from './request-meta'
import type { WorkerRequestHandler, WorkerUpgradeHandler } from './lib/types'
import type { WorkerRequestHandler } from './lib/types'

import './require-hook'
import './node-polyfill-crypto'
Expand Down Expand Up @@ -268,41 +268,21 @@ class NextCustomServer extends NextServer {

// @ts-expect-error These are initialized in prepare()
protected requestHandler: WorkerRequestHandler
// @ts-expect-error These are initialized in prepare()
protected upgradeHandler: WorkerUpgradeHandler

async prepare() {
const { getRequestHandlers } =
require('./lib/start-server') as typeof import('./lib/start-server')

const isNodeDebugging = !!checkNodeDebugType()

const initResult = await getRequestHandlers({
this.requestHandler = await getRequestHandlers({
dir: this.options.dir!,
port: this.options.port || 3000,
isDev: !!this.options.dev,
hostname: this.options.hostname || 'localhost',
minimalMode: this.options.minimalMode,
isNodeDebugging: !!isNodeDebugging,
})
this.requestHandler = initResult[0]
this.upgradeHandler = initResult[1]
}

private setupWebSocketHandler(
customServer?: import('http').Server,
_req?: IncomingMessage
) {
if (!this.didWebSocketSetup) {
this.didWebSocketSetup = true
customServer = customServer || (_req?.socket as any)?.server

if (customServer) {
customServer.on('upgrade', async (req, socket, head) => {
this.upgradeHandler(req, socket, head)
})
}
}
}

getRequestHandler() {
Expand All @@ -311,8 +291,6 @@ class NextCustomServer extends NextServer {
res: ServerResponse,
parsedUrl?: UrlWithParsedQuery
) => {
this.setupWebSocketHandler(this.options.httpServer, req)

if (parsedUrl) {
req.url = formatUrl(parsedUrl)
}
Expand All @@ -323,7 +301,6 @@ class NextCustomServer extends NextServer {

async render(...args: Parameters<Server['render']>) {
let [req, res, pathname, query, parsedUrl] = args
this.setupWebSocketHandler(this.options.httpServer, req as any)

if (!pathname.startsWith('/')) {
console.error(`Cannot render page with path "${pathname}"`)
Expand Down
3 changes: 3 additions & 0 deletions packages/next/src/server/web/edge-route-module-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ export class EdgeRouteModuleWrapper {

// Get the response from the handler.
const res = await this.routeModule.handle(request, context)
if (res === 'Upgraded') {
throw new Error('Unreachable - Edge routes cannot be upgraded')
}

const waitUntilPromises = [internal_getCurrentFunctionWaitUntil()]
if (context.renderOpts.waitUntil) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export class NextRequestAdapter {
// @ts-expect-error - see https://github.com/whatwg/fetch/pull/1457
duplex: 'half',
signal,
rawRequest: request.originalRequest,
// geo
// ip
// nextConfig
Expand Down
32 changes: 32 additions & 0 deletions packages/next/src/server/web/spec-extension/request.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import type { IncomingMessage } from 'node:http'
import type { Duplex } from 'node:stream'
import type { I18NConfig } from '../../config-shared'
import type { RequestData } from '../types'
import { NextURL } from '../next-url'
import { toNodeOutgoingHttpHeaders, validateURL } from '../utils'
import { RemovedUAError, RemovedPageError } from '../error'
import { RequestCookies } from './cookies'
import { scheduleOnNextTick } from '../../../lib/scheduler'

export const RequestUpgradedName = 'RequestUpgraded'
export class RequestUpgraded extends Error {
public readonly name = RequestUpgradedName
}

export function isUpgradeError(
e: any
): e is Error & { name: typeof RequestUpgradedName } {
return e?.name === RequestUpgradedName
}

export const INTERNALS = Symbol('internal request')

Expand All @@ -14,6 +28,7 @@ export class NextRequest extends Request {
ip?: string
url: string
nextUrl: NextURL
rawRequest?: IncomingMessage
}

constructor(input: URL | RequestInfo, init: RequestInit = {}) {
Expand All @@ -34,6 +49,7 @@ export class NextRequest extends Request {
url: process.env.__NEXT_NO_MIDDLEWARE_URL_NORMALIZE
? url
: nextUrl.toString(),
rawRequest: init.rawRequest,
}
}

Expand Down Expand Up @@ -98,6 +114,21 @@ export class NextRequest extends Request {
public get url() {
return this[INTERNALS].url
}

public upgrade(
handler: (request: IncomingMessage, socket: Duplex) => void
): never {
const rawRequest = this[INTERNALS].rawRequest
if (!rawRequest) {
throw new Error(
'Cannot upgrade to websocket, this feature is not compatible with the edge runtime.'
)
}

scheduleOnNextTick(() => handler(rawRequest, rawRequest.socket))

throw new RequestUpgraded('upgrade')
}
}

export interface RequestInit extends globalThis.RequestInit {
Expand All @@ -113,4 +144,5 @@ export interface RequestInit extends globalThis.RequestInit {
trailingSlash?: boolean
}
signal?: AbortSignal
rawRequest?: IncomingMessage
}
Loading

0 comments on commit 6bbe950

Please sign in to comment.