From 69de2537314fe25a5c3fa83f73235cfa7e7f729d Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 7 Mar 2024 12:53:52 -0500 Subject: [PATCH] fix(NODE-5993): memory leak in the `Connection` class (#4022) --- src/cmap/connection.ts | 68 ++++++--------- src/cmap/wire_protocol/on_data.ts | 18 +--- src/utils.ts | 25 ++++++ .../connection.test.ts | 84 +++++++++++++++++++ .../node-specific/resource_clean_up.test.ts | 33 ++++++++ 5 files changed, 169 insertions(+), 59 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index f0e373a825..e4036bb187 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,6 +1,5 @@ import { type Readable, Transform, type TransformCallback } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; -import { promisify } from 'util'; import type { BSONSerializeOptions, Document, ObjectId } from '../bson'; import type { AutoEncrypter } from '../client-side-encryption/auto_encrypter'; @@ -37,7 +36,7 @@ import { maxWireVersion, type MongoDBNamespace, now, - promiseWithResolvers, + once, uuidV4 } from '../utils'; import type { WriteConcern } from '../write_concern'; @@ -182,18 +181,18 @@ export class Connection extends TypedEventEmitter { * Once connection is established, command logging can log events (if enabled) */ public established: boolean; + /** Indicates that the connection (including underlying TCP socket) has been closed. */ + public closed = false; private lastUseTime: number; private clusterTime: Document | null = null; + private error: Error | null = null; + private dataEvents: AsyncGenerator | null = null; private readonly socketTimeoutMS: number; private readonly monitorCommands: boolean; private readonly socket: Stream; - private readonly controller: AbortController; - private readonly signal: AbortSignal; private readonly messageStream: Readable; - private readonly socketWrite: (buffer: Uint8Array) => Promise; - private readonly aborted: Promise; /** @event */ static readonly COMMAND_STARTED = COMMAND_STARTED; @@ -213,6 +212,7 @@ export class Connection extends TypedEventEmitter { constructor(stream: Stream, options: ConnectionOptions) { super(); + this.socket = stream; this.id = options.id; this.address = streamIdentifier(stream, options); this.socketTimeoutMS = options.socketTimeoutMS ?? 0; @@ -225,39 +225,12 @@ export class Connection extends TypedEventEmitter { this.generation = options.generation; this.lastUseTime = now(); - this.socket = stream; - - // TODO: Remove signal from connection layer - this.controller = new AbortController(); - const { signal } = this.controller; - this.signal = signal; - const { promise: aborted, reject } = promiseWithResolvers(); - aborted.then(undefined, () => null); // Prevent unhandled rejection - this.signal.addEventListener( - 'abort', - function onAbort() { - reject(signal.reason); - }, - { once: true } - ); - this.aborted = aborted; - this.messageStream = this.socket .on('error', this.onError.bind(this)) .pipe(new SizedMessageTransform({ connection: this })) .on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); - - const socketWrite = promisify(this.socket.write.bind(this.socket)); - this.socketWrite = async buffer => { - return Promise.race([socketWrite(buffer), this.aborted]); - }; - } - - /** Indicates that the connection (including underlying TCP socket) has been closed. */ - public get closed(): boolean { - return this.signal.aborted; } public get hello() { @@ -308,7 +281,7 @@ export class Connection extends TypedEventEmitter { this.lastUseTime = now(); } - public onError(error?: Error) { + public onError(error: Error) { this.cleanup(error); } @@ -351,13 +324,15 @@ export class Connection extends TypedEventEmitter { * * This method does nothing if the connection is already closed. */ - private cleanup(error?: Error): void { + private cleanup(error: Error): void { if (this.closed) { return; } this.socket.destroy(); - this.controller.abort(error); + this.error = error; + this.dataEvents?.throw(error).then(undefined, () => null); // squash unhandled rejection + this.closed = true; this.emit(Connection.CLOSE); } @@ -598,7 +573,7 @@ export class Connection extends TypedEventEmitter { } private throwIfAborted() { - this.signal.throwIfAborted(); + if (this.error) throw this.error; } /** @@ -621,7 +596,8 @@ export class Connection extends TypedEventEmitter { const buffer = Buffer.concat(await finalCommand.toBin()); - return this.socketWrite(buffer); + if (this.socket.write(buffer)) return; + return once(this.socket, 'drain'); } /** @@ -634,13 +610,19 @@ export class Connection extends TypedEventEmitter { * Note that `for-await` loops call `return` automatically when the loop is exited. */ private async *readMany(): AsyncGenerator { - for await (const message of onData(this.messageStream, { signal: this.signal })) { - const response = await decompressResponse(message); - yield response; + try { + this.dataEvents = onData(this.messageStream); + for await (const message of this.dataEvents) { + const response = await decompressResponse(message); + yield response; - if (!response.moreToCome) { - return; + if (!response.moreToCome) { + return; + } } + } finally { + this.dataEvents = null; + this.throwIfAborted(); } } } diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 04c82f709d..b99c950d96 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -16,11 +16,9 @@ type PendingPromises = Omit< * https://nodejs.org/api/events.html#eventsonemitter-eventname-options * * Returns an AsyncIterator that iterates each 'data' event emitted from emitter. - * It will reject upon an error event or if the provided signal is aborted. + * It will reject upon an error event. */ -export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) { - const signal = options.signal; - +export function onData(emitter: EventEmitter) { // Setup pending events and pending promise lists /** * When the caller has not yet called .next(), we store the @@ -89,19 +87,8 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) emitter.on('data', eventHandler); emitter.on('error', errorHandler); - if (signal.aborted) { - // If the signal is aborted, set up the first .next() call to be a rejection - queueMicrotask(abortListener); - } else { - signal.addEventListener('abort', abortListener, { once: true }); - } - return iterator; - function abortListener() { - errorHandler(signal.reason); - } - function eventHandler(value: Buffer) { const promise = unconsumedPromises.shift(); if (promise != null) promise.resolve({ value, done: false }); @@ -119,7 +106,6 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) // Adding event handlers emitter.off('data', eventHandler); emitter.off('error', errorHandler); - signal.removeEventListener('abort', abortListener); finished = true; const doneResult = { value: undefined, done: finished } as const; diff --git a/src/utils.ts b/src/utils.ts index 8020d508f8..4a6f7a4e8c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,6 @@ import * as crypto from 'crypto'; import type { SrvRecord } from 'dns'; +import { type EventEmitter } from 'events'; import * as http from 'http'; import { clearTimeout, setTimeout } from 'timers'; import * as url from 'url'; @@ -1295,3 +1296,27 @@ export function promiseWithResolvers() { } export const randomBytes = promisify(crypto.randomBytes); + +/** + * Replicates the events.once helper. + * + * Removes unused signal logic and It **only** supports 0 or 1 argument events. + * + * @param ee - An event emitter that may emit `ev` + * @param name - An event name to wait for + */ +export async function once(ee: EventEmitter, name: string): Promise { + const { promise, resolve, reject } = promiseWithResolvers(); + const onEvent = (data: T) => resolve(data); + const onError = (error: Error) => reject(error); + + ee.once(name, onEvent).once('error', onError); + try { + const res = await promise; + ee.off('error', onError); + return res; + } catch (error) { + ee.off(name, onEvent); + throw error; + } +} diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index de1e455c66..421a9e02bb 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -1,7 +1,11 @@ import { expect } from 'chai'; +import { type EventEmitter, once } from 'events'; +import * as sinon from 'sinon'; +import { setTimeout } from 'timers'; import { addContainerMetadata, + Binary, connect, Connection, type ConnectionOptions, @@ -15,7 +19,9 @@ import { ServerHeartbeatStartedEvent, Topology } from '../../mongodb'; +import * as mock from '../../tools/mongodb-mock/index'; import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration'; +import { getSymbolFrom, sleep } from '../../tools/utils'; import { assert as test, setupDatabase } from '../shared'; const commonConnectOptions = { @@ -200,6 +206,84 @@ describe('Connection', function () { client.connect(); }); + context( + 'when a large message is written to the socket', + { requires: { topology: 'single', auth: 'disabled' } }, + () => { + let client, mockServer: import('../../tools/mongodb-mock/src/server').MockServer; + + beforeEach(async function () { + mockServer = await mock.createServer(); + + mockServer + .addMessageHandler('insert', req => { + setTimeout(() => { + req.reply({ ok: 1 }); + }, 800); + }) + .addMessageHandler('hello', req => { + req.reply(Object.assign({}, mock.HELLO)); + }) + .addMessageHandler(LEGACY_HELLO_COMMAND, req => { + req.reply(Object.assign({}, mock.HELLO)); + }); + + client = new MongoClient(`mongodb://${mockServer.uri()}`, { + minPoolSize: 1, + maxPoolSize: 1 + }); + }); + + afterEach(async function () { + await client.close(); + mockServer.destroy(); + sinon.restore(); + }); + + it('waits for an async drain event because the write was buffered', async () => { + const connectionReady = once(client, 'connectionReady'); + await client.connect(); + await connectionReady; + + // Get the only connection + const pool = [...client.topology.s.servers.values()][0].pool; + + const connections = pool[getSymbolFrom(pool, 'connections')]; + expect(connections).to.have.lengthOf(1); + + const connection = connections.first(); + const socket: EventEmitter = connection.socket; + + // Spy on the socket event listeners + const addedListeners: string[] = []; + const removedListeners: string[] = []; + socket + .on('removeListener', name => removedListeners.push(name)) + .on('newListener', name => addedListeners.push(name)); + + // Make server sockets block + for (const s of mockServer.sockets) s.pause(); + + const insert = client + .db('test') + .collection('test') + // Anything above 16Kb should work I think (10mb to be extra sure) + .insertOne({ a: new Binary(Buffer.alloc(10 * (2 ** 10) ** 2), 127) }); + + // Sleep a bit and unblock server sockets + await sleep(10); + for (const s of mockServer.sockets) s.resume(); + + // Let the operation finish + await insert; + + // Ensure that we used the drain event for this write + expect(addedListeners).to.deep.equal(['drain', 'error']); + expect(removedListeners).to.deep.equal(['drain', 'error']); + }); + } + ); + context('when connecting with a username and password', () => { let utilClient: MongoClient; let client: MongoClient; diff --git a/test/integration/node-specific/resource_clean_up.test.ts b/test/integration/node-specific/resource_clean_up.test.ts index 0d330914c0..e370986a26 100644 --- a/test/integration/node-specific/resource_clean_up.test.ts +++ b/test/integration/node-specific/resource_clean_up.test.ts @@ -1,5 +1,8 @@ +import * as v8 from 'node:v8'; + import { expect } from 'chai'; +import { sleep } from '../../tools/utils'; import { runScript } from './resource_tracking_script_builder'; /** @@ -86,4 +89,34 @@ describe('Driver Resources', () => { }); }); }); + + context('when 100s of operations are executed and complete', () => { + beforeEach(function () { + if (this.currentTest && typeof v8.queryObjects !== 'function') { + this.currentTest.skipReason = 'Test requires v8.queryObjects API to count Promises'; + this.currentTest?.skip(); + } + }); + + let client; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); + }); + + it('does not leave behind additional promises', async () => { + const test = client.db('test').collection('test'); + const promiseCountBefore = v8.queryObjects(Promise, { format: 'count' }); + for (let i = 0; i < 100; i++) { + await test.findOne(); + } + await sleep(10); + const promiseCountAfter = v8.queryObjects(Promise, { format: 'count' }); + + expect(promiseCountAfter).to.be.within(promiseCountBefore - 5, promiseCountBefore + 5); + }); + }); });