Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit 73ac688

Browse files
committed
feat(retryable-writes): initial support on replicasets
NODE-1105
1 parent cceca99 commit 73ac688

File tree

3 files changed

+129
-13
lines changed

3 files changed

+129
-13
lines changed

lib/topologies/replset.js

+33-11
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ var inherits = require('util').inherits,
88
retrieveBSON = require('../connection/utils').retrieveBSON,
99
Logger = require('../connection/logger'),
1010
MongoError = require('../error').MongoError,
11+
errors = require('../error'),
1112
Server = require('./server'),
1213
ReplSetState = require('./replset_state'),
1314
clone = require('./shared').clone,
1415
Timeout = require('./shared').Timeout,
1516
Interval = require('./shared').Interval,
1617
createClientInfo = require('./shared').createClientInfo,
17-
SessionMixins = require('./shared').SessionMixins;
18+
SessionMixins = require('./shared').SessionMixins,
19+
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
20+
txnNumber = require('./shared').txnNumber;
1821

1922
var MongoCR = require('../auth/mongocr'),
2023
X509 = require('../auth/x509'),
@@ -1154,20 +1157,39 @@ ReplSet.prototype.getServers = function() {
11541157
//
11551158
// Execute write operation
11561159
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
1157-
if (typeof options === 'function') {
1158-
(callback = options), (options = {}), (options = options || {});
1159-
}
1160-
1161-
// Ensure we have no options
1160+
if (typeof options === 'function') (callback = options), (options = {});
11621161
options = options || {};
11631162

1164-
// No server returned we had an error
1165-
if (self.s.replicaSetState.primary == null) {
1166-
return callback(new MongoError('no primary server found'));
1163+
if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) {
1164+
// No server returned we had an error
1165+
if (self.s.replicaSetState.primary == null) {
1166+
return callback(new MongoError('no primary server found'));
1167+
}
1168+
1169+
// Execute the command
1170+
return self.s.replicaSetState.primary[op](ns, ops, options, callback);
11671171
}
11681172

1169-
// Execute the command
1170-
self.s.replicaSetState.primary[op](ns, ops, options, callback);
1173+
// increment and assign txnNumber
1174+
options.txnNumber = txnNumber(options.session);
1175+
1176+
self.s.replicaSetState.primary[op](ns, ops, options, (err, result) => {
1177+
if (!err) return callback(null, result);
1178+
if (err instanceof errors.MongoNetworkError) {
1179+
return callback(err);
1180+
}
1181+
1182+
// check again, this might have changed in the interim
1183+
if (self.s.replicaSetState.primary == null) {
1184+
return callback(new MongoError('no primary server found'));
1185+
}
1186+
1187+
// increment and assign txnNumber
1188+
options.txnNumber = txnNumber(options.session);
1189+
1190+
// rerun the operation
1191+
self.s.replicaSetState.primary[op](ns, ops, options, callback);
1192+
});
11711193
};
11721194

11731195
/**

lib/topologies/shared.js

+37-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict';
22

3-
var os = require('os'),
3+
const os = require('os'),
44
f = require('util').format,
5-
ReadPreference = require('./read_preference');
5+
ReadPreference = require('./read_preference'),
6+
retrieveBSON = require('../connection/utils').retrieveBSON;
7+
8+
const BSON = retrieveBSON();
69

710
/**
811
* Emit event if it exists
@@ -402,6 +405,36 @@ const SessionMixins = {
402405
}
403406
};
404407

408+
const RETRYABLE_WIRE_VERSION = 6;
409+
410+
/**
411+
* Determines whether the provided topology supports retryable writes
412+
*
413+
* @param {Mongos|Replset} topology
414+
*/
415+
const isRetryableWritesSupported = function(topology) {
416+
const maxWireVersion = topology.lastIsMaster().maxWireVersion;
417+
if (maxWireVersion < RETRYABLE_WIRE_VERSION) {
418+
return false;
419+
}
420+
421+
if (!topology.logicalSessionTimeoutMinutes) {
422+
return false;
423+
}
424+
425+
return true;
426+
};
427+
428+
/**
429+
* Increment the transaction number on the ServerSession contained by the provided ClientSession
430+
*
431+
* @param {ClientSession} session
432+
*/
433+
const txnNumber = function(session) {
434+
session.serverSession.txnNumber++;
435+
return BSON.Long.fromNumber(session.serverSession.txnNumber);
436+
};
437+
405438
module.exports.SessionMixins = SessionMixins;
406439
module.exports.resolveClusterTime = resolveClusterTime;
407440
module.exports.inquireServerState = inquireServerState;
@@ -415,3 +448,5 @@ module.exports.clone = clone;
415448
module.exports.diff = diff;
416449
module.exports.Interval = Interval;
417450
module.exports.Timeout = Timeout;
451+
module.exports.isRetryableWritesSupported = isRetryableWritesSupported;
452+
module.exports.txnNumber = txnNumber;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
'use strict';
2+
var expect = require('chai').expect,
3+
ReplSet = require('../../../../lib/topologies/replset'),
4+
mock = require('../../../mock'),
5+
ReplSetFixture = require('../common').ReplSetFixture,
6+
ClientSession = require('../../../../lib/sessions').ClientSession,
7+
ServerSessionPool = require('../../../../lib/sessions').ServerSessionPool;
8+
9+
const test = new ReplSetFixture();
10+
describe('Sessions (ReplSet)', function() {
11+
afterEach(() => mock.cleanup());
12+
beforeEach(() => test.setup({ ismaster: mock.DEFAULT_ISMASTER_36 }));
13+
14+
it('should add `txnNumber` to write commands where `retryWrites` is true', {
15+
metadata: { requires: { topology: ['single'] } },
16+
test: function(done) {
17+
var replset = new ReplSet(
18+
[test.primaryServer.address(), test.firstSecondaryServer.address()],
19+
{
20+
setName: 'rs',
21+
connectionTimeout: 3000,
22+
socketTimeout: 0,
23+
haInterval: 100,
24+
size: 1
25+
}
26+
);
27+
28+
const sessionPool = new ServerSessionPool(replset);
29+
const session = new ClientSession(replset, sessionPool);
30+
31+
let command = null;
32+
test.primaryServer.setMessageHandler(request => {
33+
const doc = request.document;
34+
if (doc.ismaster) {
35+
request.reply(test.primaryStates[0]);
36+
} else if (doc.insert) {
37+
command = doc;
38+
request.reply({ ok: 1 });
39+
}
40+
});
41+
42+
replset.on('all', () => {
43+
replset.insert('test.test', [{ a: 1 }], { retryWrites: true, session: session }, function(
44+
err
45+
) {
46+
expect(err).to.not.exist;
47+
expect(command).to.have.property('txnNumber');
48+
expect(command.txnNumber).to.eql(1);
49+
50+
replset.destroy();
51+
done();
52+
});
53+
});
54+
55+
replset.on('error', done);
56+
replset.connect();
57+
}
58+
});
59+
});

0 commit comments

Comments
 (0)