Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit d4c1597

Browse files
authored
feat(retryableWrites): adding more support for retries
- Adds support for retryable commands that route through the topology `.commands` function instead of a designated function like `.insert`. This adds coverage for `findAndModify` commands. - Adds support for retrying on more types of errors, and retrying on writeConcern errors Fixes NODE-1456
1 parent d71b21c commit d4c1597

File tree

6 files changed

+125
-47
lines changed

6 files changed

+125
-47
lines changed

lib/connection/pool.js

+13-9
Original file line numberDiff line numberDiff line change
@@ -575,15 +575,19 @@ function messageHandler(self) {
575575
}
576576

577577
// Establish if we have an error
578-
if (
579-
workItem.command &&
580-
message.documents[0] &&
581-
(message.documents[0].ok === 0 ||
582-
message.documents[0]['$err'] ||
583-
message.documents[0]['errmsg'] ||
584-
message.documents[0]['code'])
585-
) {
586-
return handleOperationCallback(self, workItem.cb, new MongoError(message.documents[0]));
578+
if (workItem.command && message.documents[0]) {
579+
const responseDoc = message.documents[0];
580+
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) {
581+
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc));
582+
}
583+
584+
if (responseDoc.writeConcernError) {
585+
return handleOperationCallback(
586+
self,
587+
workItem.cb,
588+
new MongoError(responseDoc.writeConcernError)
589+
);
590+
}
587591
}
588592

589593
// Add the connection details

lib/error.js

+31-1
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,40 @@ const MongoTimeoutError = function(message) {
9494
};
9595
util.inherits(MongoTimeoutError, MongoError);
9696

97+
// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms
98+
const RETRYABLE_ERROR_CODES = new Set([
99+
6, // HostUnreachable
100+
7, // HostNotFound
101+
64, // WriteConcernFailed
102+
89, // NetworkTimeout
103+
91, // ShutdownInProgress
104+
189, // PrimarySteppedDown
105+
9001, // SocketException
106+
11600, // InterruptedAtShutdown
107+
11602, // InterruptedDueToReplStateChange
108+
10107, // NotMaster
109+
13435, // NotMasterNoSlaveOk
110+
13436 // NotMasterOrSecondary
111+
]);
112+
113+
function isRetryableError(error) {
114+
if (
115+
RETRYABLE_ERROR_CODES.has(error.code) ||
116+
error instanceof MongoNetworkError ||
117+
error.message.match(/not master/) ||
118+
error.message.match(/node is recovering/)
119+
) {
120+
return true;
121+
}
122+
123+
return false;
124+
}
125+
97126
module.exports = {
98127
MongoError,
99128
MongoNetworkError,
100129
MongoParseError,
101130
MongoTimeoutError,
102-
mongoErrorContextSymbol
131+
mongoErrorContextSymbol,
132+
isRetryableError
103133
};

lib/sessions.js

+1-30
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const BSON = retrieveBSON();
66
const Binary = BSON.Binary;
77
const uuidV4 = require('./utils').uuidV4;
88
const MongoError = require('./error').MongoError;
9-
const MongoNetworkError = require('./error').MongoNetworkError;
9+
const isRetryableError = require('././error').isRetryableError;
1010

1111
function assertAlive(session, callback) {
1212
if (session.serverSession == null) {
@@ -219,35 +219,6 @@ class ClientSession extends EventEmitter {
219219
}
220220
}
221221

222-
// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms
223-
const RETRYABLE_ERROR_CODES = new Set([
224-
6, // HostUnreachable
225-
7, // HostNotFound
226-
64, // WriteConcernFailed
227-
89, // NetworkTimeout
228-
91, // ShutdownInProgress
229-
189, // PrimarySteppedDown
230-
9001, // SocketException
231-
11600, // InterruptedAtShutdown
232-
11602, // InterruptedDueToReplStateChange
233-
10107, // NotMaster
234-
13435, // NotMasterNoSlaveOk
235-
13436 // NotMasterOrSecondary
236-
]);
237-
238-
function isRetryableError(error) {
239-
if (
240-
RETRYABLE_ERROR_CODES.has(error.code) ||
241-
error instanceof MongoNetworkError ||
242-
error.message.match(/not master/) ||
243-
error.message.match(/node is recovering/)
244-
) {
245-
return true;
246-
}
247-
248-
return false;
249-
}
250-
251222
function resetTransactionState(clientSession) {
252223
clientSession.transactionOptions = null;
253224
}

lib/topologies/mongos.js

+37-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ const BasicCursor = require('../cursor');
77
const Logger = require('../connection/logger');
88
const retrieveBSON = require('../connection/utils').retrieveBSON;
99
const MongoError = require('../error').MongoError;
10-
const errors = require('../error');
1110
const Server = require('./server');
1211
const clone = require('./shared').clone;
1312
const diff = require('./shared').diff;
@@ -16,6 +15,7 @@ const createClientInfo = require('./shared').createClientInfo;
1615
const SessionMixins = require('./shared').SessionMixins;
1716
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
1817
const relayEvents = require('./shared').relayEvents;
18+
const isRetryableError = require('../error').isRetryableError;
1919
const BSON = retrieveBSON();
2020

2121
/**
@@ -900,7 +900,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {
900900

901901
server[op](ns, ops, options, (err, result) => {
902902
if (!err) return callback(null, result);
903-
if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) {
903+
if (!isRetryableError(err)) {
904904
return callback(err);
905905
}
906906

@@ -1019,6 +1019,12 @@ Mongos.prototype.remove = function(ns, ops, options, callback) {
10191019
executeWriteOperation(this, 'remove', ns, ops, options, callback);
10201020
};
10211021

1022+
const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];
1023+
1024+
function isWriteCommand(command) {
1025+
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
1026+
}
1027+
10221028
/**
10231029
* Execute a command
10241030
* @method
@@ -1057,8 +1063,36 @@ Mongos.prototype.command = function(ns, cmd, options, callback) {
10571063
var clonedOptions = cloneOptions(options);
10581064
clonedOptions.topology = self;
10591065

1066+
const willRetryWrite =
1067+
!options.retrying &&
1068+
options.retryWrites &&
1069+
options.session &&
1070+
isRetryableWritesSupported(self) &&
1071+
!options.session.inTransaction() &&
1072+
isWriteCommand(cmd);
1073+
1074+
const cb = (err, result) => {
1075+
if (!err) return callback(null, result);
1076+
if (!isRetryableError(err)) {
1077+
return callback(err);
1078+
}
1079+
1080+
if (willRetryWrite) {
1081+
const newOptions = Object.assign({}, clonedOptions, { retrying: true });
1082+
return this.command(ns, cmd, newOptions, callback);
1083+
}
1084+
1085+
return callback(err);
1086+
};
1087+
1088+
// increment and assign txnNumber
1089+
if (willRetryWrite) {
1090+
options.session.incrementTransactionNumber();
1091+
options.willRetryWrite = willRetryWrite;
1092+
}
1093+
10601094
// Execute the command
1061-
server.command(ns, cmd, clonedOptions, callback);
1095+
server.command(ns, cmd, clonedOptions, cb);
10621096
};
10631097

10641098
/**

lib/topologies/replset.js

+42-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const BasicCursor = require('../cursor');
88
const retrieveBSON = require('../connection/utils').retrieveBSON;
99
const Logger = require('../connection/logger');
1010
const MongoError = require('../error').MongoError;
11-
const errors = require('../error');
1211
const Server = require('./server');
1312
const ReplSetState = require('./replset_state');
1413
const clone = require('./shared').clone;
@@ -18,6 +17,7 @@ const createClientInfo = require('./shared').createClientInfo;
1817
const SessionMixins = require('./shared').SessionMixins;
1918
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
2019
const relayEvents = require('./shared').relayEvents;
20+
const isRetryableError = require('../error').isRetryableError;
2121

2222
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;
2323

@@ -1206,7 +1206,7 @@ function executeWriteOperation(args, options, callback) {
12061206

12071207
const handler = (err, result) => {
12081208
if (!err) return callback(null, result);
1209-
if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) {
1209+
if (!isRetryableError(err)) {
12101210
return callback(err);
12111211
}
12121212

@@ -1298,6 +1298,12 @@ ReplSet.prototype.remove = function(ns, ops, options, callback) {
12981298
executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback);
12991299
};
13001300

1301+
const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];
1302+
1303+
function isWriteCommand(command) {
1304+
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
1305+
}
1306+
13011307
/**
13021308
* Execute a command
13031309
* @method
@@ -1370,8 +1376,41 @@ ReplSet.prototype.command = function(ns, cmd, options, callback) {
13701376
);
13711377
}
13721378

1379+
const willRetryWrite =
1380+
!options.retrying &&
1381+
options.retryWrites &&
1382+
options.session &&
1383+
isRetryableWritesSupported(self) &&
1384+
!options.session.inTransaction() &&
1385+
isWriteCommand(cmd);
1386+
1387+
const cb = (err, result) => {
1388+
if (!err) return callback(null, result);
1389+
if (!isRetryableError(err)) {
1390+
return callback(err);
1391+
}
1392+
1393+
if (willRetryWrite) {
1394+
const newOptions = Object.assign({}, options, { retrying: true });
1395+
return this.command(ns, cmd, newOptions, callback);
1396+
}
1397+
1398+
// Per SDAM, remove primary from replicaset
1399+
if (this.s.replicaSetState.primary) {
1400+
this.s.replicaSetState.remove(this.s.replicaSetState.primary, { force: true });
1401+
}
1402+
1403+
return callback(err);
1404+
};
1405+
1406+
// increment and assign txnNumber
1407+
if (willRetryWrite) {
1408+
options.session.incrementTransactionNumber();
1409+
options.willRetryWrite = willRetryWrite;
1410+
}
1411+
13731412
// Execute the command
1374-
server.command(ns, cmd, options, callback);
1413+
server.command(ns, cmd, options, cb);
13751414
};
13761415

13771416
/**

lib/wireprotocol/3_2_support.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology,
350350
}
351351

352352
// optionally decorate query with transaction data
353-
decorateWithTransactionsData(query.query, options.session);
353+
decorateWithTransactionsData(query.query, options.session, options.willRetryWrite);
354354

355355
// We need to increment the statement id if we're in a transaction
356356
if (options.session && options.session.inTransaction()) {

0 commit comments

Comments
 (0)