Skip to content

Commit

Permalink
feat(NODE-5613): add awaited field to SDAM heartbeat events (#3895)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored Oct 17, 2023
1 parent db90293 commit b50aadc
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
15 changes: 12 additions & 3 deletions src/sdam/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ export class TopologyClosedEvent {
export class ServerHeartbeatStartedEvent {
/** The connection id for the command */
connectionId: string;
/** Is true when using the streaming protocol. */
awaited: boolean;

/** @internal */
constructor(connectionId: string) {
constructor(connectionId: string, awaited: boolean) {
this.connectionId = connectionId;
this.awaited = awaited;
}
}

Expand All @@ -151,12 +154,15 @@ export class ServerHeartbeatSucceededEvent {
duration: number;
/** The command reply */
reply: Document;
/** Is true when using the streaming protocol. */
awaited: boolean;

/** @internal */
constructor(connectionId: string, duration: number, reply: Document | null) {
constructor(connectionId: string, duration: number, reply: Document | null, awaited: boolean) {
this.connectionId = connectionId;
this.duration = duration;
this.reply = reply ?? {};
this.awaited = awaited;
}
}

Expand All @@ -172,11 +178,14 @@ export class ServerHeartbeatFailedEvent {
duration: number;
/** The command failure */
failure: Error;
/** Is true when using the streaming protocol. */
awaited: boolean;

/** @internal */
constructor(connectionId: string, duration: number, failure: Error) {
constructor(connectionId: string, duration: number, failure: Error, awaited: boolean) {
this.connectionId = connectionId;
this.duration = duration;
this.failure = failure;
this.awaited = awaited;
}
}
30 changes: 22 additions & 8 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,25 @@ function resetMonitorState(monitor: Monitor) {

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
monitor.emit(Server.SERVER_HEARTBEAT_STARTED, new ServerHeartbeatStartedEvent(monitor.address));
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = topologyVersion != null;
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;

monitor.emit(
Server.SERVER_HEARTBEAT_FAILED,
new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err)
new ServerHeartbeatFailedEvent(
monitor.address,
calculateDurationInMs(start),
err,
isAwaitable
)
);

const error = !(err instanceof MongoError)
Expand All @@ -237,8 +247,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const { serverApi, helloOk } = connection;
const connectTimeoutMS = monitor.options.connectTimeoutMS;
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = topologyVersion != null;

const cmd = {
[serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
Expand Down Expand Up @@ -278,17 +286,18 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);

const awaited = isAwaitable && hello.topologyVersion != null;
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello)
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited)
);

// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
if (isAwaitable && hello.topologyVersion) {
if (awaited) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address)
new ServerHeartbeatStartedEvent(monitor.address, true)
);
start = now();
} else {
Expand Down Expand Up @@ -324,7 +333,12 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
monitor[kConnection] = conn;
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, calculateDurationInMs(start), conn.hello)
new ServerHeartbeatSucceededEvent(
monitor.address,
calculateDurationInMs(start),
conn.hello,
false
)
);

callback(undefined, conn.hello);
Expand Down

0 comments on commit b50aadc

Please sign in to comment.