Skip to content

Commit b16872b

Browse files
Refactor redisReceiver to eliminate global state and prevent memory leaks
This commit completely refactors the redisReceiver module to eliminate all module-level global state and move to request-scoped encapsulation. Changes: - Remove global sharedRedisClient, activeListeners, and pendingPromises - Create private Redis client per listenToRequestData() call - Move pendingPromises to per-listener scope for complete request isolation - Remove listener caching mechanism (no reuse via activeListeners) - Fix race condition bug in connection logic using connectionPromise tracking - Always close Redis client in close() method (no conditional logic) - Use delete instead of setting to undefined for proper memory cleanup Benefits: - Complete request isolation - no shared state between concurrent requests - Eliminates memory leaks from accumulating undefined map entries - Prevents race condition where multiple concurrent connection attempts fail - Simpler code without global state management complexity - Better resource management with guaranteed cleanup External API unchanged - no breaking changes to consumers. Fixes #1893 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 43accf3 commit b16872b

File tree

1 file changed

+47
-66
lines changed

1 file changed

+47
-66
lines changed

react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts

Lines changed: 47 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -34,68 +34,49 @@ interface PendingPromise {
3434
resolved?: boolean;
3535
}
3636

37-
// Shared Redis client
38-
let sharedRedisClient: RedisClientType | null = null;
39-
let isClientConnected = false;
40-
41-
// Store active listeners by requestId
42-
const activeListeners: Record<string, RequestListener | undefined> = {};
43-
44-
// Store pending promises
45-
const pendingPromises: Record<string, PendingPromise | undefined> = {};
46-
47-
/**
48-
* Gets or creates the shared Redis client
49-
*/
50-
async function getRedisClient() {
51-
if (!sharedRedisClient) {
52-
const url = process.env.REDIS_URL || 'redis://localhost:6379';
53-
sharedRedisClient = createClient({ url });
54-
}
55-
56-
if (!isClientConnected) {
57-
await sharedRedisClient.connect();
58-
isClientConnected = true;
59-
}
60-
61-
return sharedRedisClient;
62-
}
63-
64-
/**
65-
* Closes the shared Redis client
66-
*/
67-
async function closeRedisClient() {
68-
if (sharedRedisClient && isClientConnected) {
69-
await sharedRedisClient.quit();
70-
isClientConnected = false;
71-
}
72-
}
73-
7437
/**
7538
* Listens to a Redis stream for data based on a requestId
7639
* @param requestId - The stream key to listen on
7740
* @returns An object with a getValue function to get values by key
7841
*/
7942
export function listenToRequestData(requestId: string): RequestListener {
80-
// If a listener for this requestId already exists, return it
81-
const existingListener = activeListeners[requestId];
82-
if (existingListener) {
83-
return existingListener;
84-
}
85-
43+
// Private state for THIS listener only - no global state
44+
const pendingPromises: Record<string, PendingPromise | undefined> = {};
8645
const receivedKeys: string[] = [];
87-
88-
// Stream key for this request
8946
const streamKey = `stream:${requestId}`;
90-
91-
// IDs of messages that need to be deleted
9247
const messagesToDelete: string[] = [];
93-
94-
// Track if this listener is active
9548
let isActive = true;
96-
// Track if we've received the end message
9749
let isEnded = false;
9850

51+
// Create dedicated Redis client for THIS listener
52+
const url = process.env.REDIS_URL || 'redis://localhost:6379';
53+
const redisClient: RedisClientType = createClient({ url });
54+
let isClientConnected = false;
55+
let connectionPromise: Promise<void> | null = null;
56+
57+
/**
58+
* Ensures the Redis client is connected
59+
* Prevents race condition where multiple concurrent calls try to connect
60+
*/
61+
async function ensureConnected(): Promise<RedisClientType> {
62+
// Fast path: already connected
63+
if (isClientConnected) {
64+
return redisClient;
65+
}
66+
67+
// Start connection if not already in progress
68+
if (!connectionPromise) {
69+
connectionPromise = redisClient.connect().then(() => {
70+
isClientConnected = true;
71+
connectionPromise = null; // Clear after successful connection
72+
});
73+
}
74+
75+
// Wait for connection to complete (handles concurrent calls)
76+
await connectionPromise;
77+
return redisClient;
78+
}
79+
9980
/**
10081
* Process a message from the Redis stream
10182
*/
@@ -154,7 +135,7 @@ export function listenToRequestData(requestId: string): RequestListener {
154135
}
155136

156137
try {
157-
const client = await getRedisClient();
138+
const client = await ensureConnected();
158139
await client.xDel(streamKey, messagesToDelete);
159140
messagesToDelete.length = 0; // Clear the array
160141
} catch (error) {
@@ -171,7 +152,7 @@ export function listenToRequestData(requestId: string): RequestListener {
171152
}
172153

173154
try {
174-
const client = await getRedisClient();
155+
const client = await ensureConnected();
175156

176157
// Read all messages from the beginning of the stream
177158
const results = (await client.xRead({ key: streamKey, id: '0' }, { COUNT: 100 })) as
@@ -203,7 +184,7 @@ export function listenToRequestData(requestId: string): RequestListener {
203184
}
204185

205186
try {
206-
const client = await getRedisClient();
187+
const client = await ensureConnected();
207188

208189
// Use $ as the ID to read only new messages
209190
let lastId = '$';
@@ -316,28 +297,28 @@ export function listenToRequestData(requestId: string): RequestListener {
316297
close: async () => {
317298
isActive = false;
318299

319-
// Delete this listener from active listeners
320-
activeListeners[requestId] = undefined;
321-
322-
// Reject any pending promises
300+
// Reject and cleanup all pending promises
323301
Object.entries(pendingPromises).forEach(([key, pendingPromise]) => {
324-
if (pendingPromise) {
302+
if (pendingPromise && !pendingPromise.resolved) {
325303
clearTimeout(pendingPromise.timer);
326304
pendingPromise.reject(new Error('Redis connection closed'));
327-
pendingPromises[key] = undefined;
328305
}
329306
});
330307

331-
// Only close the Redis client if no other listeners are active
332-
const hasActiveListeners = Object.values(activeListeners).some(Boolean);
333-
if (!hasActiveListeners) {
334-
await closeRedisClient();
308+
// Clear the pendingPromises map completely
309+
Object.keys(pendingPromises).forEach((key) => delete pendingPromises[key]);
310+
311+
// Always close THIS listener's Redis client
312+
try {
313+
if (isClientConnected) {
314+
await redisClient.quit();
315+
isClientConnected = false;
316+
}
317+
} catch (error) {
318+
console.error('Error closing Redis client:', error);
335319
}
336320
},
337321
};
338322

339-
// Store the listener in active listeners
340-
activeListeners[requestId] = listener;
341-
342323
return listener;
343324
}

0 commit comments

Comments
 (0)