From ec9400dc41715bb6ff0392d6320c33627fa7e2ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Markb=C3=A5ge?= Date: Fri, 3 May 2024 23:23:55 +0200 Subject: [PATCH] [Flight Reply] Encode ReadableStream and AsyncIterables (#28893) Same as #28847 but in the other direction. Like other promises, this doesn't actually stream in the outgoing direction. It buffers until the stream is done. This is mainly due to our protocol remains compatible with Safari's lack of outgoing streams until recently. However, the stream chunks are encoded as separate fields and so does support the busboy streaming on the receiving side. --- .../src/ReactFlightReplyClient.js | 146 +++++++- .../src/__tests__/ReactFlightDOMReply-test.js | 161 +++++++++ .../__tests__/ReactFlightDOMReplyEdge-test.js | 101 ++++++ .../src/ReactFlightReplyServer.js | 328 +++++++++++++++++- 4 files changed, 722 insertions(+), 14 deletions(-) diff --git a/packages/react-client/src/ReactFlightReplyClient.js b/packages/react-client/src/ReactFlightReplyClient.js index b54f6e4edb0f5..a78f4517546b6 100644 --- a/packages/react-client/src/ReactFlightReplyClient.js +++ b/packages/react-client/src/ReactFlightReplyClient.js @@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences'; import { enableRenderableContext, enableBinaryFlight, + enableFlightReadableStream, } from 'shared/ReactFeatureFlags'; import { @@ -28,6 +29,7 @@ import { REACT_CONTEXT_TYPE, REACT_PROVIDER_TYPE, getIteratorFn, + ASYNC_ITERATOR, } from 'shared/ReactSymbols'; import { @@ -206,6 +208,123 @@ export function processReply( return '$' + tag + blobId.toString(16); } + function serializeReadableStream(stream: ReadableStream): string { + if (formData === null) { + // Upgrade to use FormData to allow us to stream this value. + formData = new FormData(); + } + const data = formData; + + pendingParts++; + const streamId = nextPartId++; + + // Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the + // receiving side. It also implies that different chunks can be split up or merged as opposed + // to a readable stream that happens to have Uint8Array as the type which might expect it to be + // received in the same slices. + // $FlowFixMe: This is a Node.js extension. + let supportsBYOB: void | boolean = stream.supportsBYOB; + if (supportsBYOB === undefined) { + try { + // $FlowFixMe[extra-arg]: This argument is accepted. + stream.getReader({mode: 'byob'}).releaseLock(); + supportsBYOB = true; + } catch (x) { + supportsBYOB = false; + } + } + + const reader = stream.getReader(); + + function progress(entry: {done: boolean, value: ReactServerValue, ...}) { + if (entry.done) { + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, 'C'); // Close signal + pendingParts--; + if (pendingParts === 0) { + resolve(data); + } + } else { + try { + // $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here. + const partJSON: string = JSON.stringify(entry.value, resolveToJSON); + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, partJSON); + reader.read().then(progress, reject); + } catch (x) { + reject(x); + } + } + } + reader.read().then(progress, reject); + + return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16); + } + + function serializeAsyncIterable( + iterable: $AsyncIterable, + iterator: $AsyncIterator, + ): string { + if (formData === null) { + // Upgrade to use FormData to allow us to stream this value. + formData = new FormData(); + } + const data = formData; + + pendingParts++; + const streamId = nextPartId++; + + // Generators/Iterators are Iterables but they're also their own iterator + // functions. If that's the case, we treat them as single-shot. Otherwise, + // we assume that this iterable might be a multi-shot and allow it to be + // iterated more than once on the receiving server. + const isIterator = iterable === iterator; + + // There's a race condition between when the stream is aborted and when the promise + // resolves so we track whether we already aborted it to avoid writing twice. + function progress( + entry: + | {done: false, +value: ReactServerValue, ...} + | {done: true, +value: ReactServerValue, ...}, + ) { + if (entry.done) { + if (entry.value === undefined) { + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, 'C'); // Close signal + } else { + // Unlike streams, the last value may not be undefined. If it's not + // we outline it and encode a reference to it in the closing instruction. + try { + // $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here. + const partJSON: string = JSON.stringify(entry.value, resolveToJSON); + data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal + } catch (x) { + reject(x); + return; + } + } + pendingParts--; + if (pendingParts === 0) { + resolve(data); + } + } else { + try { + // $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here. + const partJSON: string = JSON.stringify(entry.value, resolveToJSON); + // eslint-disable-next-line react-internal/safe-string-coercion + data.append(formFieldPrefix + streamId, partJSON); + iterator.next().then(progress, reject); + } catch (x) { + reject(x); + return; + } + } + } + + iterator.next().then(progress, reject); + return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16); + } + function resolveToJSON( this: | {+[key: string | number]: ReactServerValue} @@ -349,11 +468,9 @@ export function processReply( reject(reason); } }, - reason => { - // In the future we could consider serializing this as an error - // that throws on the server instead. - reject(reason); - }, + // In the future we could consider serializing this as an error + // that throws on the server instead. + reject, ); return serializePromiseID(promiseId); } @@ -486,6 +603,25 @@ export function processReply( return Array.from((iterator: any)); } + if (enableFlightReadableStream) { + // TODO: ReadableStream is not available in old Node. Remove the typeof check later. + if ( + typeof ReadableStream === 'function' && + value instanceof ReadableStream + ) { + return serializeReadableStream(value); + } + const getAsyncIterator: void | (() => $AsyncIterator) = + (value: any)[ASYNC_ITERATOR]; + if (typeof getAsyncIterator === 'function') { + // We treat AsyncIterables as a Fragment and as such we might need to key them. + return serializeAsyncIterable( + (value: any), + getAsyncIterator.call((value: any)), + ); + } + } + // Verify that this is a simple plain object. const proto = getPrototypeOf(value); if ( diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js index 62948f275298c..4e38815cad1cc 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js @@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => { // This should've been the same reference that we already saw. expect(response.children).toBe(children); }); + + // @gate enableFlightReadableStream + it('should supports streaming ReadableStream with objects', async () => { + let controller1; + let controller2; + const s1 = new ReadableStream({ + start(c) { + controller1 = c; + }, + }); + const s2 = new ReadableStream({ + start(c) { + controller2 = c; + }, + }); + + const promise = ReactServerDOMClient.encodeReply({s1, s2}); + + controller1.enqueue({hello: 'world'}); + controller2.enqueue({hi: 'there'}); + + controller1.enqueue('text1'); + controller2.enqueue('text2'); + + controller1.close(); + controller2.close(); + + const body = await promise; + + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + const reader1 = result.s1.getReader(); + const reader2 = result.s2.getReader(); + + expect(await reader1.read()).toEqual({ + value: {hello: 'world'}, + done: false, + }); + expect(await reader2.read()).toEqual({ + value: {hi: 'there'}, + done: false, + }); + + expect(await reader1.read()).toEqual({ + value: 'text1', + done: false, + }); + expect(await reader1.read()).toEqual({ + value: undefined, + done: true, + }); + expect(await reader2.read()).toEqual({ + value: 'text2', + done: false, + }); + expect(await reader2.read()).toEqual({ + value: undefined, + done: true, + }); + }); + + // @gate enableFlightReadableStream + it('should supports streaming AsyncIterables with objects', async () => { + let resolve; + const wait = new Promise(r => (resolve = r)); + const multiShotIterable = { + async *[Symbol.asyncIterator]() { + const next = yield {hello: 'A'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'B'}; + return 'C'; + }, + }; + const singleShotIterator = (async function* () { + const next = yield {hello: 'D'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'E'}; + return 'F'; + })(); + + await resolve(); + + const body = await ReactServerDOMClient.encodeReply({ + multiShotIterable, + singleShotIterator, + }); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + const iterator1 = result.multiShotIterable[Symbol.asyncIterator](); + const iterator2 = result.singleShotIterator[Symbol.asyncIterator](); + + expect(iterator1).not.toBe(result.multiShotIterable); + expect(iterator2).toBe(result.singleShotIterator); + + expect(await iterator1.next()).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hello: 'D'}, + done: false, + }); + + expect(await iterator1.next()).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hi: 'E'}, + done: false, + }); + expect(await iterator1.next()).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(await iterator1.next()).toEqual({ + value: undefined, + done: true, + }); + + expect(await iterator2.next()).toEqual({ + value: 'F', // Return value + done: true, + }); + + // Multi-shot iterables should be able to do the same thing again + const iterator3 = result.multiShotIterable[Symbol.asyncIterator](); + + expect(iterator3).not.toBe(iterator1); + + // We should be able to iterate over the iterable again and it should be + // synchronously available using instrumented promises so that React can + // rerender it synchronously. + expect(iterator3.next().value).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(iterator3.next().value).toEqual({ + value: undefined, + done: true, + }); + + expect(() => iterator3.next('this is not allowed')).toThrow( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + }); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index ab0d54c0bc0f3..93f23e22ae93a 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -132,4 +132,105 @@ describe('ReactFlightDOMReplyEdge', () => { expect(resultBlob.size).toBe(bytes.length * 2); expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer()); }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should supports ReadableStreams with typed arrays', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + buffer, + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This is not a binary stream, it's a stream that contain binary chunks. + const s = new ReadableStream({ + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const body = await ReactServerDOMClient.encodeReply(s); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + const streamedBuffers = []; + const reader = result.getReader(); + let entry; + while (!(entry = await reader.read()).done) { + streamedBuffers.push(entry.value); + } + + expect(streamedBuffers).toEqual(buffers); + }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should support BYOB binary ReadableStreams', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This a binary stream where each chunk ends up as Uint8Array. + const s = new ReadableStream({ + type: 'bytes', + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const body = await ReactServerDOMClient.encodeReply(s); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + const streamedBuffers = []; + const reader = result.getReader({mode: 'byob'}); + let entry; + while (!(entry = await reader.read(new Uint8Array(10))).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + streamedBuffers.push(entry.value); + } + + // The streamed buffers might be in different chunks and in Uint8Array form but + // the concatenated bytes should be the same. + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( + buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ), + ); + }); }); diff --git a/packages/react-server/src/ReactFlightReplyServer.js b/packages/react-server/src/ReactFlightReplyServer.js index 4a447a5ee18da..1989936afe410 100644 --- a/packages/react-server/src/ReactFlightReplyServer.js +++ b/packages/react-server/src/ReactFlightReplyServer.js @@ -25,7 +25,17 @@ import { } from 'react-client/src/ReactFlightClientConfig'; import {createTemporaryReference} from './ReactFlightServerTemporaryReferences'; -import {enableBinaryFlight} from 'shared/ReactFeatureFlags'; +import { + enableBinaryFlight, + enableFlightReadableStream, +} from 'shared/ReactFeatureFlags'; +import {ASYNC_ITERATOR} from 'shared/ReactSymbols'; + +interface FlightStreamController { + enqueueModel(json: string): void; + close(json: string): void; + error(error: Error): void; +} export type JSONValue = | number @@ -46,35 +56,44 @@ type PendingChunk = { value: null | Array<(T) => mixed>, reason: null | Array<(mixed) => mixed>, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type BlockedChunk = { status: 'blocked', value: null | Array<(T) => mixed>, reason: null | Array<(mixed) => mixed>, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type ResolvedModelChunk = { status: 'resolved_model', value: string, reason: null, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type InitializedChunk = { status: 'fulfilled', value: T, reason: null, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, +}; +type InitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +> = { + status: 'fulfilled', + value: T, + reason: FlightStreamController, + _response: Response, + then(resolve: (ReadableStream) => mixed, reject?: (mixed) => mixed): void, }; type ErroredChunk = { status: 'rejected', value: null, reason: mixed, _response: Response, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type SomeChunk = | PendingChunk @@ -181,7 +200,14 @@ function wakeChunkIfInitialized( function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { if (chunk.status !== PENDING && chunk.status !== BLOCKED) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + // $FlowFixMe[incompatible-call]: The error method should accept mixed. + controller.error(error); + } return; } const listeners = chunk.reason; @@ -203,7 +229,17 @@ function createResolvedModelChunk( function resolveModelChunk(chunk: SomeChunk, value: string): void { if (chunk.status !== PENDING) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + if (value[0] === 'C') { + controller.close(value === 'C' ? '"$undefined"' : value.slice(1)); + } else { + controller.enqueueModel(value); + } + } return; } const resolveListeners = chunk.value; @@ -221,6 +257,42 @@ function resolveModelChunk(chunk: SomeChunk, value: string): void { } } +function createInitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +>( + response: Response, + value: T, + controller: FlightStreamController, +): InitializedChunk { + // We use the reason field to stash the controller since we already have that + // field. It's a bit of a hack but efficient. + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(INITIALIZED, value, controller, response); +} + +function createResolvedIteratorResultChunk( + response: Response, + value: string, + done: boolean, +): ResolvedModelChunk> { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(RESOLVED_MODEL, iteratorResultJSON, null, response); +} + +function resolveIteratorResultChunk( + chunk: SomeChunk>, + value: string, + done: boolean, +): void { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + resolveModelChunk(chunk, iteratorResultJSON); +} + function bindArgs(fn: any, args: any) { return fn.bind.apply(fn, [null].concat(args)); } @@ -342,11 +414,18 @@ function createModelResolver( } else { blocked = initializingChunkBlockedModel = { deps: 1, - value: null, + value: (null: any), }; } return value => { parentObject[key] = value; + + // If this is the root object for a model reference, where `blocked.value` + // is a stale `null`, the resolved value can be used directly. + if (key === '' && blocked.value === null) { + blocked.value = parentObject[key]; + } + blocked.deps--; if (blocked.deps === 0) { if (chunk.status !== BLOCKED) { @@ -411,6 +490,221 @@ function parseTypedArray( return null; } +function resolveStream>( + response: Response, + id: number, + stream: T, + controller: FlightStreamController, +): void { + const chunks = response._chunks; + const chunk = createInitializedStreamChunk(response, stream, controller); + chunks.set(id, chunk); + + const prefix = response._prefix; + const key = prefix + id; + const existingEntries = response._formData.getAll(key); + for (let i = 0; i < existingEntries.length; i++) { + // We assume that this is a string entry for now. + const value: string = (existingEntries[i]: any); + if (value[0] === 'C') { + controller.close(value === 'C' ? '"$undefined"' : value.slice(1)); + } else { + controller.enqueueModel(value); + } + } +} + +function parseReadableStream( + response: Response, + reference: string, + type: void | 'bytes', + parentObject: Object, + parentKey: string, +): ReadableStream { + const id = parseInt(reference.slice(2), 16); + + let controller: ReadableStreamController = (null: any); + const stream = new ReadableStream({ + type: type, + start(c) { + controller = c; + }, + }); + let previousBlockedChunk: SomeChunk | null = null; + const flightController = { + enqueueModel(json: string): void { + if (previousBlockedChunk === null) { + // If we're not blocked on any other chunks, we can try to eagerly initialize + // this as a fast-path to avoid awaiting them. + const chunk: ResolvedModelChunk = createResolvedModelChunk( + response, + json, + ); + initializeModelChunk(chunk); + const initializedChunk: SomeChunk = chunk; + if (initializedChunk.status === INITIALIZED) { + controller.enqueue(initializedChunk.value); + } else { + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + } + } else { + // We're still waiting on a previous chunk so we can't enqueue quite yet. + const blockedChunk = previousBlockedChunk; + const chunk: SomeChunk = createPendingChunk(response); + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + blockedChunk.then(function () { + if (previousBlockedChunk === chunk) { + // We were still the last chunk so we can now clear the queue and return + // to synchronous emitting. + previousBlockedChunk = null; + } + resolveModelChunk(chunk, json); + }); + } + }, + close(json: string): void { + if (previousBlockedChunk === null) { + controller.close(); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.close()); + } + }, + error(error: mixed): void { + if (previousBlockedChunk === null) { + // $FlowFixMe[incompatible-call] + controller.error(error); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.error((error: any))); + } + }, + }; + resolveStream(response, id, stream, flightController); + return stream; +} + +function asyncIterator(this: $AsyncIterator) { + // Self referencing iterator. + return this; +} + +function createIterator( + next: (arg: void) => SomeChunk>, +): $AsyncIterator { + const iterator: any = { + next: next, + // TODO: Add return/throw as options for aborting. + }; + // TODO: The iterator could inherit the AsyncIterator prototype which is not exposed as + // a global but exists as a prototype of an AsyncGenerator. However, it's not needed + // to satisfy the iterable protocol. + (iterator: any)[ASYNC_ITERATOR] = asyncIterator; + return iterator; +} + +function parseAsyncIterable( + response: Response, + reference: string, + iterator: boolean, + parentObject: Object, + parentKey: string, +): $AsyncIterable | $AsyncIterator { + const id = parseInt(reference.slice(2), 16); + + const buffer: Array>> = []; + let closed = false; + let nextWriteIndex = 0; + const flightController = { + enqueueModel(value: string): void { + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + false, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, false); + } + nextWriteIndex++; + }, + close(value: string): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + true, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, true); + } + nextWriteIndex++; + while (nextWriteIndex < buffer.length) { + // In generators, any extra reads from the iterator have the value undefined. + resolveIteratorResultChunk( + buffer[nextWriteIndex++], + '"$undefined"', + true, + ); + } + }, + error(error: Error): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = + createPendingChunk>(response); + } + while (nextWriteIndex < buffer.length) { + triggerErrorOnChunk(buffer[nextWriteIndex++], error); + } + }, + }; + const iterable: $AsyncIterable = { + [ASYNC_ITERATOR](): $AsyncIterator { + let nextReadIndex = 0; + return createIterator(arg => { + if (arg !== undefined) { + throw new Error( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + } + if (nextReadIndex === buffer.length) { + if (closed) { + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk( + INITIALIZED, + {done: true, value: undefined}, + null, + response, + ); + } + buffer[nextReadIndex] = + createPendingChunk>(response); + } + return buffer[nextReadIndex++]; + }); + }, + }; + // TODO: If it's a single shot iterator we can optimize memory by cleaning up the buffer after + // reading through the end, but currently we favor code size over this optimization. + const stream = iterator ? iterable[ASYNC_ITERATOR]() : iterable; + resolveStream(response, id, stream, flightController); + return stream; +} + function parseModelString( response: Response, obj: Object, @@ -560,6 +854,22 @@ function parseModelString( } } } + if (enableFlightReadableStream) { + switch (value[1]) { + case 'R': { + return parseReadableStream(response, value, undefined, obj, key); + } + case 'r': { + return parseReadableStream(response, value, 'bytes', obj, key); + } + case 'X': { + return parseAsyncIterable(response, value, false, obj, key); + } + case 'x': { + return parseAsyncIterable(response, value, true, obj, key); + } + } + } // We assume that anything else is a reference ID. const id = parseInt(value.slice(1), 16);