Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions knip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const config: KnipConfig = {
'tests/emptyForTesting.js',
// Jest setup and test utilities - not detected by Jest plugin in workspace setup
'tests/jest.setup.js',
'tests/utils/removeRSCStackFromAllChunks.ts',
// Build output directories that should be ignored
'lib/**',
// Pro features exported for external consumption
Expand Down
6 changes: 5 additions & 1 deletion packages/react-on-rails-pro/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,9 @@
"bugs": {
"url": "https://github.com/shakacode/react_on_rails/issues"
},
"homepage": "https://github.com/shakacode/react_on_rails#readme"
"homepage": "https://github.com/shakacode/react_on_rails#readme",
"devDependencies": {
"@types/mock-fs": "^4.13.4",
"mock-fs": "^5.5.0"
}
}
57 changes: 57 additions & 0 deletions packages/react-on-rails-pro/tests/AsyncQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import * as EventEmitter from 'node:events';

class AsyncQueue<T> {
private eventEmitter = new EventEmitter();

private buffer: T[] = [];

private isEnded = false;

enqueue(value: T) {
if (this.isEnded) {
throw new Error('Queue Ended');
}

if (this.eventEmitter.listenerCount('data') > 0) {
this.eventEmitter.emit('data', value);
} else {
this.buffer.push(value);
}
}

end() {
this.isEnded = true;
this.eventEmitter.emit('end');
}

dequeue() {
return new Promise<T>((resolve, reject) => {
const bufferValueIfExist = this.buffer.shift();
if (bufferValueIfExist) {
resolve(bufferValueIfExist);
} else if (this.isEnded) {
reject(new Error('Queue Ended'));
} else {
let teardown = () => {};
const onData = (value: T) => {
resolve(value);
teardown();
};

const onEnd = () => {
reject(new Error('Queue Ended'));
teardown();
};

this.eventEmitter.on('data', onData);
this.eventEmitter.on('end', onEnd);
teardown = () => {
this.eventEmitter.off('data', onData);
this.eventEmitter.off('end', onEnd);
};
}
});
}
}

export default AsyncQueue;
33 changes: 33 additions & 0 deletions packages/react-on-rails-pro/tests/StreamReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { PassThrough, Readable } from 'node:stream';
import AsyncQueue from './AsyncQueue.ts';

class StreamReader {
private asyncQueue: AsyncQueue<string>;

constructor(pipeableStream: Pick<Readable, 'pipe'>) {
this.asyncQueue = new AsyncQueue();
const decoder = new TextDecoder();

const readableStream = new PassThrough();
pipeableStream.pipe(readableStream);

readableStream.on('data', (chunk: Buffer) => {
const decodedChunk = decoder.decode(chunk);
this.asyncQueue.enqueue(decodedChunk);
});

if (readableStream.closed) {
this.asyncQueue.end();
} else {
readableStream.on('end', () => {
this.asyncQueue.end();
});
}
}

nextChunk() {
return this.asyncQueue.dequeue();
}
}

export default StreamReader;
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/**
* @jest-environment node
*/
/// <reference types="react/experimental" />

import * as React from 'react';
import { Suspense, PropsWithChildren } from 'react';

import * as path from 'path';
import * as mock from 'mock-fs';

import ReactOnRails, { RailsContextWithServerStreamingCapabilities } from '../src/ReactOnRailsRSC.ts';
import AsyncQueue from './AsyncQueue.ts';
import StreamReader from './StreamReader.ts';
import removeRSCChunkStack from './utils/removeRSCChunkStack.ts';

const manifestFileDirectory = path.resolve(__dirname, '../src');
const clientManifestPath = path.join(manifestFileDirectory, 'react-client-manifest.json');

beforeEach(() => {
mock({
[clientManifestPath]: JSON.stringify({
filePathToModuleMetadata: {},
moduleLoading: { prefix: '', crossOrigin: null },
}),
});
});

afterEach(() => mock.restore());

const AsyncQueueItem = async ({
asyncQueue,
children,
}: PropsWithChildren<{ asyncQueue: AsyncQueue<string> }>) => {
const value = await asyncQueue.dequeue();

return (
<>
<p>Data: {value}</p>
{children}
</>
);
};

const AsyncQueueContainer = ({ asyncQueue }: { asyncQueue: AsyncQueue<string> }) => {
return (
<div>
<h1>Async Queue</h1>
<Suspense fallback={<p>Loading Item1</p>}>
<AsyncQueueItem asyncQueue={asyncQueue}>
<Suspense fallback={<p>Loading Item2</p>}>
<AsyncQueueItem asyncQueue={asyncQueue}>
<Suspense fallback={<p>Loading Item3</p>}>
<AsyncQueueItem asyncQueue={asyncQueue} />
</Suspense>
</AsyncQueueItem>
</Suspense>
</AsyncQueueItem>
</Suspense>
</div>
);
};

ReactOnRails.register({ AsyncQueueContainer });

const renderComponent = (props: Record<string, unknown>) => {
return ReactOnRails.serverRenderRSCReactComponent({
railsContext: {
reactClientManifestFileName: 'react-client-manifest.json',
reactServerClientManifestFileName: 'react-server-client-manifest.json',
} as unknown as RailsContextWithServerStreamingCapabilities,
name: 'AsyncQueueContainer',
renderingReturnsPromises: true,
throwJsErrors: true,
domNodeId: 'dom-id',
props,
});
};

const createParallelRenders = (size: number) => {
const asyncQueues = new Array(size).fill(null).map(() => new AsyncQueue<string>());
const streams = asyncQueues.map((asyncQueue) => {
return renderComponent({ asyncQueue });
});
const readers = streams.map((stream) => new StreamReader(stream));

const enqueue = (value: string) => asyncQueues.forEach((asyncQueue) => asyncQueue.enqueue(value));

const expectNextChunk = (nextChunk: string) =>
Promise.all(
readers.map(async (reader) => {
const chunk = await reader.nextChunk();
expect(removeRSCChunkStack(chunk)).toEqual(removeRSCChunkStack(nextChunk));
}),
);

const expectEndOfStream = () =>
Promise.all(readers.map((reader) => expect(reader.nextChunk()).rejects.toThrow(/Queue Ended/)));

return { enqueue, expectNextChunk, expectEndOfStream };
};

test('Renders concurrent rsc streams as single rsc stream', async () => {
expect.assertions(258);
const asyncQueue = new AsyncQueue<string>();
const stream = renderComponent({ asyncQueue });
const reader = new StreamReader(stream);

const chunks: string[] = [];
let chunk = await reader.nextChunk();
chunks.push(chunk);
expect(chunk).toContain('Async Queue');
expect(chunk).toContain('Loading Item2');
expect(chunk).not.toContain('Random Value');

asyncQueue.enqueue('Random Value1');
chunk = await reader.nextChunk();
chunks.push(chunk);
expect(chunk).toContain('Random Value1');

asyncQueue.enqueue('Random Value2');
chunk = await reader.nextChunk();
chunks.push(chunk);
expect(chunk).toContain('Random Value2');

asyncQueue.enqueue('Random Value3');
chunk = await reader.nextChunk();
chunks.push(chunk);
expect(chunk).toContain('Random Value3');

await expect(reader.nextChunk()).rejects.toThrow(/Queue Ended/);

const { enqueue, expectNextChunk, expectEndOfStream } = createParallelRenders(50);

expect(chunks).toHaveLength(4);
await expectNextChunk(chunks[0]);
enqueue('Random Value1');
await expectNextChunk(chunks[1]);
enqueue('Random Value2');
await expectNextChunk(chunks[2]);
enqueue('Random Value3');
await expectNextChunk(chunks[3]);
await expectEndOfStream();
});
Loading
Loading