@@ -8,8 +8,14 @@ import { LEGACY_HELLO_COMMAND } from '../constants';
88import { MongoError , MongoErrorLabel , MongoNetworkTimeoutError } from '../error' ;
99import { MongoLoggableComponent } from '../mongo_logger' ;
1010import { CancellationToken , TypedEventEmitter } from '../mongo_types' ;
11- import type { Callback , EventEmitterWithState } from '../utils' ;
12- import { calculateDurationInMs , makeStateMachine , now , ns } from '../utils' ;
11+ import {
12+ calculateDurationInMs ,
13+ type Callback ,
14+ type EventEmitterWithState ,
15+ makeStateMachine ,
16+ now ,
17+ ns
18+ } from '../utils' ;
1319import { ServerType , STATE_CLOSED , STATE_CLOSING } from './common' ;
1420import {
1521 ServerHeartbeatFailedEvent ,
@@ -25,8 +31,6 @@ const kServer = Symbol('server');
2531const kMonitorId = Symbol ( 'monitorId' ) ;
2632/** @internal */
2733const kCancellationToken = Symbol ( 'cancellationToken' ) ;
28- /** @internal */
29- const kRoundTripTime = Symbol ( 'roundTripTime' ) ;
3034
3135const STATE_IDLE = 'idle' ;
3236const STATE_MONITORING = 'monitoring' ;
@@ -100,6 +104,8 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
100104 rttPinger ?: RTTPinger ;
101105 /** @internal */
102106 override component = MongoLoggableComponent . TOPOLOGY ;
107+ /** @internal */
108+ private rttSampler : RTTSampler ;
103109
104110 constructor ( server : Server , options : MonitorOptions ) {
105111 super ( ) ;
@@ -121,6 +127,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
121127 } ) ;
122128 this . isRunningInFaasEnv = getFAASEnv ( ) != null ;
123129 this . mongoLogger = this [ kServer ] . topology . client ?. mongoLogger ;
130+ this . rttSampler = new RTTSampler ( 10 ) ;
124131
125132 const cancellationToken = this [ kCancellationToken ] ;
126133 // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
@@ -203,6 +210,26 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
203210 this . emit ( 'close' ) ;
204211 stateTransition ( this , STATE_CLOSED ) ;
205212 }
213+
214+ get roundTripTime ( ) : number {
215+ return this . rttSampler . average ( ) ;
216+ }
217+
218+ get minRoundTripTime ( ) : number {
219+ return this . rttSampler . min ( ) ;
220+ }
221+
222+ get latestRtt ( ) : number {
223+ return this . rttSampler . last ?? 0 ; // FIXME: Check if this is acceptable
224+ }
225+
226+ addRttSample ( rtt : number ) {
227+ this . rttSampler . addSample ( rtt ) ;
228+ }
229+
230+ clearRttSamples ( ) {
231+ this . rttSampler . clear ( ) ;
232+ }
206233}
207234
208235function resetMonitorState ( monitor : Monitor ) {
@@ -216,6 +243,8 @@ function resetMonitorState(monitor: Monitor) {
216243
217244 monitor . connection ?. destroy ( ) ;
218245 monitor . connection = null ;
246+
247+ monitor . clearRttSamples ( ) ;
219248}
220249
221250function useStreamingProtocol ( monitor : Monitor , topologyVersion : TopologyVersion | null ) : boolean {
@@ -249,7 +278,6 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
249278 function onHeartbeatFailed ( err : Error ) {
250279 monitor . connection ?. destroy ( ) ;
251280 monitor . connection = null ;
252-
253281 monitor . emitAndLogHeartbeat (
254282 Server . SERVER_HEARTBEAT_FAILED ,
255283 monitor [ kServer ] . topology . s . id ,
@@ -275,11 +303,15 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
275303 hello . isWritablePrimary = hello [ LEGACY_HELLO_COMMAND ] ;
276304 }
277305
306+ // NOTE: here we use the latestRtt as this measurement corresponds with the value
307+ // obtained for this successful heartbeat
278308 const duration =
279309 isAwaitable && monitor . rttPinger
280- ? monitor . rttPinger . roundTripTime
310+ ? monitor . rttPinger . latestRtt ?? calculateDurationInMs ( start )
281311 : calculateDurationInMs ( start ) ;
282312
313+ monitor . addRttSample ( duration ) ;
314+
283315 monitor . emitAndLogHeartbeat (
284316 Server . SERVER_HEARTBEAT_SUCCEEDED ,
285317 monitor [ kServer ] . topology . s . id ,
@@ -328,13 +360,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
328360 : { socketTimeoutMS : connectTimeoutMS } ;
329361
330362 if ( isAwaitable && monitor . rttPinger == null ) {
331- monitor . rttPinger = new RTTPinger (
332- monitor [ kCancellationToken ] ,
333- Object . assign (
334- { heartbeatFrequencyMS : monitor . options . heartbeatFrequencyMS } ,
335- monitor . connectOptions
336- )
337- ) ;
363+ monitor . rttPinger = new RTTPinger ( monitor ) ;
338364 }
339365
340366 // Record new start time before sending handshake
@@ -377,6 +403,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
377403 connection . destroy ( ) ;
378404 return ;
379405 }
406+ const duration = calculateDurationInMs ( start ) ;
407+ monitor . addRttSample ( duration ) ;
380408
381409 monitor . connection = connection ;
382410 monitor . emitAndLogHeartbeat (
@@ -385,7 +413,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
385413 connection . hello ?. connectionId ,
386414 new ServerHeartbeatSucceededEvent (
387415 monitor . address ,
388- calculateDurationInMs ( start ) ,
416+ duration ,
389417 connection . hello ,
390418 useStreamingProtocol ( monitor , connection . hello ?. topologyVersion )
391419 )
@@ -458,23 +486,30 @@ export class RTTPinger {
458486 /** @internal */
459487 [ kCancellationToken ] : CancellationToken ;
460488 /** @internal */
461- [ kRoundTripTime ] : number ;
462- /** @internal */
463489 [ kMonitorId ] : NodeJS . Timeout ;
490+ /** @internal */
491+ monitor : Monitor ;
464492 closed : boolean ;
493+ /** @internal */
494+ latestRtt ?: number ;
465495
466- constructor ( cancellationToken : CancellationToken , options : RTTPingerOptions ) {
496+ constructor ( monitor : Monitor ) {
467497 this . connection = undefined ;
468- this [ kCancellationToken ] = cancellationToken ;
469- this [ kRoundTripTime ] = 0 ;
498+ this [ kCancellationToken ] = monitor [ kCancellationToken ] ;
470499 this . closed = false ;
500+ this . monitor = monitor ;
501+ this . latestRtt = monitor . latestRtt ;
471502
472- const heartbeatFrequencyMS = options . heartbeatFrequencyMS ;
473- this [ kMonitorId ] = setTimeout ( ( ) => measureRoundTripTime ( this , options ) , heartbeatFrequencyMS ) ;
503+ const heartbeatFrequencyMS = monitor . options . heartbeatFrequencyMS ;
504+ this [ kMonitorId ] = setTimeout ( ( ) => this . measureRoundTripTime ( ) , heartbeatFrequencyMS ) ;
474505 }
475506
476507 get roundTripTime ( ) : number {
477- return this [ kRoundTripTime ] ;
508+ return this . monitor . roundTripTime ;
509+ }
510+
511+ get minRoundTripTime ( ) : number {
512+ return this . monitor . minRoundTripTime ;
478513 }
479514
480515 close ( ) : void {
@@ -484,61 +519,60 @@ export class RTTPinger {
484519 this . connection ?. destroy ( ) ;
485520 this . connection = undefined ;
486521 }
487- }
488-
489- function measureRoundTripTime ( rttPinger : RTTPinger , options : RTTPingerOptions ) {
490- const start = now ( ) ;
491- options . cancellationToken = rttPinger [ kCancellationToken ] ;
492- const heartbeatFrequencyMS = options . heartbeatFrequencyMS ;
493-
494- if ( rttPinger . closed ) {
495- return ;
496- }
497522
498- function measureAndReschedule ( conn ?: Connection ) {
499- if ( rttPinger . closed ) {
523+ private measureAndReschedule ( start ?: number , conn ?: Connection ) {
524+ if ( start == null ) {
525+ start = now ( ) ;
526+ }
527+ if ( this . closed ) {
500528 conn ?. destroy ( ) ;
501529 return ;
502530 }
503531
504- if ( rttPinger . connection == null ) {
505- rttPinger . connection = conn ;
532+ if ( this . connection == null ) {
533+ this . connection = conn ;
506534 }
507535
508- rttPinger [ kRoundTripTime ] = calculateDurationInMs ( start ) ;
509- rttPinger [ kMonitorId ] = setTimeout (
510- ( ) => measureRoundTripTime ( rttPinger , options ) ,
511- heartbeatFrequencyMS
536+ this . latestRtt = calculateDurationInMs ( start ) ;
537+ this [ kMonitorId ] = setTimeout (
538+ ( ) => this . measureRoundTripTime ( ) ,
539+ this . monitor . options . heartbeatFrequencyMS
512540 ) ;
513541 }
514542
515- const connection = rttPinger . connection ;
516- if ( connection == null ) {
543+ private measureRoundTripTime ( ) {
544+ const start = now ( ) ;
545+
546+ if ( this . closed ) {
547+ return ;
548+ }
549+
550+ const connection = this . connection ;
551+ if ( connection == null ) {
552+ // eslint-disable-next-line github/no-then
553+ connect ( this . monitor . connectOptions ) . then (
554+ connection => {
555+ this . measureAndReschedule ( start , connection ) ;
556+ } ,
557+ ( ) => {
558+ this . connection = undefined ;
559+ }
560+ ) ;
561+ return ;
562+ }
563+
564+ const commandName =
565+ connection . serverApi ?. version || connection . helloOk ? 'hello' : LEGACY_HELLO_COMMAND ;
517566 // eslint-disable-next-line github/no-then
518- connect ( options ) . then (
519- connection => {
520- measureAndReschedule ( connection ) ;
521- } ,
567+ connection . command ( ns ( 'admin.$cmd' ) , { [ commandName ] : 1 } , undefined ) . then (
568+ ( ) => this . measureAndReschedule ( ) ,
522569 ( ) => {
523- rttPinger . connection = undefined ;
524- rttPinger [ kRoundTripTime ] = 0 ;
570+ this . connection ?. destroy ( ) ;
571+ this . connection = undefined ;
572+ return ;
525573 }
526574 ) ;
527- return ;
528575 }
529-
530- const commandName =
531- connection . serverApi ?. version || connection . helloOk ? 'hello' : LEGACY_HELLO_COMMAND ;
532- // eslint-disable-next-line github/no-then
533- connection . command ( ns ( 'admin.$cmd' ) , { [ commandName ] : 1 } , undefined ) . then (
534- ( ) => measureAndReschedule ( ) ,
535- ( ) => {
536- rttPinger . connection ?. destroy ( ) ;
537- rttPinger . connection = undefined ;
538- rttPinger [ kRoundTripTime ] = 0 ;
539- return ;
540- }
541- ) ;
542576}
543577
544578/**
@@ -666,3 +700,82 @@ export class MonitorInterval {
666700 } ) ;
667701 } ;
668702}
703+
704+ /** @internal
705+ * This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations)
706+ *
707+ * This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping
708+ * the most recent `windowSize` samples
709+ * */
710+ export class RTTSampler {
711+ /** Index of the next slot to be overwritten */
712+ private writeIndex : number ;
713+ private length : number ;
714+ private rttSamples : Float64Array ;
715+
716+ constructor ( windowSize = 10 ) {
717+ this . rttSamples = new Float64Array ( windowSize ) ;
718+ this . length = 0 ;
719+ this . writeIndex = 0 ;
720+ }
721+
722+ /**
723+ * Adds an rtt sample to the end of the circular buffer
724+ * When `windowSize` samples have been collected, `addSample` overwrites the least recently added
725+ * sample
726+ */
727+ addSample ( sample : number ) {
728+ this . rttSamples [ this . writeIndex ++ ] = sample ;
729+ if ( this . length < this . rttSamples . length ) {
730+ this . length ++ ;
731+ }
732+
733+ this . writeIndex %= this . rttSamples . length ;
734+ }
735+
736+ /**
737+ * When \< 2 samples have been collected, returns 0
738+ * Otherwise computes the minimum value samples contained in the buffer
739+ */
740+ min ( ) : number {
741+ if ( this . length < 2 ) return 0 ;
742+ let min = this . rttSamples [ 0 ] ;
743+ for ( let i = 1 ; i < this . length ; i ++ ) {
744+ if ( this . rttSamples [ i ] < min ) min = this . rttSamples [ i ] ;
745+ }
746+
747+ return min ;
748+ }
749+
750+ /**
751+ * Returns mean of samples contained in the buffer
752+ */
753+ average ( ) : number {
754+ if ( this . length === 0 ) return 0 ;
755+ let sum = 0 ;
756+ for ( let i = 0 ; i < this . length ; i ++ ) {
757+ sum += this . rttSamples [ i ] ;
758+ }
759+
760+ return sum / this . length ;
761+ }
762+
763+ /**
764+ * Returns most recently inserted element in the buffer
765+ * Returns null if the buffer is empty
766+ * */
767+ get last ( ) : number | null {
768+ if ( this . length === 0 ) return null ;
769+ return this . rttSamples [ this . writeIndex === 0 ? this . length - 1 : this . writeIndex - 1 ] ;
770+ }
771+
772+ /**
773+ * Clear the buffer
774+ * NOTE: this does not overwrite the data held in the internal array, just the pointers into
775+ * this array
776+ */
777+ clear ( ) {
778+ this . length = 0 ;
779+ this . writeIndex = 0 ;
780+ }
781+ }
0 commit comments