Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

NODE-1259 Refactor mongodb-core to use a single Topology type (part 1) #303

Merged
merged 32 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
338e4c1
feat(topology): introduce a single Topology type, and test runner
mbroadst Jan 17, 2018
e029583
refactor(sdam): track logicalSessionTimeoutMinutes from ismasters
mbroadst Apr 21, 2018
6948e33
test(sdam): skip monitoring tests for the moment
mbroadst Apr 21, 2018
ac80267
chore(sdam): create sdam subfolder, split topology up
mbroadst Apr 21, 2018
80ad079
feat(sdam-monitoring): add basic monitoring for new Topology type
mbroadst Apr 21, 2018
fe1be0d
fix(topology-description): we can't use Object.values yet
mbroadst Apr 22, 2018
a7d0e64
fix(sdam): we can't use Array.includes yet
mbroadst Apr 22, 2018
f5e5590
refactor(topology-description): use Maps for server descriptions
mbroadst Apr 23, 2018
f2f08f2
refactor(calculateDurationMs): move this method to common utils
mbroadst Apr 23, 2018
f01e4c8
feat(MongoTimeoutError): add common class for timeout events
mbroadst Apr 23, 2018
a31dfd1
refactor(read-preference): simplify arguments management
mbroadst Apr 23, 2018
92d1c30
feat(server-selection): add basic support for server selection
mbroadst Apr 23, 2018
22f87f5
test(server-selection): add server selection test runner
mbroadst Apr 23, 2018
ca3b831
refactor(latency-window): ensure use of lowest bound for minimum
mbroadst Apr 23, 2018
0db8da2
test(latency-window): modify selection tests to verify latency
mbroadst Apr 23, 2018
cc2e369
refactor(server-description): add `lastUpdateTime`, fix spelling
mbroadst Apr 24, 2018
7438829
refactor(topology-description): add getter for common wire version
mbroadst Apr 24, 2018
73157a6
test(max-staleness): reuse server selection test runner
mbroadst Apr 24, 2018
c4aec15
feat(max-staleness): properly support a max staleness reducer
mbroadst Apr 24, 2018
bb619f0
refactor(read-preference): support legacy non-array for tags
mbroadst Apr 24, 2018
ff2dcf0
test(max-staleness): correct old test runners for redpref changes
mbroadst Apr 24, 2018
6f07c7c
refactor(topology): add stubs for all base topology methods
mbroadst Apr 30, 2018
aa3642f
feat(topology-description): add helper method for server ownership
mbroadst May 1, 2018
8fe2bf7
refactor(topology): add Server type, add serverClosed events
mbroadst May 1, 2018
4da938c
test(sdam): reenable removal test, testing `serverClosed` events
mbroadst May 1, 2018
7995834
chore(sdam): `Server` skeleton takes a description for testing
mbroadst May 1, 2018
135de8c
refactor(server-description): simplify ismaster parsing
mbroadst May 1, 2018
c00ffc0
test(sdam): accommodate for and verify omitted fields
mbroadst May 1, 2018
198f43f
refactor(servrer-selectors): simplify nested ternary statement
mbroadst May 2, 2018
fb5d72a
refactor(topology): simplify default options, type detection
mbroadst May 2, 2018
31eb08a
refactor(topology-description): remove assert, dedupe if branches
mbroadst May 2, 2018
9653cb1
test(server-selection): `var` => `let`
mbroadst May 2, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions lib/connection/apm.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const KillCursor = require('../connection/commands').KillCursor;
const GetMore = require('../connection/commands').GetMore;
const process = require('process');
const calculateDurationInMs = require('../utils').calculateDurationInMs;

/** Commands that we want to redact because of the sensitive nature of their contents */
const SENSITIVE_COMMANDS = new Set([
Expand All @@ -18,11 +18,6 @@ const SENSITIVE_COMMANDS = new Set([

// helper methods
const extractCommandName = command => Object.keys(command)[0];
const calculateDurationInMs = started => {
const hrtime = process.hrtime(started);
return (hrtime[0] * 1e9 + hrtime[1]) / 1e6;
};

const namespace = command => command.ns;
const databaseName = command => command.ns.split('.')[0];
const collectionName = command => command.ns.split('.')[1];
Expand Down
24 changes: 20 additions & 4 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ util.inherits(MongoNetworkError, MongoError);
* @class
* @param {Error|string|object} message The error message
* @property {string} message The error message
* @return {MongoParseError} A MongoNetworkError instance
* @return {MongoParseError} A MongoParseError instance
* @extends {MongoError}
*/
const MongoParseError = function(message) {
Expand All @@ -75,8 +75,24 @@ const MongoParseError = function(message) {
};
util.inherits(MongoParseError, MongoError);

/**
* An error signifying a timeout event
*
* @class
* @param {Error|string|object} message The error message
* @property {string} message The error message
* @return {MongoTimeoutError} A MongoTimeoutError instance
* @extends {MongoError}
*/
const MongoTimeoutError = function(message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done with class and extends?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but I was just following existing convention in the file. I'd prefer to convert them in a separate commit wholesale

MongoError.call(this, message);
this.name = 'MongoTimeoutError';
};
util.inherits(MongoTimeoutError, MongoError);

module.exports = {
MongoError: MongoError,
MongoNetworkError: MongoNetworkError,
MongoParseError: MongoParseError
MongoError,
MongoNetworkError,
MongoParseError,
MongoTimeoutError
};
124 changes: 124 additions & 0 deletions lib/sdam/monitoring.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
'use strict';

/**
* Published when server description changes, but does NOT include changes to the RTT.
*
* @property {Object} topologyId A unique identifier for the topology
* @property {ServerAddress} address The address (host/port pair) of the server
* @property {ServerDescription} previousDescription The previous server description
* @property {ServerDescription} newDescription The new server description
*/
class ServerDescriptionChangedEvent {
constructor(topologyId, address, previousDescription, newDescription) {
Object.assign(this, { topologyId, address, previousDescription, newDescription });
}
}

/**
* Published when server is initialized.
*
* @property {Object} topologyId A unique identifier for the topology
* @property {ServerAddress} address The address (host/port pair) of the server
*/
class ServerOpeningEvent {
constructor(topologyId, address) {
Object.assign(this, { topologyId, address });
}
}

/**
* Published when server is closed.
*
* @property {ServerAddress} address The address (host/port pair) of the server
* @property {Object} topologyId A unique identifier for the topology
*/
class ServerClosedEvent {
constructor(topologyId, address) {
Object.assign(this, { topologyId, address });
}
}

/**
* Published when topology description changes.
*
* @property {Object} topologyId
* @property {TopologyDescription} previousDescription The old topology description
* @property {TopologyDescription} newDescription The new topology description
*/
class TopologyDescriptionChangedEvent {
constructor(topologyId, previousDescription, newDescription) {
Object.assign(this, { topologyId, previousDescription, newDescription });
}
}

/**
* Published when topology is initialized.
*
* @param {Object} topologyId A unique identifier for the topology
*/
class TopologyOpeningEvent {
constructor(topologyId) {
Object.assign(this, { topologyId });
}
}

/**
* Published when topology is closed.
*
* @param {Object} topologyId A unique identifier for the topology
*/
class TopologyClosedEvent {
constructor(topologyId) {
Object.assign(this, { topologyId });
}
}

/**
* Fired when the server monitor’s ismaster command is started - immediately before
* the ismaster command is serialized into raw BSON and written to the socket.
*
* @property {Object} connectionId The connection id for the command
*/
class ServerHeartbeatStartedEvent {
constructor(connectionId) {
Object.assign(this, { connectionId });
}
}

/**
* Fired when the server monitor’s ismaster succeeds.
*
* @param {Number} duration The execution time of the event
* @param {Object} reply The command reply
* @param {Object} connectionId The connection id for the command
*/
class ServerHeartbeatSucceededEvent {
constructor(duration, reply, connectionId) {
Object.assign(this, { duration, reply, connectionId });
}
}

/**
* Fired when the server monitor’s ismaster fails, either with an “ok: 0” or a socket exception.
*
* @param {Number} duration The execution time of the event
* @param {MongoError|Object} failure The command failure
* @param {Object} connectionId The connection id for the command
*/
class ServerHearbeatFailedEvent {
constructor(duration, failure, connectionId) {
Object.assign(this, { duration, failure, connectionId });
}
}

module.exports = {
ServerDescriptionChangedEvent,
ServerOpeningEvent,
ServerClosedEvent,
TopologyDescriptionChangedEvent,
TopologyOpeningEvent,
TopologyClosedEvent,
ServerHeartbeatStartedEvent,
ServerHeartbeatSucceededEvent,
ServerHearbeatFailedEvent
};
44 changes: 44 additions & 0 deletions lib/sdam/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';
const EventEmitter = require('events');

class Server extends EventEmitter {
constructor(description) {
super();

this.s = {
description
};
}

get description() {
return this.s.description;
}

/**
* Initiate server connect
*
* @param {Array} [options.auth] Array of auth options to apply on connect
*/
connect(options, callback) {
options = options || {};

if (typeof callback === 'function') {
callback(null, null);
}
}

/**
* Destroy the server connection
*
* @param {Boolean} [options.emitClose=false] Emit close event on destroy
* @param {Boolean} [options.emitDestroy=false] Emit destroy event on destroy
* @param {Boolean} [options.force=false] Force destroy the pool
*/
destroy(callback) {
if (typeof callback === 'function') {
callback(null, null);
}
}
}

module.exports = Server;
141 changes: 141 additions & 0 deletions lib/sdam/server_description.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
'use strict';

// An enumeration of server types we know about
const ServerType = {
Standalone: 'Standalone',
Mongos: 'Mongos',
PossiblePrimary: 'PossiblePrimary',
RSPrimary: 'RSPrimary',
RSSecondary: 'RSSecondary',
RSArbiter: 'RSArbiter',
RSOther: 'RSOther',
RSGhost: 'RSGhost',
Unknown: 'Unknown'
};

const WRITABLE_SERVER_TYPES = new Set([
ServerType.RSPrimary,
ServerType.Standalone,
ServerType.Mongos
]);

const ISMASTER_FIELDS = [
'minWireVersion',
'maxWireVersion',
'me',
'hosts',
'passives',
'arbiters',
'tags',
'setName',
'setVersion',
'electionId',
'primary',
'logicalSessionTimeoutMinutes'
];

/**
* The client's view of a single server, based on the most recent ismaster outcome.
*
* Internal type, not meant to be directly instantiated
*/
class ServerDescription {
/**
* Create a ServerDescription
* @param {String} address The address of the server
* @param {Object} [ismaster] An optional ismaster response for this server
* @param {Object} [options] Optional settings
* @param {Number} [options.roundTripTime] The round trip time to ping this server (in ms)
*/
constructor(address, ismaster, options) {
options = options || {};
ismaster = Object.assign(
{
minWireVersion: 0,
maxWireVersion: 0,
hosts: [],
passives: [],
arbiters: [],
tags: []
},
ismaster
);

this.address = address;
this.error = null;
this.roundTripTime = options.roundTripTime || 0;
this.lastUpdateTime = Date.now();
this.lastWriteDate = ismaster.lastWrite ? ismaster.lastWrite.lastWriteDate : null;
this.opTime = ismaster.lastWrite ? ismaster.lastWrite.opTime : null;
this.type = parseServerType(ismaster);

// direct mappings
ISMASTER_FIELDS.forEach(field => {
if (typeof ismaster[field] !== 'undefined') this[field] = ismaster[field];
});

// normalize case for hosts
this.hosts = this.hosts.map(host => host.toLowerCase());
this.passives = this.passives.map(host => host.toLowerCase());
this.arbiters = this.arbiters.map(host => host.toLowerCase());
}

get allHosts() {
return this.hosts.concat(this.arbiters).concat(this.passives);
}

/**
* @return {Boolean} Is this server available for reads
*/
get isReadable() {
return this.type === ServerType.RSSecondary || this.isWritable;
}

/**
* @return {Boolean} Is this server available for writes
*/
get isWritable() {
return WRITABLE_SERVER_TYPES.has(this.type);
}
}

/**
* Parses an `ismaster` message and determines the server type
*
* @param {Object} ismaster The `ismaster` message to parse
* @return {ServerType}
*/
function parseServerType(ismaster) {
if (!ismaster || !ismaster.ok) {
return ServerType.Unknown;
}

if (ismaster.isreplicaset) {
return ServerType.RSGhost;
}

if (ismaster.msg && ismaster.msg === 'isdbgrid') {
return ServerType.Mongos;
}

if (ismaster.setName) {
if (ismaster.hidden) {
return ServerType.RSOther;
} else if (ismaster.ismaster) {
return ServerType.RSPrimary;
} else if (ismaster.secondary) {
return ServerType.RSSecondary;
} else if (ismaster.arbiterOnly) {
return ServerType.RSArbiter;
} else {
return ServerType.RSOther;
}
}

return ServerType.Standalone;
}

module.exports = {
ServerDescription,
ServerType
};
Loading