Skip to content
This repository was archived by the owner on Feb 4, 2022. It is now read-only.

Commit ed76be0

Browse files
committed
feat(txns): add initial transaction interface for sessions
NODE-1374
1 parent b472d45 commit ed76be0

File tree

5 files changed

+198
-60
lines changed

5 files changed

+198
-60
lines changed

lib/sessions.js

+136-8
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,37 @@
11
'use strict';
22

3-
const retrieveBSON = require('./connection/utils').retrieveBSON,
4-
EventEmitter = require('events'),
5-
BSON = retrieveBSON(),
6-
Binary = BSON.Binary,
7-
uuidV4 = require('./utils').uuidV4;
3+
const retrieveBSON = require('./connection/utils').retrieveBSON;
4+
const EventEmitter = require('events');
5+
const BSON = retrieveBSON();
6+
const Binary = BSON.Binary;
7+
const uuidV4 = require('./utils').uuidV4;
8+
const MongoError = require('./error').MongoError;
89

9-
/**
10-
*
11-
*/
10+
function assertAlive(session, callback) {
11+
if (session.serverSession == null) {
12+
const error = new MongoError('Cannot use a session that has ended');
13+
if (typeof callback === 'function') {
14+
return callback(error, null);
15+
}
16+
17+
throw error;
18+
}
19+
};
20+
21+
/** A class representing a client session on the server */
1222
class ClientSession extends EventEmitter {
23+
24+
/**
25+
* Create a client session.
26+
* WARNING: not meant to be instantiated directly
27+
*
28+
* @param {Topology} topology The current client's topology
29+
* @param {ServerSessionPool} sessionPool The server session pool
30+
* @param {Object} [options] Optional settings
31+
* @param {Boolean} [options.causalConsistency] Whether causal consistency should be enabled on this session
32+
* @param {Boolean} [options.autoStartTransaction=false] When enabled this session automatically starts a transaction with the provided defaultTransactionOptions.
33+
* @param {Object} [options.defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
34+
*/
1335
constructor(topology, sessionPool, options) {
1436
super();
1537

@@ -42,10 +64,20 @@ class ClientSession extends EventEmitter {
4264

4365
this.explicit = !!options.explicit;
4466
this.owner = options.owner;
67+
this.transactionOptions = null;
68+
this.defaultTransactionOptions = options.defaultTransactionOptions || {};
69+
70+
if (options.autoStartTransaction) {
71+
this.startTransaction();
72+
}
4573
}
4674

4775
/**
76+
* Ends this session on the server
4877
*
78+
* @param {Object} [options] Optional settings
79+
* @param {Boolean} [options.skipCommand] Skip sending the actual endSessions command to the server
80+
* @param {Function} [callback] Optional callback for completion of this operation
4981
*/
5082
endSession(options, callback) {
5183
if (typeof options === 'function') (callback = options), (options = {});
@@ -56,6 +88,10 @@ class ClientSession extends EventEmitter {
5688
return;
5789
}
5890

91+
if (this.serverSession && this.inTransaction()) {
92+
this.abortTransaction(); // pass in callback?
93+
}
94+
5995
if (!options.skipCommand) {
6096
// send the `endSessions` command
6197
this.topology.endSessions(this.id);
@@ -98,6 +134,98 @@ class ClientSession extends EventEmitter {
98134

99135
return this.id.id.buffer.equals(session.id.id.buffer);
100136
}
137+
138+
/**
139+
* @returns whether this session is current in a transaction or not
140+
*/
141+
inTransaction() {
142+
return this.transactionOptions != null;
143+
}
144+
145+
/**
146+
* Starts a new transaction with the given options.
147+
*
148+
* @param {Object} options Optional settings
149+
* @param {ReadConcern} [options.readConcern] The readConcern to use for this transaction
150+
* @param {WriteConcern} [options.writeConcern] The writeConcern to use for this transaction
151+
*/
152+
startTransaction(options) {
153+
assertAlive(this);
154+
if (this.inTransaction()) {
155+
throw new MongoError('Transaction already started');
156+
}
157+
158+
// increment txnNumber and reset stmtId to zero.
159+
this.serverSession.txnNumber += 1;
160+
this.serverSession.stmtId = 0;
161+
162+
// set transaction options, we will use this to determine if we are in a transaction
163+
this.transactionOptions = options || this.defaultTransactionOptions;
164+
}
165+
166+
/**
167+
* Commits the currently active transaction in this session.
168+
*
169+
* @param {Function} [callback] optional callback for completion of this operation
170+
* @return {Promise} A promise is returned if no callback is provided
171+
*/
172+
commitTransaction(callback) {
173+
if (typeof callback === 'function') {
174+
endTransaction(this, 'commitTransaction', callback);
175+
return;
176+
}
177+
178+
return new Promise((resolve, reject) => {
179+
endTransaction(this, 'commitTransaction', (err, reply) => err ? reject(err) : resolve(reply));
180+
});
181+
}
182+
183+
/**
184+
* Aborts the currently active transaction in this session.
185+
*
186+
* @param {Function} [callback] optional callback for completion of this operation
187+
* @return {Promise} A promise is returned if no callback is provided
188+
*/
189+
abortTransaction(callback) {
190+
if (typeof callback === 'function') {
191+
endTransaction(this, 'abortTransaction', callback);
192+
return;
193+
}
194+
195+
return new Promise((resolve, reject) => {
196+
endTransaction(this, 'abortTransaction', (err, reply) => err ? reject(err) : resolve(reply));
197+
});
198+
199+
}
200+
}
201+
202+
function endTransaction(clientSession, commandName, callback) {
203+
assertAlive(clientSession, callback);
204+
205+
if (!clientSession.inTransaction()) {
206+
callback(new MongoError('No transaction started'));
207+
return;
208+
}
209+
210+
if (clientSession.serverSession.stmtId === 0) {
211+
// The server transaction was never started.
212+
callback(null, null);
213+
return;
214+
}
215+
216+
// send the command
217+
clientSession.topology.command('admin.$cmd', { [commandName]: 1 }, {
218+
writeConcern: clientSession.transactionOptions.writeConcern
219+
}, (err, reply) => {
220+
// reset internal transaction state
221+
if (clientSession.options.autoStartTransaction) {
222+
clientSession.startTransaction();
223+
} else {
224+
clientSession.transactionOptions = null;
225+
}
226+
227+
callback(err, reply);
228+
});
101229
}
102230

103231
Object.defineProperty(ClientSession.prototype, 'id', {

lib/topologies/mongos.js

+18-19
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
'use strict';
22

3-
const inherits = require('util').inherits,
4-
f = require('util').format,
5-
EventEmitter = require('events').EventEmitter,
6-
BasicCursor = require('../cursor'),
7-
Logger = require('../connection/logger'),
8-
retrieveBSON = require('../connection/utils').retrieveBSON,
9-
MongoError = require('../error').MongoError,
10-
errors = require('../error'),
11-
Server = require('./server'),
12-
clone = require('./shared').clone,
13-
diff = require('./shared').diff,
14-
cloneOptions = require('./shared').cloneOptions,
15-
createClientInfo = require('./shared').createClientInfo,
16-
SessionMixins = require('./shared').SessionMixins,
17-
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
18-
getNextTransactionNumber = require('./shared').getNextTransactionNumber,
19-
relayEvents = require('./shared').relayEvents;
20-
3+
const inherits = require('util').inherits;
4+
const f = require('util').format;
5+
const EventEmitter = require('events').EventEmitter;
6+
const BasicCursor = require('../cursor');
7+
const Logger = require('../connection/logger');
8+
const retrieveBSON = require('../connection/utils').retrieveBSON;
9+
const MongoError = require('../error').MongoError;
10+
const errors = require('../error');
11+
const Server = require('./server');
12+
const clone = require('./shared').clone;
13+
const diff = require('./shared').diff;
14+
const cloneOptions = require('./shared').cloneOptions;
15+
const createClientInfo = require('./shared').createClientInfo;
16+
const SessionMixins = require('./shared').SessionMixins;
17+
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
18+
const incrementTransactionNumber = require('./shared').incrementTransactionNumber;
19+
const relayEvents = require('./shared').relayEvents;
2120
const BSON = retrieveBSON();
2221

2322
/**
@@ -909,7 +908,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {
909908
}
910909

911910
// increment and assign txnNumber
912-
options.txnNumber = getNextTransactionNumber(options.session);
911+
incrementTransactionNumber(options.session);
913912

914913
server[op](ns, ops, options, (err, result) => {
915914
if (!err) return callback(null, result);

lib/topologies/replset.js

+20-20
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
11
'use strict';
22

3-
var inherits = require('util').inherits,
4-
f = require('util').format,
5-
EventEmitter = require('events').EventEmitter,
6-
ReadPreference = require('./read_preference'),
7-
BasicCursor = require('../cursor'),
8-
retrieveBSON = require('../connection/utils').retrieveBSON,
9-
Logger = require('../connection/logger'),
10-
MongoError = require('../error').MongoError,
11-
errors = require('../error'),
12-
Server = require('./server'),
13-
ReplSetState = require('./replset_state'),
14-
clone = require('./shared').clone,
15-
Timeout = require('./shared').Timeout,
16-
Interval = require('./shared').Interval,
17-
createClientInfo = require('./shared').createClientInfo,
18-
SessionMixins = require('./shared').SessionMixins,
19-
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
20-
getNextTransactionNumber = require('./shared').getNextTransactionNumber,
21-
relayEvents = require('./shared').relayEvents;
3+
const inherits = require('util').inherits;
4+
const f = require('util').format;
5+
const EventEmitter = require('events').EventEmitter;
6+
const ReadPreference = require('./read_preference');
7+
const BasicCursor = require('../cursor');
8+
const retrieveBSON = require('../connection/utils').retrieveBSON;
9+
const Logger = require('../connection/logger');
10+
const MongoError = require('../error').MongoError;
11+
const errors = require('../error');
12+
const Server = require('./server');
13+
const ReplSetState = require('./replset_state');
14+
const clone = require('./shared').clone;
15+
const Timeout = require('./shared').Timeout;
16+
const Interval = require('./shared').Interval;
17+
const createClientInfo = require('./shared').createClientInfo;
18+
const SessionMixins = require('./shared').SessionMixins;
19+
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
20+
const incrementTransactionNumber = require('./shared').incrementTransactionNumber;
21+
const relayEvents = require('./shared').relayEvents;
2222

2323
var MongoCR = require('../auth/mongocr'),
2424
X509 = require('../auth/x509'),
@@ -1230,7 +1230,7 @@ function executeWriteOperation(args, options, callback) {
12301230

12311231
// increment and assign txnNumber
12321232
if (willRetryWrite) {
1233-
options.txnNumber = getNextTransactionNumber(options.session);
1233+
incrementTransactionNumber(options.session);
12341234
}
12351235

12361236
return self.s.replicaSetState.primary[op](ns, ops, options, handler);

lib/topologies/shared.js

+2-3
Original file line numberDiff line numberDiff line change
@@ -425,9 +425,8 @@ const isRetryableWritesSupported = function(topology) {
425425
*
426426
* @param {ClientSession} session
427427
*/
428-
const getNextTransactionNumber = function(session) {
428+
const incrementTransactionNumber = function(session) {
429429
session.serverSession.txnNumber++;
430-
return BSON.Long.fromNumber(session.serverSession.txnNumber);
431430
};
432431

433432
/**
@@ -454,5 +453,5 @@ module.exports.diff = diff;
454453
module.exports.Interval = Interval;
455454
module.exports.Timeout = Timeout;
456455
module.exports.isRetryableWritesSupported = isRetryableWritesSupported;
457-
module.exports.getNextTransactionNumber = getNextTransactionNumber;
456+
module.exports.incrementTransactionNumber = incrementTransactionNumber;
458457
module.exports.relayEvents = relayEvents;

lib/wireprotocol/3_2_support.js

+22-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
'use strict';
22

3-
var Query = require('../connection/commands').Query,
4-
retrieveBSON = require('../connection/utils').retrieveBSON,
5-
f = require('util').format,
6-
MongoError = require('../error').MongoError,
7-
MongoNetworkError = require('../error').MongoNetworkError,
8-
getReadPreference = require('./shared').getReadPreference;
3+
const Query = require('../connection/commands').Query;
4+
const retrieveBSON = require('../connection/utils').retrieveBSON;
5+
const f = require('util').format;
6+
const MongoError = require('../error').MongoError;
7+
const MongoNetworkError = require('../error').MongoNetworkError;
8+
const getReadPreference = require('./shared').getReadPreference;
99

10-
var BSON = retrieveBSON(),
11-
Long = BSON.Long;
10+
const BSON = retrieveBSON();
11+
const Long = BSON.Long;
1212

1313
var WireProtocol = function(legacyWireProtocol) {
1414
this.legacyWireProtocol = legacyWireProtocol;
@@ -57,8 +57,20 @@ var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callba
5757
}
5858

5959
// optionally add a `txnNumber` if retryable writes are being attempted
60-
if (typeof options.txnNumber !== 'undefined') {
61-
writeCommand.txnNumber = options.txnNumber;
60+
if (options.session && options.session.serverSession) {
61+
const serverSession = options.session.serverSession;
62+
if (serverSession.txnNumber) {
63+
writeCommand.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
64+
}
65+
66+
if (typeof serverSession.stmtId !== 'undefined') {
67+
writeCommand.stmtId = serverSession.stmtId;
68+
69+
if (serverSession.stmtId === 0) {
70+
writeCommand.startTransaction = true;
71+
writeCommand.autocommit = false;
72+
}
73+
}
6274
}
6375

6476
// Options object

0 commit comments

Comments
 (0)