Skip to content

Commit 10f1644

Browse files
Fix memory leaks and race conditions in redisReceiver
- Use delete instead of undefined to properly remove map entries - Clean up resolved promises to prevent indefinite accumulation - Delete timed-out promises to avoid memory leaks - Add isActive check in getValue to prevent promises after close - Track lastProcessedId to prevent message loss with >100 messages All changes maintain existing behavior while fixing memory leaks and race conditions identified in code review. Co-authored-by: Abanoub Ghadban <AbanoubGhadban@users.noreply.github.com>
1 parent dfc8551 commit 10f1644

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export function listenToRequestData(requestId: string): RequestListener {
4848
let isActive = true;
4949
let isEnded = false;
5050
let initializationError: Error | null = null;
51+
let lastProcessedId = '0'; // Track last processed message ID
5152

5253
// Create dedicated Redis client for THIS listener
5354
const url = process.env.REDIS_URL || 'redis://localhost:6379';
@@ -100,7 +101,8 @@ export function listenToRequestData(requestId: string): RequestListener {
100101
if (pendingPromise && !pendingPromise.resolved) {
101102
clearTimeout(pendingPromise.timer);
102103
pendingPromise.reject(new Error(`Key ${key} not found before stream ended`));
103-
pendingPromises[key] = undefined;
104+
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
105+
delete pendingPromises[key];
104106
}
105107
});
106108

@@ -121,7 +123,10 @@ export function listenToRequestData(requestId: string): RequestListener {
121123
clearTimeout(pendingPromise.timer);
122124
pendingPromise.resolve(parsedValue);
123125
pendingPromise.resolved = true; // Mark as resolved
126+
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
127+
delete pendingPromises[normalizedKey];
124128
} else {
129+
// Value arrived before getValue was called - store for immediate resolution
125130
pendingPromises[normalizedKey] = {
126131
promise: Promise.resolve(parsedValue),
127132
resolve: () => {},
@@ -171,6 +176,7 @@ export function listenToRequestData(requestId: string): RequestListener {
171176

172177
// Process each message
173178
for (const { id, message } of messages) {
179+
lastProcessedId = id; // Track last processed ID
174180
processMessage(message, id);
175181
}
176182

@@ -193,8 +199,8 @@ export function listenToRequestData(requestId: string): RequestListener {
193199
try {
194200
const client = await ensureConnected();
195201

196-
// Use $ as the ID to read only new messages
197-
let lastId = '$';
202+
// Start from last processed message, or $ for new messages if no messages were processed yet
203+
let lastId = lastProcessedId === '0' ? '$' : lastProcessedId;
198204

199205
// Start reading from the stream
200206
const readStream = async () => {
@@ -241,6 +247,11 @@ export function listenToRequestData(requestId: string): RequestListener {
241247
* @returns A promise that resolves when the key is found
242248
*/
243249
getValue: async (key: string) => {
250+
// If listener is closed, reject immediately
251+
if (!isActive) {
252+
return Promise.reject(new Error('Redis listener has been closed'));
253+
}
254+
244255
// If initialization failed, reject immediately with the initialization error
245256
if (initializationError) {
246257
return Promise.reject(
@@ -275,7 +286,8 @@ export function listenToRequestData(requestId: string): RequestListener {
275286
pendingPromise.reject(
276287
new Error(`Timeout waiting for key: ${key}, available keys: ${receivedKeys.join(', ')}`),
277288
);
278-
// Keep the pending promise in the dictionary with the error state
289+
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
290+
delete pendingPromises[key];
279291
}
280292
}, REDIS_READ_TIMEOUT);
281293

0 commit comments

Comments
 (0)