From b50aadca7e5e471283ebaccff5b98c099c57fa55 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 17 Oct 2023 23:28:23 +0200 Subject: [PATCH] feat(NODE-5613): add `awaited` field to SDAM heartbeat events (#3895) --- src/sdam/events.ts | 15 ++++++++++++--- src/sdam/monitor.ts | 30 ++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/sdam/events.ts b/src/sdam/events.ts index c55eb09575..4c8a1c1312 100644 --- a/src/sdam/events.ts +++ b/src/sdam/events.ts @@ -132,10 +132,13 @@ export class TopologyClosedEvent { export class ServerHeartbeatStartedEvent { /** The connection id for the command */ connectionId: string; + /** Is true when using the streaming protocol. */ + awaited: boolean; /** @internal */ - constructor(connectionId: string) { + constructor(connectionId: string, awaited: boolean) { this.connectionId = connectionId; + this.awaited = awaited; } } @@ -151,12 +154,15 @@ export class ServerHeartbeatSucceededEvent { duration: number; /** The command reply */ reply: Document; + /** Is true when using the streaming protocol. */ + awaited: boolean; /** @internal */ - constructor(connectionId: string, duration: number, reply: Document | null) { + constructor(connectionId: string, duration: number, reply: Document | null, awaited: boolean) { this.connectionId = connectionId; this.duration = duration; this.reply = reply ?? {}; + this.awaited = awaited; } } @@ -172,11 +178,14 @@ export class ServerHeartbeatFailedEvent { duration: number; /** The command failure */ failure: Error; + /** Is true when using the streaming protocol. */ + awaited: boolean; /** @internal */ - constructor(connectionId: string, duration: number, failure: Error) { + constructor(connectionId: string, duration: number, failure: Error, awaited: boolean) { this.connectionId = connectionId; this.duration = duration; this.failure = failure; + this.awaited = awaited; } } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index bd5702b4af..1e510d0dea 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -209,7 +209,12 @@ function resetMonitorState(monitor: Monitor) { function checkServer(monitor: Monitor, callback: Callback) { let start = now(); - monitor.emit(Server.SERVER_HEARTBEAT_STARTED, new ServerHeartbeatStartedEvent(monitor.address)); + const topologyVersion = monitor[kServer].description.topologyVersion; + const isAwaitable = topologyVersion != null; + monitor.emit( + Server.SERVER_HEARTBEAT_STARTED, + new ServerHeartbeatStartedEvent(monitor.address, isAwaitable) + ); function failureHandler(err: Error) { monitor[kConnection]?.destroy({ force: true }); @@ -217,7 +222,12 @@ function checkServer(monitor: Monitor, callback: Callback) { monitor.emit( Server.SERVER_HEARTBEAT_FAILED, - new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err) + new ServerHeartbeatFailedEvent( + monitor.address, + calculateDurationInMs(start), + err, + isAwaitable + ) ); const error = !(err instanceof MongoError) @@ -237,8 +247,6 @@ function checkServer(monitor: Monitor, callback: Callback) { const { serverApi, helloOk } = connection; const connectTimeoutMS = monitor.options.connectTimeoutMS; const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS; - const topologyVersion = monitor[kServer].description.topologyVersion; - const isAwaitable = topologyVersion != null; const cmd = { [serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1, @@ -278,17 +286,18 @@ function checkServer(monitor: Monitor, callback: Callback) { const duration = isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start); + const awaited = isAwaitable && hello.topologyVersion != null; monitor.emit( Server.SERVER_HEARTBEAT_SUCCEEDED, - new ServerHeartbeatSucceededEvent(monitor.address, duration, hello) + new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited) ); // if we are using the streaming protocol then we immediately issue another `started` // event, otherwise the "check" is complete and return to the main monitor loop - if (isAwaitable && hello.topologyVersion) { + if (awaited) { monitor.emit( Server.SERVER_HEARTBEAT_STARTED, - new ServerHeartbeatStartedEvent(monitor.address) + new ServerHeartbeatStartedEvent(monitor.address, true) ); start = now(); } else { @@ -324,7 +333,12 @@ function checkServer(monitor: Monitor, callback: Callback) { monitor[kConnection] = conn; monitor.emit( Server.SERVER_HEARTBEAT_SUCCEEDED, - new ServerHeartbeatSucceededEvent(monitor.address, calculateDurationInMs(start), conn.hello) + new ServerHeartbeatSucceededEvent( + monitor.address, + calculateDurationInMs(start), + conn.hello, + false + ) ); callback(undefined, conn.hello);