Skip to content

Commit

Permalink
fix(core): possible memory leak when using server side events
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengjitf committed Apr 16, 2024
1 parent af87b2a commit f797e16
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
7 changes: 5 additions & 2 deletions packages/core/router/router-response-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage } from 'http';
import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs';
import { catchError, debounce, map } from 'rxjs/operators';
import { catchError, concatMap, map } from 'rxjs/operators';
import {
AdditionalHeaders,
WritableHeaderStream,
Expand Down Expand Up @@ -128,7 +128,7 @@ export class RouterResponseController {

return { data: message as object | string };
}),
debounce(
concatMap(
message =>
new Promise<void>(resolve =>
stream.writeMessage(message, () => resolve()),
Expand All @@ -153,6 +153,9 @@ export class RouterResponseController {

request.on('close', () => {
subscription.unsubscribe();
if (!stream.writableEnded) {
stream.end();
}
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/router/sse-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class SseStream extends Transform {
message.id = this.lastEventId.toString();
}

if (!this.write(message, 'utf-8', cb)) {
if (!this.write(message, 'utf-8')) {
this.once('drain', cb);
} else {
process.nextTick(cb);
Expand Down
4 changes: 1 addition & 3 deletions packages/core/test/router/router-response-controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,7 @@ data: test

await new Promise(resolve => process.nextTick(resolve));

expect(() => {
expect(maxDrainListenersExceededWarning).to.equal(null);
}, 'it will fail as there is an issue here to be addressed').to.throw();
expect(maxDrainListenersExceededWarning).to.equal(null);
});
});

Expand Down

0 comments on commit f797e16

Please sign in to comment.