Skip to content

Commit ffa5dd3

Browse files
Refactor redisReceiver to eliminate global state and prevent memory leaks (#1894)
## Key Improvements ### Architecture Changes - **Eliminated global state**: Removed shared Redis clients and global promise maps that caused memory leaks and cross-request contamination - **Lazy initialization**: Stream listening now starts only when getValue() is called, reducing unnecessary connections - **Simplified promise management**: Replaced complex dual-lifecycle promise handling with a simple value map and promise cache - **Single-phase stream reading**: Merged checkExistingMessages() and setupStreamListener() into one listenToStream() function ### Bug Fixes - Fixed memory leaks by using Map.delete() instead of setting undefined - Fixed race conditions in connection handling with proper promise guards - Improved error handling and propagation throughout the module - Fixed lastProcessedId tracking to prevent message loss - Added proper cleanup in destroy() method
1 parent 43accf3 commit ffa5dd3

File tree

7 files changed

+262
-278
lines changed

7 files changed

+262
-278
lines changed

.github/workflows/pro-integration-tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ jobs:
207207
- name: Run Rails server in background
208208
run: |
209209
cd spec/dummy
210-
RAILS_ENV=test rails server &
210+
RAILS_ENV="test" rails server &
211211
212212
- name: Wait for Rails server to start
213213
run: |
@@ -392,7 +392,7 @@ jobs:
392392
- name: Run Rails server in background
393393
run: |
394394
cd spec/dummy
395-
RAILS_ENV=test rails server &
395+
RAILS_ENV="test" rails server &
396396
397397
- name: Wait for Rails server to start
398398
run: |

.github/workflows/pro-package-tests.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ jobs:
9898
package-js-tests:
9999
needs: build-dummy-app-webpack-test-bundles
100100
runs-on: ubuntu-22.04
101+
# Redis service container
102+
services:
103+
redis:
104+
image: cimg/redis:6.2.6
105+
ports:
106+
- 6379:6379
107+
options: >-
108+
--health-cmd "redis-cli ping"
109+
--health-interval 10s
110+
--health-timeout 5s
111+
--health-retries 5
101112
env:
102113
REACT_ON_RAILS_PRO_LICENSE: ${{ secrets.REACT_ON_RAILS_PRO_LICENSE }}
103114
steps:
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// This test in only for documenting Redis client usage
2+
3+
import { createClient } from 'redis';
4+
5+
const redisClient = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
6+
7+
interface RedisStreamMessage {
8+
id: string;
9+
message: Record<string, string>;
10+
}
11+
interface RedisStreamResult {
12+
name: string;
13+
messages: RedisStreamMessage[];
14+
}
15+
16+
test('Redis client connects successfully', async () => {
17+
await redisClient.connect();
18+
expect(redisClient.isOpen).toBe(true);
19+
await redisClient.quit();
20+
});
21+
22+
test('calls connect after quit', async () => {
23+
await redisClient.connect();
24+
expect(redisClient.isOpen).toBe(true);
25+
await redisClient.quit();
26+
27+
await redisClient.connect();
28+
expect(redisClient.isOpen).toBe(true);
29+
await redisClient.quit();
30+
});
31+
32+
test('calls quit before connect is resolved', async () => {
33+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
34+
const connectPromise = client.connect();
35+
await client.quit();
36+
await connectPromise;
37+
expect(client.isOpen).toBe(false);
38+
});
39+
40+
test('multiple connect calls', async () => {
41+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
42+
const connectPromise1 = client.connect();
43+
const connectPromise2 = client.connect();
44+
await expect(connectPromise2).rejects.toThrow('Socket already opened');
45+
await expect(connectPromise1).resolves.toMatchObject({});
46+
expect(client.isOpen).toBe(true);
47+
await client.quit();
48+
});
49+
50+
test('write to stream and read back', async () => {
51+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
52+
await client.connect();
53+
54+
const streamKey = 'test-stream';
55+
await client.del(streamKey);
56+
const messageId = await client.xAdd(streamKey, '*', { field1: 'value1' });
57+
58+
const result = (await client.xRead({ key: streamKey, id: '0-0' }, { COUNT: 1, BLOCK: 2000 })) as
59+
| RedisStreamResult[]
60+
| null;
61+
expect(result).not.toBeNull();
62+
expect(result).toBeDefined();
63+
64+
const [stream] = result!;
65+
expect(stream).toBeDefined();
66+
expect(stream?.messages.length).toBe(1);
67+
const [message] = stream!.messages;
68+
expect(message!.id).toBe(messageId);
69+
expect(message!.message).toEqual({ field1: 'value1' });
70+
71+
await client.quit();
72+
});
73+
74+
test('quit while reading from stream', async () => {
75+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
76+
await client.connect();
77+
78+
const streamKey = 'test-stream-quit';
79+
80+
const readPromise = client.xRead({ key: streamKey, id: '$' }, { BLOCK: 0 });
81+
82+
// Wait a moment to ensure xRead is blocking
83+
await new Promise((resolve) => {
84+
setTimeout(resolve, 500);
85+
});
86+
87+
client.destroy();
88+
89+
await expect(readPromise).rejects.toThrow();
90+
});
91+
92+
it('expire sets TTL on stream', async () => {
93+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
94+
await client.connect();
95+
96+
const streamKey = 'test-stream-expire';
97+
await client.del(streamKey);
98+
await client.xAdd(streamKey, '*', { field1: 'value1' });
99+
100+
const expireResult = await client.expire(streamKey, 1); // 1 second
101+
expect(expireResult).toBe(1); // 1 means the key existed and TTL was set
102+
103+
const ttl1 = await client.ttl(streamKey);
104+
expect(ttl1).toBeLessThanOrEqual(1);
105+
expect(ttl1).toBeGreaterThan(0);
106+
107+
const existsBeforeTimeout = await client.exists(streamKey);
108+
expect(existsBeforeTimeout).toBe(1); // Key should exist before timeout
109+
110+
// Wait for 1.1 seconds
111+
await new Promise((resolve) => {
112+
setTimeout(resolve, 1100);
113+
});
114+
115+
const existsAfterTimeout = await client.exists(streamKey);
116+
expect(existsAfterTimeout).toBe(0); // Key should have expired
117+
118+
await client.quit();
119+
});

react_on_rails_pro/spec/dummy/client/app/components/ErrorComponent.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const ErrorComponent = ({ error }: { error: Error }) => {
77
<div>
88
<h1>Error happened while rendering RSC Page</h1>
99
<p>{error.message}</p>
10+
<p>{error.stack}</p>
1011
</div>
1112
);
1213
};

react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RSCPostsPageOverRedis.jsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ import RSCPostsPage from '../components/RSCPostsPage/Main';
33
import { listenToRequestData } from '../utils/redisReceiver';
44

55
const RSCPostsPageOverRedis = ({ requestId, ...props }, railsContext) => {
6-
const { getValue, close } = listenToRequestData(requestId);
6+
const { getValue, destroy } = listenToRequestData(requestId);
77

88
const fetchPosts = () => getValue('posts');
99
const fetchComments = (postId) => getValue(`comments:${postId}`);
1010
const fetchUser = (userId) => getValue(`user:${userId}`);
1111

1212
if ('addPostSSRHook' in railsContext) {
13-
railsContext.addPostSSRHook(close);
13+
railsContext.addPostSSRHook(destroy);
1414
}
1515

1616
return () => (

react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RedisReceiver.jsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ const AsyncToggleContainer = async ({ children, childrenTitle, getValue }) => {
3434
};
3535

3636
const RedisReceiver = ({ requestId, asyncToggleContainer }, railsContext) => {
37-
const { getValue, close } = listenToRequestData(requestId);
37+
const { getValue, destroy } = listenToRequestData(requestId);
3838

3939
if ('addPostSSRHook' in railsContext) {
40-
railsContext.addPostSSRHook(close);
40+
railsContext.addPostSSRHook(destroy);
4141
}
4242

4343
const UsedToggleContainer = asyncToggleContainer ? AsyncToggleContainer : ToggleContainer;

0 commit comments

Comments
 (0)