Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit c910706

Browse files
committed
feat(cluster-time): track incoming cluster time gossiping
This is the first part of the requirement to "gossip the cluster time" by tracking the clusterTime provided on incoming server responses. NODE-1088
1 parent d80d956 commit c910706

File tree

11 files changed

+432
-31
lines changed

11 files changed

+432
-31
lines changed

lib/connection/pool.js

+11-8
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,13 @@ var _id = 0;
6565
* @fires Pool#parseError
6666
* @return {Pool} A cursor instance
6767
*/
68-
var Pool = function(options) {
68+
var Pool = function(topology, options) {
6969
// Add event listener
7070
EventEmitter.call(this);
71+
72+
// Store topology for later use
73+
this.topology = topology;
74+
7175
// Add the options
7276
this.options = assign(
7377
{
@@ -104,9 +108,6 @@ var Pool = function(options) {
104108
options
105109
);
106110

107-
// console.log("=================================== pool options")
108-
// console.dir(this.options)
109-
110111
// Identification information
111112
this.id = _id++;
112113
// Current reconnect retries
@@ -269,9 +270,6 @@ function reauthenticate(pool, connection, cb) {
269270

270271
function connectionFailureHandler(self, event) {
271272
return function(err) {
272-
// console.log("========== connectionFailureHandler :: " + event)
273-
// console.dir(err)
274-
275273
if (this._connectionFailHandled) return;
276274
this._connectionFailHandled = true;
277275
// Destroy the connection
@@ -323,7 +321,6 @@ function connectionFailureHandler(self, event) {
323321

324322
function attemptReconnect(self) {
325323
return function() {
326-
// console.log("========================= attemptReconnect")
327324
self.emit('attemptReconnect', self);
328325
if (self.state === DESTROYED || self.state === DESTROYING) return;
329326

@@ -542,6 +539,12 @@ function messageHandler(self) {
542539
return handleOperationCallback(self, workItem.cb, new MongoError(err));
543540
}
544541

542+
// Look for clusterTime, and update it if necessary
543+
if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) {
544+
const $clusterTime = message.documents[0].$clusterTime;
545+
self.topology.clusterTime = $clusterTime;
546+
}
547+
545548
// Establish if we have an error
546549
if (
547550
workItem.command &&

lib/topologies/mongos.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ var MongoCR = require('../auth/mongocr'),
4040
Plain = require('../auth/plain'),
4141
GSSAPI = require('../auth/gssapi'),
4242
SSPI = require('../auth/sspi'),
43-
ScramSHA1 = require('../auth/scram');
43+
ScramSHA1 = require('../auth/scram'),
44+
resolveClusterTime = require('./shared').resolveClusterTime;
4445

4546
//
4647
// States
@@ -230,6 +231,9 @@ var Mongos = function(seedlist, options) {
230231
servers: []
231232
};
232233

234+
// Highest clusterTime seen in responses from the current deployment
235+
this.clusterTime = null;
236+
233237
// Add event listener
234238
EventEmitter.call(this);
235239
};
@@ -283,7 +287,9 @@ Mongos.prototype.connect = function(options) {
283287
var servers = this.s.seedlist.map(function(x) {
284288
return new Server(
285289
assign(
286-
{},
290+
{
291+
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
292+
},
287293
self.s.options,
288294
x,
289295
{
@@ -606,7 +612,9 @@ function reconnectProxies(self, proxies, callback) {
606612
// Create a new server instance
607613
var server = new Server(
608614
assign(
609-
{},
615+
{
616+
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
617+
},
610618
self.s.options,
611619
{
612620
host: _server.name.split(':')[0],

lib/topologies/replset.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ var inherits = require('util').inherits,
1414
clone = require('./shared').clone,
1515
Timeout = require('./shared').Timeout,
1616
Interval = require('./shared').Interval,
17-
createClientInfo = require('./shared').createClientInfo;
17+
createClientInfo = require('./shared').createClientInfo,
18+
resolveClusterTime = require('./shared').resolveClusterTime;
1819

1920
var MongoCR = require('../auth/mongocr'),
2021
X509 = require('../auth/x509'),
@@ -252,6 +253,9 @@ var ReplSet = function(seedlist, options) {
252253
this.ismaster = null;
253254
// Contains the intervalId
254255
this.intervalIds = [];
256+
257+
// Highest clusterTime seen in responses from the current deployment
258+
this.clusterTime = null;
255259
};
256260

257261
inherits(ReplSet, EventEmitter);
@@ -377,7 +381,9 @@ function connectNewServers(self, servers, callback) {
377381
// Create a new server instance
378382
var server = new Server(
379383
assign(
380-
{},
384+
{
385+
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
386+
},
381387
self.s.options,
382388
{
383389
host: _server.split(':')[0],
@@ -956,7 +962,9 @@ ReplSet.prototype.connect = function(options) {
956962
var servers = this.s.seedlist.map(function(x) {
957963
return new Server(
958964
assign(
959-
{},
965+
{
966+
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
967+
},
960968
self.s.options,
961969
x,
962970
{

lib/topologies/server.js

+27-1
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,14 @@ var Server = function(options) {
154154
compression: { compressors: createCompressionInfo(options) }
155155
};
156156

157+
// special case for Mongos and ReplSet deployments
158+
if (options.clusterTimeWatcher) {
159+
this.s.clusterTimeWatcher = options.clusterTimeWatcher;
160+
} else {
161+
// otherwise this is a single deployment and we need to track the clusterTime here
162+
this.s.clusterTime = null;
163+
}
164+
157165
// Curent ismaster
158166
this.ismaster = null;
159167
// Current ping time
@@ -203,6 +211,24 @@ Object.defineProperty(Server.prototype, 'logicalSessionTimeoutMinutes', {
203211
}
204212
});
205213

214+
// In single server deployments we track the clusterTime directly on the topology, however
215+
// in Mongos and ReplSet deployments we instead need to delegate the clusterTime up to the
216+
// tracking objects so we can ensure we are gossiping the maximum time received from the
217+
// server.
218+
Object.defineProperty(Server.prototype, 'clusterTime', {
219+
enumerable: true,
220+
set: function(clusterTime) {
221+
if (this.s.clusterTimeWatcher) {
222+
this.s.clusterTimeWatcher(clusterTime);
223+
} else {
224+
this.s.clusterTime = clusterTime;
225+
}
226+
},
227+
get: function() {
228+
return this.s.clusterTime || null;
229+
}
230+
});
231+
206232
Server.enableServerAccounting = function() {
207233
serverAccounting = true;
208234
servers = {};
@@ -503,7 +529,7 @@ Server.prototype.connect = function(options) {
503529
}
504530

505531
// Create a pool
506-
self.s.pool = new Pool(assign(self.s.options, options, { bson: this.s.bson }));
532+
self.s.pool = new Pool(this, assign(self.s.options, options, { bson: this.s.bson }));
507533

508534
// Set up listeners
509535
self.s.pool.on('close', eventHandler(self, 'close'));

lib/topologies/shared.js

+17
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,23 @@ function diff(previous, current) {
354354
return diff;
355355
}
356356

357+
/**
358+
* Shared function to determine clusterTime for a given topology
359+
*
360+
* @param {*} topology
361+
* @param {*} clusterTime
362+
*/
363+
function resolveClusterTime(topology, $clusterTime) {
364+
if (topology.clusterTime == null) {
365+
topology.clusterTime = $clusterTime;
366+
} else {
367+
if ($clusterTime.clusterTime.greaterThan(topology.clusterTime.clusterTime)) {
368+
topology.clusterTime = $clusterTime;
369+
}
370+
}
371+
}
372+
373+
module.exports.resolveClusterTime = resolveClusterTime;
357374
module.exports.inquireServerState = inquireServerState;
358375
module.exports.getTopologyType = getTopologyType;
359376
module.exports.emitServerDescriptionChanged = emitServerDescriptionChanged;

test/tests/functional/pool_tests.js

+14-14
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ describe('Pool tests', function() {
1717
Connection.enableConnectionAccounting();
1818

1919
// Attempt to connect
20-
var pool = new Pool({
20+
var pool = new Pool(null, {
2121
host: this.configuration.host,
2222
port: this.configuration.port,
2323
bson: new Bson(),
@@ -45,7 +45,7 @@ describe('Pool tests', function() {
4545
Connection.enableConnectionAccounting();
4646

4747
// Attempt to connect
48-
var pool = new Pool({
48+
var pool = new Pool(null, {
4949
host: this.configuration.host,
5050
port: this.configuration.port,
5151
bson: new Bson()
@@ -85,7 +85,7 @@ describe('Pool tests', function() {
8585
var index = 0;
8686

8787
// Attempt to connect
88-
var pool = new Pool({
88+
var pool = new Pool(null, {
8989
host: this.configuration.host,
9090
port: this.configuration.port,
9191
bson: new Bson()
@@ -205,7 +205,7 @@ describe('Pool tests', function() {
205205
this.timeout(0);
206206

207207
// Attempt to connect
208-
var pool = new Pool({
208+
var pool = new Pool(null, {
209209
host: this.configuration.host,
210210
port: this.configuration.port,
211211
socketTimeout: 3000,
@@ -242,7 +242,7 @@ describe('Pool tests', function() {
242242
Connection.enableConnectionAccounting();
243243

244244
// Attempt to connect
245-
var pool = new Pool({
245+
var pool = new Pool(null, {
246246
host: this.configuration.host,
247247
port: this.configuration.port,
248248
socketTimeout: 3000,
@@ -300,7 +300,7 @@ describe('Pool tests', function() {
300300
Connection.enableConnectionAccounting();
301301

302302
// Attempt to connect
303-
var pool = new Pool({
303+
var pool = new Pool(null, {
304304
host: this.configuration.host,
305305
port: this.configuration.port,
306306
socketTimeout: 3000,
@@ -382,7 +382,7 @@ describe('Pool tests', function() {
382382
Connection.enableConnectionAccounting();
383383

384384
// Attempt to connect
385-
var pool = new Pool({
385+
var pool = new Pool(null, {
386386
host: this.configuration.host,
387387
port: this.configuration.port,
388388
socketTimeout: 3000,
@@ -459,7 +459,7 @@ describe('Pool tests', function() {
459459
Connection.enableConnectionAccounting();
460460

461461
// Attempt to connect
462-
var pool = new Pool({
462+
var pool = new Pool(null, {
463463
host: this.configuration.host,
464464
port: this.configuration.port,
465465
socketTimeout: 1000,
@@ -526,7 +526,7 @@ describe('Pool tests', function() {
526526
expect(createUserRes).to.exist;
527527
expect(createUserErr).to.be.null;
528528
// Attempt to connect
529-
var pool = new Pool({
529+
var pool = new Pool(null, {
530530
host: self.configuration.host,
531531
port: self.configuration.port,
532532
bson: new Bson()
@@ -606,7 +606,7 @@ describe('Pool tests', function() {
606606
expect(createAdminUserErr).to.be.null;
607607

608608
// Attempt to connect
609-
var pool = new Pool({
609+
var pool = new Pool(null, {
610610
host: self.configuration.host,
611611
port: self.configuration.port,
612612
bson: new Bson()
@@ -812,7 +812,7 @@ describe('Pool tests', function() {
812812
expect(createAdminUserErr).to.be.null;
813813

814814
// Attempt to connect
815-
var pool = new Pool({
815+
var pool = new Pool(null, {
816816
host: self.configuration.host,
817817
port: self.configuration.port,
818818
bson: new Bson()
@@ -936,7 +936,7 @@ describe('Pool tests', function() {
936936
expect(createAdminUserRes).to.exist;
937937
expect(createAdminUserErr).to.be.null;
938938
// Attempt to connect
939-
var pool = new Pool({
939+
var pool = new Pool(null, {
940940
host: self.configuration.host,
941941
port: self.configuration.port,
942942
bson: new Bson()
@@ -1030,7 +1030,7 @@ describe('Pool tests', function() {
10301030
expect(createAdminUserRes).to.exist;
10311031

10321032
// Attempt to connect
1033-
var pool = new Pool({
1033+
var pool = new Pool(null, {
10341034
host: self.configuration.host,
10351035
port: self.configuration.port,
10361036
bson: new Bson()
@@ -1098,7 +1098,7 @@ describe('Pool tests', function() {
10981098
Connection.enableConnectionAccounting();
10991099

11001100
// Attempt to connect
1101-
var pool = new Pool({
1101+
var pool = new Pool(null, {
11021102
host: this.configuration.host,
11031103
port: this.configuration.port,
11041104
bson: new Bson(),

test/tests/functional/shared.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ function executeCommand(configuration, db, cmd, options, cb) {
1717
var port = options.port || configuration.port;
1818

1919
// Attempt to connect
20-
var pool = new Pool({
20+
var pool = new Pool(null, {
2121
host: host,
2222
port: port,
2323
bson: new bson()
@@ -60,7 +60,7 @@ function locateAuthMethod(configuration, cb) {
6060
var cmd = { ismaster: true };
6161

6262
// Attempt to connect
63-
var pool = new Pool({
63+
var pool = new Pool(null, {
6464
host: configuration.host,
6565
port: configuration.port,
6666
bson: new bson()

0 commit comments

Comments
 (0)