Skip to content

Commit

Permalink
Web Streams cleanup (#56819)
Browse files Browse the repository at this point in the history
This updates some code related to web streams and encoding.

- Removes some unused code related to base64 encoding/decoding (Edge runtime currently supports it natively via `Buffer`)
- Prefer readable stream `pull` versus `.on("data", (chunk) => { ... })` event handlers (simplifies execution)
- Utilize `pipeTo` and `pipeThrough` on web streams to remove custom code related to stream pumping
- Updates pipe readable function to utilize web streams first class rather than relying on manual pumping + stream management
  - This also takes advantage of the `AbortController` when piping so that the response can use it to cancel the stream
  • Loading branch information
wyattjoh authored Oct 18, 2023
1 parent 40dd14f commit 07c434d
Show file tree
Hide file tree
Showing 29 changed files with 562 additions and 532 deletions.
19 changes: 8 additions & 11 deletions packages/next/src/experimental/testmode/proxy/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ import { UNHANDLED } from './types'
import type { FetchHandler } from './fetch-api'
import { handleFetch } from './fetch-api'

function readBody(req: IncomingMessage): Promise<Buffer> {
return new Promise<Buffer>((resolve, reject) => {
const acc: Buffer[] = []
req.on('data', (chunk) => {
acc.push(chunk)
})
req.on('end', () => {
resolve(Buffer.concat(acc))
})
req.on('error', reject)
})
async function readBody(req: IncomingMessage): Promise<Buffer> {
const acc: Buffer[] = []

for await (const chunk of req) {
acc.push(chunk)
}

return Buffer.concat(acc)
}

export async function createProxyServer({
Expand Down
5 changes: 1 addition & 4 deletions packages/next/src/export/routes/app-page.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ export async function generatePrefetchRsc(
renderOpts
)

prefetchRenderResult.pipe(res)
await res.hasStreamed

const prefetchRscData = Buffer.concat(res.buffers)
const prefetchRscData = await prefetchRenderResult.toUnchunkedString(true)

if ((renderOpts as any).store.staticPrefetchBailout) return

Expand Down
6 changes: 5 additions & 1 deletion packages/next/src/export/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ export interface AmpValidation {
export type FileWriter = (
type: string,
path: string,
content: any,
content:
| string
| NodeJS.ArrayBufferView
| Iterable<string | NodeJS.ArrayBufferView>
| AsyncIterable<string | NodeJS.ArrayBufferView>,
encodingOptions?: WriteFileOptions
) => Promise<void>

Expand Down
21 changes: 11 additions & 10 deletions packages/next/src/server/app-render/action-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,20 @@ function nodeToWebReadableStream(nodeReadable: import('stream').Readable) {
return Readable.toWeb(nodeReadable)
}

const iterator = nodeReadable[Symbol.asyncIterator]()

return new ReadableStream({
start(controller) {
nodeReadable.on('data', (chunk) => {
controller.enqueue(chunk)
})
pull: async (controller) => {
const { value, done } = await iterator.next()

nodeReadable.on('end', () => {
if (done) {
controller.close()
})

nodeReadable.on('error', (error) => {
controller.error(error)
})
} else {
controller.enqueue(value)
}
},
cancel: () => {
iterator.return?.()
},
})
} else {
Expand Down
7 changes: 3 additions & 4 deletions packages/next/src/server/app-render/app-render.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
renderToInitialFizzStream,
createBufferedTransformStream,
continueFizzStream,
streamToBufferedResult,
cloneTransformStream,
} from '../stream-utils/node-web-streams-helper'
import { canSegmentBeOverridden } from '../../client/components/match-segments'
Expand Down Expand Up @@ -908,7 +907,7 @@ async function renderToHTMLOrFlightImpl(
})

if (staticGenerationStore.isStaticGeneration) {
const htmlResult = await streamToBufferedResult(renderResult)
const htmlResult = await renderResult.toUnchunkedString(true)

// if we encountered any unexpected errors during build
// we fail the prerendering phase and the build
Expand All @@ -918,9 +917,9 @@ async function renderToHTMLOrFlightImpl(

// TODO-APP: derive this from same pass to prevent additional
// render during static generation
const stringifiedFlightPayload = await streamToBufferedResult(
const stringifiedFlightPayload = await (
await generateFlight(ctx)
)
).toUnchunkedString(true)

if (staticGenerationStore.forceStatic === false) {
staticGenerationStore.revalidate = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { RenderOpts } from './types'
import type { FlightResponseRef } from './flight-response-ref'
import type { AppPageModule } from '../future/route-modules/app-page/module'
import type { createErrorHandler } from './create-error-handler'

import React, { use } from 'react'
import type { createErrorHandler } from './create-error-handler'
import { useFlightResponse } from './use-flight-response'

/**
Expand Down
90 changes: 46 additions & 44 deletions packages/next/src/server/app-render/use-flight-response.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,47 @@
import type { ClientReferenceManifest } from '../../build/webpack/plugins/flight-manifest-plugin'
import type { FlightResponseRef } from './flight-response-ref'
import { encodeText, decodeText } from '../stream-utils/encode-decode'

import { htmlEscapeJsonString } from '../htmlescape'
import {
createDecodeTransformStream,
createEncodeTransformStream,
} from '../stream-utils/encode-decode'

const isEdgeRuntime = process.env.NEXT_RUNTIME === 'edge'

const INLINE_FLIGHT_PAYLOAD_BOOTSTRAP = 0
const INLINE_FLIGHT_PAYLOAD_DATA = 1
const INLINE_FLIGHT_PAYLOAD_FORM_STATE = 2

function createFlightTransformer(
nonce: string | undefined,
formState: unknown | null
) {
const startScriptTag = nonce
? `<script nonce=${JSON.stringify(nonce)}>`
: '<script>'

return new TransformStream<string, string>({
// Bootstrap the flight information.
start(controller) {
controller.enqueue(
`${startScriptTag}(self.__next_f=self.__next_f||[]).push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_BOOTSTRAP])
)});self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_FORM_STATE, formState])
)})</script>`
)
},
transform(chunk, controller) {
const scripts = `${startScriptTag}self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_DATA, chunk])
)})</script>`

controller.enqueue(scripts)
},
})
}

/**
* Render Flight stream.
* This is only used for renderToHTML, the Flight response does not need additional wrappers.
Expand Down Expand Up @@ -49,50 +82,19 @@ export function useFlightResponse(
})
flightResponseRef.current = res

let bootstrapped = false
// We only attach CSS chunks to the inlined data.
const forwardReader = forwardStream.getReader()
const writer = writable.getWriter()
const startScriptTag = nonce
? `<script nonce=${JSON.stringify(nonce)}>`
: '<script>'
const textDecoder = new TextDecoder()

function read() {
forwardReader.read().then(({ done, value }) => {
if (!bootstrapped) {
bootstrapped = true
writer.write(
encodeText(
`${startScriptTag}(self.__next_f=self.__next_f||[]).push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_BOOTSTRAP])
)});self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_FORM_STATE, formState])
)})</script>`
)
)
}
if (done) {
// Add a setTimeout here because the error component is too small, the first forwardReader.read() read will return the full chunk
// and then it immediately set flightResponseRef.current as null.
// react renders the component twice, the second render will run into the state with useFlightResponse where flightResponseRef.current is null,
// so it tries to render the flight payload again
setTimeout(() => {
flightResponseRef.current = null
})
writer.close()
} else {
const responsePartial = decodeText(value, textDecoder)
const scripts = `${startScriptTag}self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_DATA, responsePartial])
)})</script>`

writer.write(encodeText(scripts))
read()
}
forwardStream
.pipeThrough(createDecodeTransformStream())
.pipeThrough(createFlightTransformer(nonce, formState))
.pipeThrough(createEncodeTransformStream())
.pipeTo(writable)
.finally(() => {
// Once the last encoding stream has flushed, then unset the flight
// response ref.
flightResponseRef.current = null
})
.catch((err) => {
console.error('Unexpected error while rendering Flight stream', err)
})
}
read()

return res
}
40 changes: 19 additions & 21 deletions packages/next/src/server/base-http/web.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { IncomingHttpHeaders, OutgoingHttpHeaders } from 'http'
import type { FetchMetrics } from './index'

import { toNodeOutgoingHttpHeaders } from '../web/utils'
import { BaseNextRequest, BaseNextResponse } from './index'
import { DetachedPromise } from '../../lib/detached-promise'

export class WebNextRequest extends BaseNextRequest<ReadableStream | null> {
public request: Request
Expand Down Expand Up @@ -32,27 +34,10 @@ export class WebNextRequest extends BaseNextRequest<ReadableStream | null> {
export class WebNextResponse extends BaseNextResponse<WritableStream> {
private headers = new Headers()
private textBody: string | undefined = undefined
private _sent = false

private sendPromise = new Promise<void>((resolve) => {
this.sendResolve = resolve
})
private sendResolve?: () => void
private response = this.sendPromise.then(() => {
return new Response(this.textBody ?? this.transformStream.readable, {
headers: this.headers,
status: this.statusCode,
statusText: this.statusMessage,
})
})

public statusCode: number | undefined
public statusMessage: string | undefined

get sent() {
return this._sent
}

constructor(public transformStream = new TransformStream()) {
super(transformStream.writable)
}
Expand Down Expand Up @@ -99,12 +84,25 @@ export class WebNextResponse extends BaseNextResponse<WritableStream> {
return this
}

send() {
this.sendResolve?.()
private readonly sendPromise = new DetachedPromise<void>()
private _sent = false
public send() {
this.sendPromise.resolve()
this._sent = true
}

toResponse() {
return this.response
get sent() {
return this._sent
}

public async toResponse() {
// If we haven't called `send` yet, wait for it to be called.
if (!this.sent) await this.sendPromise.promise

return new Response(this.textBody ?? this.transformStream.readable, {
headers: this.headers,
status: this.statusCode,
statusText: this.statusMessage,
})
}
}
9 changes: 4 additions & 5 deletions packages/next/src/server/body-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ export function requestToBodyStream(
stream: Readable
) {
return new context.ReadableStream({
start(controller) {
stream.on('data', (chunk) =>
start: async (controller) => {
for await (const chunk of stream) {
controller.enqueue(new KUint8Array([...new Uint8Array(chunk)]))
)
stream.on('end', () => controller.close())
stream.on('error', (err) => controller.error(err))
}
controller.close()
},
})
}
Expand Down
28 changes: 3 additions & 25 deletions packages/next/src/server/font-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
DEFAULT_SANS_SERIF_FONT,
} from '../shared/lib/constants'
const capsizeFontsMetrics = require('next/dist/server/capsize-font-metrics.json')
const https = require('https')

const CHROME_UA =
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36'
Expand All @@ -22,30 +21,9 @@ function isGoogleFont(url: string): boolean {
return url.startsWith(GOOGLE_FONT_PROVIDER)
}

function getFontForUA(url: string, UA: string): Promise<String> {
return new Promise((resolve, reject) => {
let rawData: any = ''
https
.get(
url,
{
headers: {
'user-agent': UA,
},
},
(res: any) => {
res.on('data', (chunk: any) => {
rawData += chunk
})
res.on('end', () => {
resolve(rawData.toString('utf8'))
})
}
)
.on('error', (e: Error) => {
reject(e)
})
})
async function getFontForUA(url: string, UA: string): Promise<string> {
const res = await fetch(url, { headers: { 'user-agent': UA } })
return await res.text()
}

export async function getFontDefinitionFromNetwork(
Expand Down
Loading

0 comments on commit 07c434d

Please sign in to comment.