From c44c225cb1ed321d4c6060b8dcdadd8d095e5630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Cloux?= Date: Tue, 28 Mar 2023 21:36:24 +0200 Subject: [PATCH 1/4] fix(NODE-5106): prevent multiple mongo client connect()s from leaking topology (#3596) Co-authored-by: Neal Beeken Co-authored-by: Bailey Pearson --- src/mongo_client.ts | 2 + .../node-specific/mongo_client.test.ts | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 06aeacf1f5..cd8d8abd79 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -343,6 +343,8 @@ export class MongoClient extends TypedEventEmitter { topology?: Topology; /** @internal */ readonly mongoLogger: MongoLogger; + /** @internal */ + private connectionLock?: Promise; /** * The consolidate, parsed, transformed and merged options. diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index e0e005a899..e70b0348a6 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -516,6 +516,65 @@ describe('class MongoClient', function () { ); }); + context('concurrent #connect()', () => { + let client: MongoClient; + let topologyOpenEvents; + + /** Keep track number of call to client connect to close as many as connect (otherwise leak_checker hook will failed) */ + let clientConnectCounter: number; + + /** + * Wrap the connect method of the client to keep track + * of number of times connect is called + */ + async function clientConnect() { + if (!client) { + return; + } + clientConnectCounter++; + return client.connect(); + } + + beforeEach(async function () { + client = this.configuration.newClient(); + topologyOpenEvents = []; + clientConnectCounter = 0; + client.on('open', event => topologyOpenEvents.push(event)); + }); + + afterEach(async function () { + // close `clientConnectCounter` times + const clientClosePromises = Array.from({ length: clientConnectCounter }, () => + client.close() + ); + await Promise.all(clientClosePromises); + }); + + it('parallel client connect calls only create one topology', async function () { + await Promise.all([clientConnect(), clientConnect(), clientConnect()]); + + expect(topologyOpenEvents).to.have.lengthOf(1); + expect(client.topology?.isConnected()).to.be.true; + }); + + it('when connect rejects lock is released regardless', async function () { + const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient); + internalConnectStub.onFirstCall().rejects(new Error('cannot connect')); + + // first call rejected to simulate a connection failure + const error = await clientConnect().catch(error => error); + expect(error).to.match(/cannot connect/); + + internalConnectStub.restore(); + + // second call should connect + await clientConnect(); + + expect(topologyOpenEvents).to.have.lengthOf(1); + expect(client.topology?.isConnected()).to.be.true; + }); + }); + context('#close()', () => { let client: MongoClient; let db: Db; From 5887f33dec75b6213919050140de000a5507250a Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 31 May 2023 18:38:33 -0600 Subject: [PATCH 2/4] add caching logic --- src/mongo_client.ts | 81 +++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/src/mongo_client.ts b/src/mongo_client.ts index cd8d8abd79..890cba90d9 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -449,54 +449,65 @@ export class MongoClient extends TypedEventEmitter { } return maybeCallback(async () => { - if (this.topology && this.topology.isConnected()) { + try { + if (this.connectionLock) return await this.connectionLock; + + this.connectionLock = this._connect(); + await this.connectionLock; return this; + } finally { + this.connectionLock = undefined; } + }, callback); + } - const options = this[kOptions]; + private async _connect(): Promise { + if (this.topology && this.topology.isConnected()) { + return this; + } - if (typeof options.srvHost === 'string') { - const hosts = await resolveSRVRecord(options); + const options = this[kOptions]; - for (const [index, host] of hosts.entries()) { - options.hosts[index] = host; - } + if (typeof options.srvHost === 'string') { + const hosts = await resolveSRVRecord(options); + + for (const [index, host] of hosts.entries()) { + options.hosts[index] = host; } + } - const topology = new Topology(options.hosts, options); - // Events can be emitted before initialization is complete so we have to - // save the reference to the topology on the client ASAP if the event handlers need to access it - this.topology = topology; - topology.client = this; + const topology = new Topology(options.hosts, options); + // Events can be emitted before initialization is complete so we have to + // save the reference to the topology on the client ASAP if the event handlers need to access it + this.topology = topology; + topology.client = this; - topology.once(Topology.OPEN, () => this.emit('open', this)); + topology.once(Topology.OPEN, () => this.emit('open', this)); - for (const event of MONGO_CLIENT_EVENTS) { - topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any))); - } + for (const event of MONGO_CLIENT_EVENTS) { + topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any))); + } - const topologyConnect = async () => { - try { - await promisify(callback => topology.connect(options, callback))(); - } catch (error) { - topology.close({ force: true }); - throw error; - } - }; - - if (this.autoEncrypter) { - const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback)); - await initAutoEncrypter(); - await topologyConnect(); - await options.encrypter.connectInternalClient(); - } else { - await topologyConnect(); + const topologyConnect = async () => { + try { + await promisify(callback => topology.connect(options, callback))(); + } catch (error) { + topology.close({ force: true }); + throw error; } + }; - return this; - }, callback); - } + if (this.autoEncrypter) { + const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback)); + await initAutoEncrypter(); + await topologyConnect(); + await options.encrypter.connectInternalClient(); + } else { + await topologyConnect(); + } + return this; + } /** * Close the db and its underlying connections * From 473f3101b5b841c798cafb751a66d4fe08acc208 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 31 May 2023 18:49:57 -0600 Subject: [PATCH 3/4] consolidate logic --- src/mongo_client.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 890cba90d9..78eb626b30 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -450,9 +450,7 @@ export class MongoClient extends TypedEventEmitter { return maybeCallback(async () => { try { - if (this.connectionLock) return await this.connectionLock; - - this.connectionLock = this._connect(); + this.connectionLock = this.connectionLock ?? this._connect(); await this.connectionLock; return this; } finally { From 98ae1427b5de80ed698851e32a9ce4d68988f288 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 2 Jun 2023 12:28:41 -0600 Subject: [PATCH 4/4] align code with 5.x --- src/mongo_client.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 78eb626b30..12a41c7ab1 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -449,8 +449,11 @@ export class MongoClient extends TypedEventEmitter { } return maybeCallback(async () => { + if (this.connectionLock) { + return this.connectionLock; + } try { - this.connectionLock = this.connectionLock ?? this._connect(); + this.connectionLock = this._connect(); await this.connectionLock; return this; } finally {