Skip to content

Commit

Permalink
[Flight] Add Serialization of Typed Arrays / ArrayBuffer / DataView (#…
Browse files Browse the repository at this point in the history
…26954)

This uses the same mechanism as [large
strings](#26932) to encode chunks
of length based binary data in the RSC payload behind a flag.

I introduce a new BinaryChunk type that's specific to each stream and
ways to convert into it. That's because we sometimes need all chunks to
be Uint8Array for the output, even if the source is another array buffer
view, and sometimes we need to clone it before transferring.

Each type of typed array is its own row tag. This lets us ensure that
the instance is directly in the right format in the cached entry instead
of creating a wrapper at each reference. Ideally this is also how
Map/Set should work but those are lazy which complicates that approach a
bit.

We assume both server and client use little-endian for now. If we want
to support other modes, we'd convert it to/from little-endian so that
the transfer protocol is always little-endian. That way the common
clients can be the fastest possible.

So far this only implements Server to Client. Still need to implement
Client to Server for parity.

NOTE: This is the first time we make RSC effectively a binary format.
This is not compatible with existing SSR techniques which serialize the
stream as unicode in the HTML. To be compatible, those implementations
would have to use base64 or something like that. Which is what we'll do
when we move this technique to be built-in to Fizz.
  • Loading branch information
sebmarkbage authored Jun 29, 2023
1 parent 2153a29 commit d9c3331
Show file tree
Hide file tree
Showing 25 changed files with 517 additions and 66 deletions.
3 changes: 3 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,14 @@ module.exports = {
$PropertyType: 'readonly',
$ReadOnly: 'readonly',
$ReadOnlyArray: 'readonly',
$ArrayBufferView: 'readonly',
$Shape: 'readonly',
AnimationFrameID: 'readonly',
// For Flow type annotation. Only `BigInt` is valid at runtime.
bigint: 'readonly',
BigInt: 'readonly',
BigInt64Array: 'readonly',
BigUint64Array: 'readonly',
Class: 'readonly',
ClientRect: 'readonly',
CopyInspectedElementPath: 'readonly',
Expand Down
154 changes: 143 additions & 11 deletions packages/react-client/src/ReactFlightClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import type {HintModel} from 'react-server/src/ReactFlightServerConfig';

import type {CallServerCallback} from './ReactFlightReplyClient';

import {enableBinaryFlight} from 'shared/ReactFeatureFlags';

import {
resolveClientReference,
preloadModule,
Expand Down Expand Up @@ -297,6 +299,14 @@ function createInitializedTextChunk(
return new Chunk(INITIALIZED, value, null, response);
}

function createInitializedBufferChunk(
response: Response,
value: $ArrayBufferView | ArrayBuffer,
): InitializedChunk<Uint8Array> {
// $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors
return new Chunk(INITIALIZED, value, null, response);
}

function resolveModelChunk<T>(
chunk: SomeChunk<T>,
value: UninitializedModel,
Expand Down Expand Up @@ -738,6 +748,16 @@ function resolveText(response: Response, id: number, text: string): void {
chunks.set(id, createInitializedTextChunk(response, text));
}

function resolveBuffer(
response: Response,
id: number,
buffer: $ArrayBufferView | ArrayBuffer,
): void {
const chunks = response._chunks;
// We assume that we always reference buffers after they've been emitted.
chunks.set(id, createInitializedBufferChunk(response, buffer));
}

function resolveModule(
response: Response,
id: number,
Expand Down Expand Up @@ -856,24 +876,120 @@ function resolveHint(
dispatchHint(code, hintModel);
}

function mergeBuffer(
buffer: Array<Uint8Array>,
lastChunk: Uint8Array,
): Uint8Array {
const l = buffer.length;
// Count the bytes we'll need
let byteLength = lastChunk.length;
for (let i = 0; i < l; i++) {
byteLength += buffer[i].byteLength;
}
// Allocate enough contiguous space
const result = new Uint8Array(byteLength);
let offset = 0;
// Copy all the buffers into it.
for (let i = 0; i < l; i++) {
const chunk = buffer[i];
result.set(chunk, offset);
offset += chunk.byteLength;
}
result.set(lastChunk, offset);
return result;
}

function resolveTypedArray(
response: Response,
id: number,
buffer: Array<Uint8Array>,
lastChunk: Uint8Array,
constructor: any,
bytesPerElement: number,
): void {
// If the view fits into one original buffer, we just reuse that buffer instead of
// copying it out to a separate copy. This means that it's not always possible to
// transfer these values to other threads without copying first since they may
// share array buffer. For this to work, it must also have bytes aligned to a
// multiple of a size of the type.
const chunk =
buffer.length === 0 && lastChunk.byteOffset % bytesPerElement === 0
? lastChunk
: mergeBuffer(buffer, lastChunk);
// TODO: The transfer protocol of RSC is little-endian. If the client isn't little-endian
// we should convert it instead. In practice big endian isn't really Web compatible so it's
// somewhat safe to assume that browsers aren't going to run it, but maybe there's some SSR
// server that's affected.
const view: $ArrayBufferView = new constructor(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength / bytesPerElement,
);
resolveBuffer(response, id, view);
}

function processFullRow(
response: Response,
id: number,
tag: number,
buffer: Array<Uint8Array>,
lastChunk: string | Uint8Array,
chunk: Uint8Array,
): void {
let row = '';
if (enableBinaryFlight) {
switch (tag) {
case 65 /* "A" */:
// We must always clone to extract it into a separate buffer instead of just a view.
resolveBuffer(response, id, mergeBuffer(buffer, chunk).buffer);
return;
case 67 /* "C" */:
resolveTypedArray(response, id, buffer, chunk, Int8Array, 1);
return;
case 99 /* "c" */:
resolveBuffer(
response,
id,
buffer.length === 0 ? chunk : mergeBuffer(buffer, chunk),
);
return;
case 85 /* "U" */:
resolveTypedArray(response, id, buffer, chunk, Uint8ClampedArray, 1);
return;
case 83 /* "S" */:
resolveTypedArray(response, id, buffer, chunk, Int16Array, 2);
return;
case 115 /* "s" */:
resolveTypedArray(response, id, buffer, chunk, Uint16Array, 2);
return;
case 76 /* "L" */:
resolveTypedArray(response, id, buffer, chunk, Int32Array, 4);
return;
case 108 /* "l" */:
resolveTypedArray(response, id, buffer, chunk, Uint32Array, 4);
return;
case 70 /* "F" */:
resolveTypedArray(response, id, buffer, chunk, Float32Array, 4);
return;
case 68 /* "D" */:
resolveTypedArray(response, id, buffer, chunk, Float64Array, 8);
return;
case 78 /* "N" */:
resolveTypedArray(response, id, buffer, chunk, BigInt64Array, 8);
return;
case 109 /* "m" */:
resolveTypedArray(response, id, buffer, chunk, BigUint64Array, 8);
return;
case 86 /* "V" */:
resolveTypedArray(response, id, buffer, chunk, DataView, 1);
return;
}
}

const stringDecoder = response._stringDecoder;
let row = '';
for (let i = 0; i < buffer.length; i++) {
const chunk = buffer[i];
row += readPartialStringChunk(stringDecoder, chunk);
}
if (typeof lastChunk === 'string') {
row += lastChunk;
} else {
row += readFinalStringChunk(stringDecoder, lastChunk);
row += readPartialStringChunk(stringDecoder, buffer[i]);
}
row += readFinalStringChunk(stringDecoder, chunk);
switch (tag) {
case 73 /* "I" */: {
resolveModule(response, id, row);
Expand Down Expand Up @@ -903,7 +1019,7 @@ function processFullRow(
resolveText(response, id, row);
return;
}
default: {
default: /* """ "{" "[" "t" "f" "n" "0" - "9" */ {
// We assume anything else is JSON.
resolveModel(response, id, row);
return;
Expand Down Expand Up @@ -937,7 +1053,23 @@ export function processBinaryChunk(
}
case ROW_TAG: {
const resolvedRowTag = chunk[i];
if (resolvedRowTag === 84 /* "T" */) {
if (
resolvedRowTag === 84 /* "T" */ ||
(enableBinaryFlight &&
(resolvedRowTag === 65 /* "A" */ ||
resolvedRowTag === 67 /* "C" */ ||
resolvedRowTag === 99 /* "c" */ ||
resolvedRowTag === 85 /* "U" */ ||
resolvedRowTag === 83 /* "S" */ ||
resolvedRowTag === 115 /* "s" */ ||
resolvedRowTag === 76 /* "L" */ ||
resolvedRowTag === 108 /* "l" */ ||
resolvedRowTag === 70 /* "F" */ ||
resolvedRowTag === 68 /* "D" */ ||
resolvedRowTag === 78 /* "N" */ ||
resolvedRowTag === 109 /* "m" */ ||
resolvedRowTag === 86)) /* "V" */
) {
rowTag = resolvedRowTag;
rowState = ROW_LENGTH;
i++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface Destination {

export opaque type PrecomputedChunk = string;
export opaque type Chunk = string;
export opaque type BinaryChunk = string;

export function scheduleWork(callback: () => void) {
callback();
Expand All @@ -25,14 +26,14 @@ export function beginWriting(destination: Destination) {}

export function writeChunk(
destination: Destination,
chunk: Chunk | PrecomputedChunk,
chunk: Chunk | PrecomputedChunk | BinaryChunk,
): void {
writeChunkAndReturn(destination, chunk);
}

export function writeChunkAndReturn(
destination: Destination,
chunk: Chunk | PrecomputedChunk,
chunk: Chunk | PrecomputedChunk | BinaryChunk,
): boolean {
return destination.push(chunk);
}
Expand All @@ -51,6 +52,12 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return content;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
throw new Error('Not implemented.');
}

export function clonePrecomputedChunk(
chunk: PrecomputedChunk,
): PrecomputedChunk {
Expand All @@ -61,6 +68,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
throw new Error('Not implemented.');
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
throw new Error('Not implemented.');
}

export function closeWithError(destination: Destination, error: mixed): void {
// $FlowFixMe[incompatible-call]: This is an Error object or the destination accepts other types.
destination.destroy(error);
Expand Down
15 changes: 13 additions & 2 deletions packages/react-server-dom-fb/src/ReactServerStreamConfigFB.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export type Destination = {

export opaque type PrecomputedChunk = string;
export opaque type Chunk = string;
export opaque type BinaryChunk = string;

export function scheduleWork(callback: () => void) {
// We don't schedule work in this model, and instead expect performWork to always be called repeatedly.
Expand All @@ -30,14 +31,14 @@ export function beginWriting(destination: Destination) {}

export function writeChunk(
destination: Destination,
chunk: Chunk | PrecomputedChunk,
chunk: Chunk | PrecomputedChunk | BinaryChunk,
): void {
destination.buffer += chunk;
}

export function writeChunkAndReturn(
destination: Destination,
chunk: Chunk | PrecomputedChunk,
chunk: Chunk | PrecomputedChunk | BinaryChunk,
): boolean {
destination.buffer += chunk;
return true;
Expand All @@ -57,6 +58,12 @@ export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
return content;
}

export function typedArrayToBinaryChunk(
content: $ArrayBufferView,
): BinaryChunk {
throw new Error('Not implemented.');
}

export function clonePrecomputedChunk(
chunk: PrecomputedChunk,
): PrecomputedChunk {
Expand All @@ -67,6 +74,10 @@ export function byteLengthOfChunk(chunk: Chunk | PrecomputedChunk): number {
throw new Error('Not implemented.');
}

export function byteLengthOfBinaryChunk(chunk: BinaryChunk): number {
throw new Error('Not implemented.');
}

export function closeWithError(destination: Destination, error: mixed): void {
destination.done = true;
destination.fatal = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,31 @@ describe('ReactFlightDOMEdge', () => {
expect(result.text).toBe(testString);
expect(result.text2).toBe(testString2);
});

// @gate enableBinaryFlight
it('should be able to serialize any kind of typed array', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
]).buffer;
const buffers = [
buffer,
new Int8Array(buffer, 1),
new Uint8Array(buffer, 2),
new Uint8ClampedArray(buffer, 2),
new Int16Array(buffer, 2),
new Uint16Array(buffer, 2),
new Int32Array(buffer, 4),
new Uint32Array(buffer, 4),
new Float32Array(buffer, 4),
new Float64Array(buffer, 0),
new BigInt64Array(buffer, 0),
new BigUint64Array(buffer, 0),
new DataView(buffer, 3),
];
const stream = passThrough(
ReactServerDOMServer.renderToReadableStream(buffers),
);
const result = await ReactServerDOMClient.createFromReadableStream(stream);
expect(result).toEqual(buffers);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,32 @@ describe('ReactFlightDOMNode', () => {
// Should still match the result when parsed
expect(result.text).toBe(testString);
});

// @gate enableBinaryFlight
it('should be able to serialize any kind of typed array', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
]).buffer;
const buffers = [
buffer,
new Int8Array(buffer, 1),
new Uint8Array(buffer, 2),
new Uint8ClampedArray(buffer, 2),
new Int16Array(buffer, 2),
new Uint16Array(buffer, 2),
new Int32Array(buffer, 4),
new Uint32Array(buffer, 4),
new Float32Array(buffer, 4),
new Float64Array(buffer, 0),
new BigInt64Array(buffer, 0),
new BigUint64Array(buffer, 0),
new DataView(buffer, 3),
];
const stream = ReactServerDOMServer.renderToPipeableStream(buffers);
const readable = new Stream.PassThrough();
const promise = ReactServerDOMClient.createFromNodeStream(readable);
stream.pipe(readable);
const result = await promise;
expect(result).toEqual(buffers);
});
});
Loading

0 comments on commit d9c3331

Please sign in to comment.