Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5749): RTTPinger always sends legacy hello #3921

Merged
merged 11 commits into from
Nov 15, 2023
52 changes: 26 additions & 26 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRTTPinger = Symbol('rttPinger');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
Expand Down Expand Up @@ -100,7 +98,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
[kRTTPinger]?: RTTPinger;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
Expand Down Expand Up @@ -219,8 +217,8 @@ function resetMonitorState(monitor: Monitor) {
monitor[kMonitorId]?.stop();
monitor[kMonitorId] = undefined;

monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

monitor[kCancellationToken].emit('cancel');

Expand Down Expand Up @@ -294,8 +292,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor[kRTTPinger] == null) {
monitor[kRTTPinger] = new RTTPinger(
if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
Expand All @@ -314,9 +312,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const rttPinger = monitor[kRTTPinger];
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
Expand All @@ -332,8 +331,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
start = now();
} else {
monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
Expand Down Expand Up @@ -430,8 +429,7 @@ export interface RTTPingerOptions extends ConnectionOptions {

/** @internal */
export class RTTPinger {
/** @internal */
[kConnection]?: Connection;
connection?: Connection;
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
Expand All @@ -441,7 +439,7 @@ export class RTTPinger {
closed: boolean;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
this[kConnection] = undefined;
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
Expand All @@ -458,8 +456,8 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this[kConnection]?.destroy({ force: true });
this[kConnection] = undefined;
this.connection?.destroy({ force: true });
this.connection = undefined;
}
}

Expand All @@ -478,8 +476,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

if (rttPinger[kConnection] == null) {
rttPinger[kConnection] = conn;
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
Expand All @@ -489,11 +487,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
);
}

const connection = rttPinger[kConnection];
const connection = rttPinger.connection;
if (connection == null) {
connect(options, (err, conn) => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand All @@ -504,15 +502,17 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
if (err) {
rttPinger[kConnection] = undefined;
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
connection.commandAsync(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy({ force: true });
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}

measureAndReschedule();
});
);
}

/**
Expand Down
25 changes: 10 additions & 15 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});

/** @internal */
const kMonitor = Symbol('monitor');

/** @internal */
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
MonitorOptions;
Expand Down Expand Up @@ -119,7 +116,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
serverApi?: ServerApi;
hello?: Document;
commandAsync: (ns: MongoDBNamespace, cmd: Document, options: CommandOptions) => Promise<Document>;
[kMonitor]: Monitor | null;
monitor: Monitor | null;

/** @event */
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
Expand Down Expand Up @@ -175,22 +172,20 @@ export class Server extends TypedEventEmitter<ServerEvents> {
});

if (this.loadBalanced) {
this[kMonitor] = null;
this.monitor = null;
// monitoring is disabled in load balancing mode
return;
}

// create the monitor
// TODO(NODE-4144): Remove new variable for type narrowing
const monitor = new Monitor(this, this.s.options);
this[kMonitor] = monitor;
this.monitor = new Monitor(this, this.s.options);

for (const event of HEARTBEAT_EVENTS) {
monitor.on(event, (e: any) => this.emit(event, e));
this.monitor.on(event, (e: any) => this.emit(event, e));
}

monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
Expand Down Expand Up @@ -246,7 +241,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// a load balancer. It never transitions out of this state and
// has no monitor.
if (!this.loadBalanced) {
this[kMonitor]?.connect();
this.monitor?.connect();
} else {
stateTransition(this, STATE_CONNECTED);
this.emit(Server.CONNECT, this);
Expand All @@ -272,7 +267,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
stateTransition(this, STATE_CLOSING);

if (!this.loadBalanced) {
this[kMonitor]?.close();
this.monitor?.close();
}

this.pool.close(options, err => {
Expand All @@ -290,7 +285,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
requestCheck(): void {
if (!this.loadBalanced) {
this[kMonitor]?.requestCheck();
this.monitor?.requestCheck();
}
}

Expand Down Expand Up @@ -465,7 +460,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
}

if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
server[kMonitor]?.reset();
server.monitor?.reset();
}

server.emit(
Expand Down
Loading