Skip to content

Commit

Permalink
fix(NODE-5316): prevent parallel topology creation in MongoClient.con…
Browse files Browse the repository at this point in the history
…nect (#3696)

Co-authored-by: Clément Cloux <clement.cloux@ynov.com>
Co-authored-by: Neal Beeken <neal.beeken@mongodb.com>
  • Loading branch information
3 people committed Jun 2, 2023
1 parent 261199f commit e13038d
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 35 deletions.
84 changes: 49 additions & 35 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
topology?: Topology;
/** @internal */
readonly mongoLogger: MongoLogger;
/** @internal */
private connectionLock?: Promise<this>;

/**
* The consolidate, parsed, transformed and merged options.
Expand Down Expand Up @@ -447,54 +449,66 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
}

return maybeCallback(async () => {
if (this.topology && this.topology.isConnected()) {
if (this.connectionLock) {
return this.connectionLock;
}
try {
this.connectionLock = this._connect();
await this.connectionLock;
return this;
} finally {
this.connectionLock = undefined;
}
}, callback);
}

const options = this[kOptions];
private async _connect(): Promise<this> {
if (this.topology && this.topology.isConnected()) {
return this;
}

if (typeof options.srvHost === 'string') {
const hosts = await resolveSRVRecord(options);
const options = this[kOptions];

for (const [index, host] of hosts.entries()) {
options.hosts[index] = host;
}
if (typeof options.srvHost === 'string') {
const hosts = await resolveSRVRecord(options);

for (const [index, host] of hosts.entries()) {
options.hosts[index] = host;
}
}

const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;
const topology = new Topology(options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;

topology.once(Topology.OPEN, () => this.emit('open', this));
topology.once(Topology.OPEN, () => this.emit('open', this));

for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any)));
}
for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any)));
}

const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
throw error;
}
};

if (this.autoEncrypter) {
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
await initAutoEncrypter();
await topologyConnect();
await options.encrypter.connectInternalClient();
} else {
await topologyConnect();
const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
throw error;
}
};

return this;
}, callback);
}
if (this.autoEncrypter) {
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
await initAutoEncrypter();
await topologyConnect();
await options.encrypter.connectInternalClient();
} else {
await topologyConnect();
}

return this;
}
/**
* Close the db and its underlying connections
*
Expand Down
59 changes: 59 additions & 0 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,65 @@ describe('class MongoClient', function () {
);
});

context('concurrent #connect()', () => {
let client: MongoClient;
let topologyOpenEvents;

/** Keep track number of call to client connect to close as many as connect (otherwise leak_checker hook will failed) */
let clientConnectCounter: number;

/**
* Wrap the connect method of the client to keep track
* of number of times connect is called
*/
async function clientConnect() {
if (!client) {
return;
}
clientConnectCounter++;
return client.connect();
}

beforeEach(async function () {
client = this.configuration.newClient();
topologyOpenEvents = [];
clientConnectCounter = 0;
client.on('open', event => topologyOpenEvents.push(event));
});

afterEach(async function () {
// close `clientConnectCounter` times
const clientClosePromises = Array.from({ length: clientConnectCounter }, () =>
client.close()
);
await Promise.all(clientClosePromises);
});

it('parallel client connect calls only create one topology', async function () {
await Promise.all([clientConnect(), clientConnect(), clientConnect()]);

expect(topologyOpenEvents).to.have.lengthOf(1);
expect(client.topology?.isConnected()).to.be.true;
});

it('when connect rejects lock is released regardless', async function () {
const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient);
internalConnectStub.onFirstCall().rejects(new Error('cannot connect'));

// first call rejected to simulate a connection failure
const error = await clientConnect().catch(error => error);
expect(error).to.match(/cannot connect/);

internalConnectStub.restore();

// second call should connect
await clientConnect();

expect(topologyOpenEvents).to.have.lengthOf(1);
expect(client.topology?.isConnected()).to.be.true;
});
});

context('#close()', () => {
let client: MongoClient;
let db: Db;
Expand Down

0 comments on commit e13038d

Please sign in to comment.