Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit ccc5e1d

Browse files
committed
feat(server-selection): add basic support for server selection
NODE-1259
1 parent cc843cf commit ccc5e1d

File tree

4 files changed

+265
-20
lines changed

4 files changed

+265
-20
lines changed

lib/sdam/server_description.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@ class ServerDescription {
2323
* Create a ServerDescription
2424
* @param {String} address The address of the server
2525
* @param {Object} [ismaster] An optional ismaster response for this server
26+
* @param {Object} [options] Optional settings
27+
* @param {Number} [options.roundTripTime] The round trip time to ping this server (in ms)
2628
*/
27-
constructor(address, ismaster) {
29+
constructor(address, ismaster, options) {
30+
options = options || {};
31+
2832
this.address = address;
2933
this.error = null;
30-
this.roundTripTime = null;
34+
this.roundTripTime = options.roundTripTime;
3135
this.lastWriteDate = ismaster && ismaster.lastWrite ? ismaster.lastWrite.lasteWriteDate : null;
3236
this.opTime = ismaster && ismaster.lastWrite ? ismaster.lastWrite.opTime : null;
3337
this.type = parseServerType(ismaster);

lib/sdam/server_selectors.js

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
'use strict';
2+
const ServerType = require('./server_description').ServerType;
3+
const TopologyType = require('./topology_description').TopologyType;
4+
const ReadPreference = require('../topologies/read_preference');
5+
6+
function writableServerSelector() {
7+
return function(topologyDescription, servers) {
8+
if (topologyDescription === TopologyType.ReplicaSetNoPrimary) return [];
9+
if (
10+
topologyDescription.type === TopologyType.Sharded ||
11+
topologyDescription.type === TopologyType.Single
12+
) {
13+
return servers;
14+
}
15+
16+
return servers.filter(s => s.isWritable);
17+
};
18+
}
19+
20+
// reducers
21+
function maxStalenessReducer(readPreference, topologyDescription, servers) {
22+
if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {
23+
return servers;
24+
}
25+
26+
if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) {
27+
const primary = servers.filter(primaryFilter);
28+
return servers.reduce((result, server) => {
29+
const staleness =
30+
server.lastUpdateTime -
31+
server.lastWriteDate -
32+
(primary.lastUpdateTime - primary.lastWriteDate) +
33+
topologyDescription.heartbeatFrequencyMS;
34+
35+
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
36+
return result;
37+
}, []);
38+
} else if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) {
39+
const sMax = servers.reduce((max, s) => (s.lastWriteDate > max.lastWriteDate ? s : max));
40+
return servers.reduce((result, server) => {
41+
const staleness =
42+
sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;
43+
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
44+
return result;
45+
}, []);
46+
}
47+
48+
return servers;
49+
}
50+
51+
function tagSetMatch(tagSet, serverTags) {
52+
const keys = Object.keys(tagSet);
53+
const serverTagKeys = Object.keys(serverTags);
54+
for (let i = 0; i < keys.length; ++i) {
55+
const key = keys[i];
56+
if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {
57+
return false;
58+
}
59+
}
60+
61+
return true;
62+
}
63+
64+
function tagSetReducer(readPreference, servers) {
65+
if (
66+
readPreference.tags == null ||
67+
(Array.isArray(readPreference.tags) && readPreference.tags.length === 0)
68+
) {
69+
return servers;
70+
}
71+
72+
for (let i = 0; i < readPreference.tags.length; ++i) {
73+
const tagSet = readPreference.tags[i];
74+
const serversMatchingTagset = servers.reduce((matched, server) => {
75+
if (tagSetMatch(tagSet, server.tags)) matched.push(server);
76+
return matched;
77+
}, []);
78+
79+
if (serversMatchingTagset.length) {
80+
return serversMatchingTagset;
81+
}
82+
}
83+
84+
return [];
85+
}
86+
87+
function latencyWindowReducer(readPreference, servers) {
88+
return servers;
89+
}
90+
91+
// filters
92+
function primaryFilter(server) {
93+
return server.type === ServerType.RSPrimary;
94+
}
95+
96+
function secondaryFilter(server) {
97+
return server.type === ServerType.RSSecondary;
98+
}
99+
100+
function nearestFilter(server) {
101+
return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary;
102+
}
103+
104+
function readPreferenceServerSelector(readPreference) {
105+
if (!readPreference.isValid()) {
106+
throw new TypeError('Invalid read preference specified');
107+
}
108+
109+
return function(topologyDescription, servers) {
110+
if (
111+
topologyDescription.type === TopologyType.Single ||
112+
topologyDescription.type === TopologyType.Sharded ||
113+
topologyDescription.type === TopologyType.Unknown
114+
) {
115+
return servers;
116+
}
117+
118+
if (readPreference.mode === ReadPreference.PRIMARY) {
119+
return servers.filter(s => s.type === ServerType.RSPrimary);
120+
}
121+
122+
if (readPreference.mode === ReadPreference.SECONDARY) {
123+
return latencyWindowReducer(
124+
readPreference,
125+
tagSetReducer(
126+
readPreference,
127+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
128+
)
129+
);
130+
} else if (readPreference.mode === ReadPreference.NEAREST) {
131+
return latencyWindowReducer(
132+
readPreference,
133+
tagSetReducer(
134+
readPreference,
135+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
136+
)
137+
);
138+
} else if (readPreference.mode === ReadPreference.SECONDARY_PREFERRED) {
139+
const result = latencyWindowReducer(
140+
readPreference,
141+
tagSetReducer(
142+
readPreference,
143+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
144+
)
145+
);
146+
147+
return result.length === 0 ? servers.filter(primaryFilter) : result;
148+
} else if (readPreference.mode === ReadPreference.PRIMARY_PREFERRED) {
149+
const result = servers.filter(primaryFilter);
150+
if (result.length) {
151+
return result;
152+
}
153+
154+
return latencyWindowReducer(
155+
readPreference,
156+
tagSetReducer(
157+
readPreference,
158+
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
159+
)
160+
);
161+
}
162+
};
163+
}
164+
165+
module.exports = {
166+
writableServerSelector,
167+
readPreferenceServerSelector
168+
};

lib/sdam/topology.js

+84-7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,18 @@ const ServerDescription = require('./server_description').ServerDescription;
44
const TopologyDescription = require('./topology_description').TopologyDescription;
55
const TopologyType = require('./topology_description').TopologyType;
66
const monitoring = require('./monitoring');
7+
const calculateDurationInMs = require('../utils').calculateDurationInMs;
8+
const MongoTimeoutError = require('../error').MongoTimeoutError;
9+
const MongoError = require('../error').MongoError;
710

811
// Global state
912
let globalTopologyCounter = 0;
1013

14+
// Constants
15+
const DEFAULT_LOCAL_THRESHOLD_MS = 15;
16+
const DEFAULT_HEARTBEAT_FREQUENCY = 10000;
17+
const DEFAULT_SERVER_SELECTION_TIMEOUT = 30000;
18+
1119
/**
1220
* A container of server instances representing a connection to a MongoDB topology.
1321
*
@@ -27,11 +35,22 @@ class Topology extends EventEmitter {
2735
*
2836
* @param {Array|String} seedlist a string list, or array of Server instances to connect to
2937
* @param {Object} [options] Optional settings
38+
* @param {Number} [options.localThresholdMS=15] The size of the latency window for selecting among multiple suitable servers
39+
* @param {Number} [options.serverSelectionTimeoutMS=30000] How long to block for server selection before throwing an error
40+
* @param {Number} [options.heartbeatFrequencyMS=10000] The frequency with which topology updates are scheduled
3041
*/
3142
constructor(seedlist, options) {
3243
super();
3344
seedlist = seedlist || [];
34-
options = options || {};
45+
options = Object.assign(
46+
{},
47+
{
48+
localThresholdMS: DEFAULT_LOCAL_THRESHOLD_MS,
49+
serverSelectionTimeoutMS: DEFAULT_SERVER_SELECTION_TIMEOUT,
50+
heartbeatFrequencyMS: DEFAULT_HEARTBEAT_FREQUENCY
51+
},
52+
options
53+
);
3554

3655
const topologyType =
3756
seedlist.length === 1 && !options.replicaset
@@ -62,7 +81,11 @@ class Topology extends EventEmitter {
6281
null,
6382
null,
6483
options
65-
)
84+
),
85+
serverSelectionTimeoutMS:
86+
options.serverSelectionTimeoutMS || DEFAULT_SERVER_SELECTION_TIMEOUT,
87+
heartbeatFrequencyMS: options.heartbeatFrequencyMS || DEFAULT_HEARTBEAT_FREQUENCY,
88+
ServerClass: options.ServerClass || null /* eventually our Server class, but null for now */
6689
};
6790
}
6891

@@ -111,17 +134,33 @@ class Topology extends EventEmitter {
111134
/**
112135
* Selects a server according to the selection predicate provided
113136
*
114-
* @param {function} [predicate] An optional predicate to select servers by, defaults to a random selection within a latency window
137+
* @param {function} [selector] An optional selector to select servers by, defaults to a random selection within a latency window
115138
* @return {Server} An instance of a `Server` meeting the criteria of the predicate provided
116139
*/
117-
selectServer(/* predicate */) {
118-
return;
140+
selectServer(selector, options, callback) {
141+
if (typeof options === 'function') (callback = options), (options = {});
142+
options = Object.assign(
143+
{},
144+
{ serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS },
145+
options
146+
);
147+
148+
selectServers(
149+
this,
150+
selector,
151+
options.serverSelectionTimeoutMS,
152+
process.hrtime(),
153+
(err, servers) => {
154+
if (err) return callback(err, null);
155+
callback(null, randomSelection(servers));
156+
}
157+
);
119158
}
120159

121160
/**
122-
* Update the topology with a ServerDescription
161+
* Update the internal TopologyDescription with a ServerDescription
123162
*
124-
* @param {object} serverDescription the server to update
163+
* @param {object} serverDescription The server to update in the internal list of server descriptions
125164
*/
126165
update(serverDescription) {
127166
// these will be used for monitoring events later
@@ -153,6 +192,44 @@ class Topology extends EventEmitter {
153192
}
154193
}
155194

195+
function randomSelection(array) {
196+
return array[Math.floor(Math.random() * array.length)];
197+
}
198+
199+
class FakeServer {
200+
constructor(description) {
201+
this.description = description;
202+
}
203+
}
204+
205+
/**
206+
*
207+
* @param {*} topology
208+
* @param {*} selector
209+
* @param {*} options
210+
* @param {*} callback
211+
*/
212+
function selectServers(topology, selector, timeout, start, callback) {
213+
if (!topology.description.compatible) {
214+
return callback(new MongoError(topology.description.compatibilityError));
215+
}
216+
217+
const serverDescriptions = Array.from(topology.description.servers.values());
218+
let descriptions = selector(topology.description, serverDescriptions);
219+
if (descriptions.length) {
220+
// TODO: obviously return the actual server in the future
221+
const servers = descriptions.map(d => new FakeServer(d));
222+
return callback(null, servers);
223+
}
224+
225+
const duration = calculateDurationInMs(process.hrtime(start));
226+
if (duration > timeout) {
227+
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
228+
}
229+
230+
// TODO: loop this, add monitoring
231+
}
232+
156233
/**
157234
* A server opening SDAM monitoring event
158235
*

lib/sdam/topology_description.js

+7-11
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,12 @@ class TopologyDescription {
2929
* @param {number} maxSetVersion
3030
* @param {ObjectId} maxElectionId
3131
*/
32-
constructor(
33-
topologyType,
34-
serverDescriptions,
35-
setName,
36-
maxSetVersion,
37-
maxElectionId
38-
/*, options */
39-
) {
32+
constructor(topologyType, serverDescriptions, setName, maxSetVersion, maxElectionId, options) {
33+
options = options || {};
34+
4035
// TODO: consider assigning all these values to a temporary value `s` which
4136
// we use `Object.freeze` on, ensuring the internal state of this type
4237
// is immutable.
43-
4438
this.type = topologyType || TopologyType.Unknown;
4539
this.setName = setName || null;
4640
this.maxSetVersion = maxSetVersion || null;
@@ -50,6 +44,8 @@ class TopologyDescription {
5044
this.compatible = true;
5145
this.compatibilityError = null;
5246
this.logicalSessionTimeoutMinutes = null;
47+
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS || 0;
48+
this.options = options;
5349

5450
// determine server compatibility
5551
for (const serverDescription of this.servers.values()) {
@@ -112,7 +108,7 @@ class TopologyDescription {
112108
setName,
113109
maxSetVersion,
114110
maxElectionId,
115-
{}
111+
this.options
116112
);
117113
}
118114

@@ -192,7 +188,7 @@ class TopologyDescription {
192188
setName,
193189
maxSetVersion,
194190
maxElectionId,
195-
{}
191+
this.options
196192
);
197193
}
198194

0 commit comments

Comments
 (0)