Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5749): RTTPinger always sends legacy hello #3921

Merged
merged 11 commits into from
Nov 15, 2023
52 changes: 26 additions & 26 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRTTPinger = Symbol('rttPinger');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
Expand Down Expand Up @@ -81,7 +79,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
[kRTTPinger]?: RTTPinger;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
Expand Down Expand Up @@ -198,8 +196,8 @@ function resetMonitorState(monitor: Monitor) {
monitor[kMonitorId]?.stop();
monitor[kMonitorId] = undefined;

monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

monitor[kCancellationToken].emit('cancel');

Expand Down Expand Up @@ -262,8 +260,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor[kRTTPinger] == null) {
monitor[kRTTPinger] = new RTTPinger(
if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
Expand All @@ -282,9 +280,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const rttPinger = monitor[kRTTPinger];
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

const awaited = isAwaitable && hello.topologyVersion != null;
monitor.emit(
Expand All @@ -301,8 +300,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
start = now();
} else {
monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
Expand Down Expand Up @@ -399,8 +398,7 @@ export interface RTTPingerOptions extends ConnectionOptions {

/** @internal */
export class RTTPinger {
/** @internal */
[kConnection]?: Connection;
connection?: Connection;
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
Expand All @@ -410,7 +408,7 @@ export class RTTPinger {
closed: boolean;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
this[kConnection] = undefined;
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
Expand All @@ -427,8 +425,8 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this[kConnection]?.destroy({ force: true });
this[kConnection] = undefined;
this.connection?.destroy({ force: true });
this.connection = undefined;
}
}

Expand All @@ -447,8 +445,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

if (rttPinger[kConnection] == null) {
rttPinger[kConnection] = conn;
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
Expand All @@ -458,11 +456,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
);
}

const connection = rttPinger[kConnection];
const connection = rttPinger.connection;
if (connection == null) {
connect(options, (err, conn) => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand All @@ -473,15 +471,17 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
if (err) {
rttPinger[kConnection] = undefined;
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
connection.commandAsync(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy({ force: true });
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}

measureAndReschedule();
});
);
}

/**
Expand Down
25 changes: 10 additions & 15 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});

/** @internal */
const kMonitor = Symbol('monitor');

/** @internal */
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
MonitorOptions;
Expand Down Expand Up @@ -119,7 +116,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
serverApi?: ServerApi;
hello?: Document;
commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise<Document>;
[kMonitor]: Monitor | null;
monitor: Monitor | null;

/** @event */
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
Expand Down Expand Up @@ -175,22 +172,20 @@ export class Server extends TypedEventEmitter<ServerEvents> {
});

if (this.loadBalanced) {
this[kMonitor] = null;
this.monitor = null;
// monitoring is disabled in load balancing mode
return;
}

// create the monitor
// TODO(NODE-4144): Remove new variable for type narrowing
const monitor = new Monitor(this, this.s.options);
this[kMonitor] = monitor;
this.monitor = new Monitor(this, this.s.options);

for (const event of HEARTBEAT_EVENTS) {
monitor.on(event, (e: any) => this.emit(event, e));
this.monitor.on(event, (e: any) => this.emit(event, e));
}

monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
Expand Down Expand Up @@ -246,7 +241,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// a load balancer. It never transitions out of this state and
// has no monitor.
if (!this.loadBalanced) {
this[kMonitor]?.connect();
this.monitor?.connect();
} else {
stateTransition(this, STATE_CONNECTED);
this.emit(Server.CONNECT, this);
Expand All @@ -272,7 +267,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
stateTransition(this, STATE_CLOSING);

if (!this.loadBalanced) {
this[kMonitor]?.close();
this.monitor?.close();
}

this.pool.close(options, err => {
Expand All @@ -290,7 +285,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
requestCheck(): void {
if (!this.loadBalanced) {
this[kMonitor]?.requestCheck();
this.monitor?.requestCheck();
}
}

Expand Down Expand Up @@ -465,7 +460,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
}

if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
server[kMonitor]?.reset();
server.monitor?.reset();
}

server.emit(
Expand Down
109 changes: 109 additions & 0 deletions test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { expect } from 'chai';
import * as semver from 'semver';
import * as sinon from 'sinon';

import { type Connection, type MongoClient, type RTTPinger } from '../../mongodb';
import { sleep } from '../../tools/utils';

/**
* RTTPinger creation depends on getting a response to the monitor's initial hello
* and that hello containing a topologyVersion.
* Subsequently the rttPinger creates its connection asynchronously
*
* I just went with a sleepy loop, until we have what we need, One could also use SDAM events in a clever way perhaps?
dariakp marked this conversation as resolved.
Show resolved Hide resolved
*/
async function getRTTPingers(client: MongoClient) {
// eslint-disable-next-line no-constant-condition
while (true) {
const rttPingers = Array.from(client.topology?.s.servers.values() ?? [], s => {
if (s.monitor?.rttPinger?.connection != null) return s.monitor?.rttPinger;
dariakp marked this conversation as resolved.
Show resolved Hide resolved
else null;
}).filter(rtt => rtt != null);

if (rttPingers.length !== 0) {
return rttPingers as (Omit<RTTPinger, 'connection'> & { connection: Connection })[];
}

await sleep(5);
}
}

describe('class RTTPinger', () => {
afterEach(() => sinon.restore());

beforeEach(async function () {
if (this.configuration.isLoadBalanced) {
if (this.currentTest)
dariakp marked this conversation as resolved.
Show resolved Hide resolved
this.currentTest.skipReason = 'No monitoring in LB mode, test not relevant';
return this.skip();
}
if (semver.gte('4.4.0', this.configuration.version)) {
if (this.currentTest)
this.currentTest.skipReason =
'Test requires streaming monitoring, needs to be on MongoDB 4.4+';
return this.skip();
}
});

context('when serverApi is enabled', () => {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
let serverApiClient: MongoClient;
beforeEach(async function () {
if (semver.gte('5.0.0', this.configuration.version)) {
if (this.currentTest)
this.currentTest.skipReason = 'Test requires serverApi, needs to be on MongoDB 5.0+';
return this.skip();
}

serverApiClient = this.configuration.newClient(
{},
{ serverApi: { version: '1', strict: true }, heartbeatFrequencyMS: 10 }
);
});

afterEach(async () => {
await serverApiClient?.close();
});

it('measures rtt with a hello command', async function () {
await serverApiClient.connect();
const rttPingers = await getRTTPingers(serverApiClient);

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
}
});
});

context('when rtt hello receives an error', () => {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
let client: MongoClient;
beforeEach(async function () {
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
});

afterEach(async () => {
await client?.close();
});

it('destroys the connection', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

for (const rtt of rttPingers) {
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any'));
}
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.called;
}
});
});
});
5 changes: 2 additions & 3 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,8 @@ describe('Topology (unit)', function () {
afterEach(() => {
// The srv event starts a monitor that we need to clean up
for (const [, server] of topology.s.servers) {
const kMonitor = getSymbolFrom(server, 'monitor');
const kMonitorId = getSymbolFrom(server[kMonitor], 'monitorId');
server[kMonitor][kMonitorId].stop();
const kMonitorId = getSymbolFrom(server.monitor, 'monitorId');
server.monitor[kMonitorId].stop();
}
});

Expand Down