Skip to content

Commit dd70d41

Browse files
Update redisReceiver to use destroy instead of quit and set TTL for streams
1 parent 5aeb91a commit dd70d41

File tree

2 files changed

+88
-7
lines changed

2 files changed

+88
-7
lines changed

react_on_rails_pro/packages/node-renderer/tests/redisClient.test.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@ import { createClient } from 'redis';
22

33
const redisClient = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
44

5+
interface RedisStreamMessage {
6+
id: string;
7+
message: Record<string, string>;
8+
}
9+
interface RedisStreamResult {
10+
name: string;
11+
messages: RedisStreamMessage[];
12+
}
13+
514
test('Redis client connects successfully', async () => {
615
await redisClient.connect();
716
expect(redisClient.isOpen).toBe(true);
@@ -35,3 +44,74 @@ test('multiple connect calls', async () => {
3544
expect(client.isOpen).toBe(true);
3645
await client.quit();
3746
});
47+
48+
test('write to stream and read back', async () => {
49+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
50+
await client.connect();
51+
52+
const streamKey = 'test-stream';
53+
await client.del(streamKey);
54+
const messageId = await client.xAdd(streamKey, '*', { field1: 'value1' });
55+
56+
const result = (await client.xRead({ key: streamKey, id: '0-0' }, { COUNT: 1, BLOCK: 2000 })) as
57+
| RedisStreamResult[]
58+
| null;
59+
expect(result).not.toBeNull();
60+
expect(result).toBeDefined();
61+
62+
const [stream] = result!;
63+
expect(stream).toBeDefined();
64+
expect(stream?.messages.length).toBe(1);
65+
const [message] = stream!.messages;
66+
expect(message!.id).toBe(messageId);
67+
expect(message!.message).toEqual({ field1: 'value1' });
68+
69+
await client.quit();
70+
});
71+
72+
test('quit while reading from stream', async () => {
73+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
74+
await client.connect();
75+
76+
const streamKey = 'test-stream-quit';
77+
78+
const readPromise = client.xRead({ key: streamKey, id: '$' }, { BLOCK: 0 });
79+
80+
// Wait a moment to ensure xRead is blocking
81+
await new Promise((resolve) => {
82+
setTimeout(resolve, 500);
83+
});
84+
85+
client.destroy();
86+
87+
await expect(readPromise).rejects.toThrow();
88+
});
89+
90+
it('expire sets TTL on stream', async () => {
91+
const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
92+
await client.connect();
93+
94+
const streamKey = 'test-stream-expire';
95+
await client.del(streamKey);
96+
await client.xAdd(streamKey, '*', { field1: 'value1' });
97+
98+
const expireResult = await client.expire(streamKey, 1); // 1 second
99+
expect(expireResult).toBe(1); // 1 means the key existed and TTL was set
100+
101+
const ttl1 = await client.ttl(streamKey);
102+
expect(ttl1).toBeLessThanOrEqual(1);
103+
expect(ttl1).toBeGreaterThan(0);
104+
105+
const existsBeforeTimeout = await client.exists(streamKey);
106+
expect(existsBeforeTimeout).toBe(1); // Key should exist before timeout
107+
108+
// Wait for 1.1 seconds
109+
await new Promise((resolve) => {
110+
setTimeout(resolve, 1100);
111+
});
112+
113+
const existsAfterTimeout = await client.exists(streamKey);
114+
expect(existsAfterTimeout).toBe(0); // Key should have expired
115+
116+
await client.quit();
117+
});

react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ interface RedisStreamResult {
2323
*/
2424
interface RequestListener {
2525
getValue: (key: string) => Promise<unknown>;
26-
destroy: () => Promise<void>;
26+
destroy: () => void;
2727
}
2828

2929
/**
@@ -51,13 +51,13 @@ export function listenToRequestData(requestId: string): RequestListener {
5151
/**
5252
* Closes the Redis connection and rejects all pending promises
5353
*/
54-
async function close(): Promise<void> {
54+
function close() {
5555
if (isClosed) return;
5656
isClosed = true;
5757

5858
// Close client - this will cause xRead to throw, which rejects pending promises
5959
try {
60-
await redisClient.quit();
60+
redisClient.destroy();
6161
} finally {
6262
isConnected = false;
6363
}
@@ -84,6 +84,7 @@ export function listenToRequestData(requestId: string): RequestListener {
8484
// And `listenToStream` runs only one promise at a time, so no fear of race condition
8585
if (!isConnected) {
8686
await redisClient.connect();
87+
await redisClient.expire(streamKey, REDIS_LISTENER_TIMEOUT / 1000); // Set TTL to avoid stale streams
8788
isConnected = true;
8889
}
8990

@@ -118,7 +119,7 @@ export function listenToRequestData(requestId: string): RequestListener {
118119

119120
// If end message received, close the connection
120121
if (receivedEndMessage) {
121-
await close();
122+
close();
122123
}
123124
})();
124125

@@ -174,19 +175,19 @@ export function listenToRequestData(requestId: string): RequestListener {
174175
/**
175176
* Destroys the listener, closing the connection and preventing further getValue calls
176177
*/
177-
async function destroy(): Promise<void> {
178+
function destroy() {
178179
if (isDestroyed) return;
179180
isDestroyed = true;
180181

181182
// Clear global timeout
182183
clearTimeout(globalTimeout);
183184

184-
await close();
185+
close();
185186
}
186187

187188
// Global timeout - destroys listener after 15 seconds
188189
globalTimeout = setTimeout(() => {
189-
void destroy();
190+
destroy();
190191
}, REDIS_LISTENER_TIMEOUT);
191192

192193
return { getValue, destroy };

0 commit comments

Comments
 (0)