Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit 30a394d

Browse files
committed
feat(monitoring): add support for server monitoring to Server
This adds the final piece of SDAM into the implementation, allowing for a monitoring loop that informs our topology state machine.
1 parent a252cc7 commit 30a394d

File tree

2 files changed

+110
-5
lines changed

2 files changed

+110
-5
lines changed

lib/sdam/monitoring.js

+96-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
'use strict';
22

3+
const ServerDescription = require('./server_description').ServerDescription;
4+
const calculateDurationInMs = require('../utils').calculateDurationInMs;
5+
36
/**
47
* Published when server description changes, but does NOT include changes to the RTT.
58
*
@@ -88,7 +91,7 @@ class ServerHeartbeatStartedEvent {
8891
/**
8992
* Fired when the server monitor’s ismaster succeeds.
9093
*
91-
* @param {Number} duration The execution time of the event
94+
* @param {Number} duration The execution time of the event in ms
9295
* @param {Object} reply The command reply
9396
* @param {Object} connectionId The connection id for the command
9497
*/
@@ -101,16 +104,104 @@ class ServerHeartbeatSucceededEvent {
101104
/**
102105
* Fired when the server monitor’s ismaster fails, either with an “ok: 0” or a socket exception.
103106
*
104-
* @param {Number} duration The execution time of the event
107+
* @param {Number} duration The execution time of the event in ms
105108
* @param {MongoError|Object} failure The command failure
106109
* @param {Object} connectionId The connection id for the command
107110
*/
108-
class ServerHearbeatFailedEvent {
111+
class ServerHeartbeatFailedEvent {
  /**
   * Records the outcome of a failed heartbeat as plain own properties.
   *
   * @param {Number} duration The execution time of the event in ms
   * @param {MongoError|Object} failure The command failure
   * @param {Object} connectionId The connection id for the command
   */
  constructor(duration, failure, connectionId) {
    this.duration = duration;
    this.failure = failure;
    this.connectionId = connectionId;
  }
}
113116

117+
/**
118+
* Performs a server check as described by the SDAM spec.
119+
*
120+
* NOTE: This method automatically reschedules itself, so that there is always an active
121+
* monitoring process
122+
*
123+
* @param {Server} server The server to monitor
124+
*/
125+
/**
 * Performs a server check as described by the SDAM spec.
 *
 * A check issues `{ ismaster: true }` against `admin.$cmd` and emits
 * `serverHeartbeatStarted`, followed by either `serverHeartbeatSucceeded` or
 * `serverHeartbeatFailed`, on the server. If the first attempt fails, exactly one
 * immediate retry is made.
 *
 * NOTE: After a successful check this function reschedules itself via `setTimeout`
 * (timer handle stored in `server.s.monitorId`). If both the initial attempt and the
 * retry fail, a description without an ismaster reply is emitted and monitoring is
 * NOT rescheduled.
 *
 * @param {Server} server The server to monitor
 */
function monitorServer(server) {
  // executes a single check of a server, reporting `(err, isMaster)` to the callback
  const checkServer = callback => {
    const start = process.hrtime();

    // emit a signal indicating we have started the heartbeat
    server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name));

    const commandOptions = {
      monitoring: true,
      // fall back to 2000 when no (truthy) connectionTimeout has been configured
      socketTimeout: server.s.options.connectionTimeout || 2000
    };

    server.command('admin.$cmd', { ismaster: true }, commandOptions, (err, result) => {
      const duration = calculateDurationInMs(start);

      if (err) {
        server.emit(
          'serverHeartbeatFailed',
          new ServerHeartbeatFailedEvent(duration, err, server.name)
        );

        return callback(err, null);
      }

      const isMaster = result.result;

      // NOTE: the event name previously read 'serverHeartbeatSucceded' (missing an "e"),
      // so listeners registered for the correctly spelled event were never notified.
      server.emit(
        'serverHeartbeatSucceeded',
        new ServerHeartbeatSucceededEvent(duration, isMaster, server.name)
      );

      return callback(null, isMaster);
    });
  };

  // publishes the new description and queues the next check
  const successHandler = isMaster => {
    server.s.monitoring = false;

    // emit an event indicating that our description has changed
    server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));

    // schedule the next monitoring process
    server.s.monitorId = setTimeout(
      () => monitorServer(server),
      server.s.options.heartbeatFrequencyMS
    );
  };

  // run the actual monitoring loop; the flag marks a check as being in progress
  server.s.monitoring = true;
  checkServer((err, isMaster) => {
    if (err) {
      // According to the SDAM specification's "Network error during server check" section, if
      // an ismaster call fails we reset the server's pool. If a server was once connected,
      // change its type to `Unknown` only after retrying once.

      // TODO: we need to reset the pool here

      return checkServer((retryErr, retryIsMaster) => {
        if (retryErr) {
          server.s.monitoring = false;

          // we revert to an `Unknown` by emitting a default description with no isMaster
          server.emit('descriptionReceived', new ServerDescription(server.description.address));

          // we do not reschedule monitoring in this case
          return;
        }

        successHandler(retryIsMaster);
      });
    }

    successHandler(isMaster);
  });
}
204+
114205
module.exports = {
115206
ServerDescriptionChangedEvent,
116207
ServerOpeningEvent,
@@ -120,5 +211,6 @@ module.exports = {
120211
TopologyClosedEvent,
121212
ServerHeartbeatStartedEvent,
122213
ServerHeartbeatSucceededEvent,
123-
ServerHearbeatFailedEvent
214+
ServerHeartbeatFailedEvent,
215+
monitorServer
124216
};

lib/sdam/server.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const createClientInfo = require('../topologies/shared').createClientInfo;
1212
const Logger = require('../connection/logger');
1313
const ServerDescription = require('./server_description').ServerDescription;
1414
const ReadPreference = require('../topologies/read_preference');
15+
const monitorServer = require('./monitoring').monitorServer;
1516

1617
/**
1718
*
@@ -56,7 +57,9 @@ class Server extends EventEmitter {
5657
BSON.Timestamp
5758
]),
5859
// client metadata for the initial handshake
59-
clientInfo: createClientInfo(options)
60+
clientInfo: createClientInfo(options),
61+
// state variable to determine if there is an active server check in progress
62+
monitoring: false
6063
};
6164
}
6265

@@ -119,6 +122,16 @@ class Server extends EventEmitter {
119122
}
120123
}
121124

125+
/**
126+
* Immediately schedule monitoring of this server. If there is already an attempt being made
127+
* this will be a no-op.
128+
*/
129+
monitor() {
130+
if (this.s.monitoring) return;
131+
if (this.s.monitorId) clearTimeout(this.s.monitorId);
132+
monitorServer(this);
133+
}
134+
122135
/**
123136
* Execute a command
124137
*

0 commit comments

Comments
 (0)