Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3515): do proper opTime merging in bulk results #3011

Merged
merged 2 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions lib/bulk/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,15 @@ class WriteError {
}
}

/**
* Converts the number to a Long or returns it.
*
* @ignore
*/
function longOrConvert(value) {
return typeof value === 'number' ? Long.fromNumber(value) : value;
}

/**
* Merges results into shared data structure
* @ignore
Expand Down Expand Up @@ -445,42 +454,37 @@ function mergeBatchResults(batch, bulkResult, err, result) {
return;
}

// Deal with opTime if available
// The server write command specification states that lastOp is an optional
// mongod only field that has a type of timestamp. Across various scarce specs
// where opTime is mentioned, it is an "opaque" object that can have a "ts" and
// "t" field with Timestamp and Long as their types respectively.
// The "lastOp" field of the bulk write result is never mentioned in the driver
// specifications or the bulk write spec, so we should probably just keep its
// value consistent since it seems to vary.
// See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
if (result.opTime || result.lastOp) {
const opTime = result.lastOp || result.opTime;
let lastOpTS = null;
let lastOpT = null;
let opTime = result.lastOp || result.opTime;

// We have a time stamp
if (opTime && opTime._bsontype === 'Timestamp') {
if (bulkResult.lastOp == null) {
bulkResult.lastOp = opTime;
} else if (opTime.greaterThan(bulkResult.lastOp)) {
bulkResult.lastOp = opTime;
}
} else {
// Existing TS
if (bulkResult.lastOp) {
lastOpTS =
typeof bulkResult.lastOp.ts === 'number'
? Long.fromNumber(bulkResult.lastOp.ts)
: bulkResult.lastOp.ts;
lastOpT =
typeof bulkResult.lastOp.t === 'number'
? Long.fromNumber(bulkResult.lastOp.t)
: bulkResult.lastOp.t;
}

// Current OpTime TS
const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts;
const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t;
// If the opTime is a Timestamp, convert it to a consistent format to be
// able to compare easily. Converting to the object from a timestamp is
// much more straightforward than the other direction.
if (opTime._bsontype === 'Timestamp') {
opTime = { ts: opTime, t: Long.ZERO };
}

// Compare the opTime's
if (bulkResult.lastOp == null) {
bulkResult.lastOp = opTime;
} else if (opTimeTS.greaterThan(lastOpTS)) {
// If there's no lastOp, just set it.
if (!bulkResult.lastOp) {
bulkResult.lastOp = opTime;
} else {
// First compare the ts values and set if the opTimeTS value is greater.
const lastOpTS = longOrConvert(bulkResult.lastOp.ts);
const opTimeTS = longOrConvert(opTime.ts);
if (opTimeTS.greaterThan(lastOpTS)) {
bulkResult.lastOp = opTime;
} else if (opTimeTS.equals(lastOpTS)) {
// If the ts values are equal, then compare using the t values.
const lastOpT = longOrConvert(bulkResult.lastOp.t);
const opTimeT = longOrConvert(opTime.t);
if (opTimeT.greaterThan(lastOpT)) {
bulkResult.lastOp = opTime;
}
Expand Down Expand Up @@ -1387,6 +1391,7 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
module.exports = {
Batch,
BulkOperationBase,
mergeBatchResults,
bson,
INSERT: INSERT,
UPDATE: UPDATE,
Expand Down
128 changes: 127 additions & 1 deletion test/unit/bulk_write.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

const expect = require('chai').expect;
const mock = require('mongodb-mock-server');
const BulkWriteResult = require('../../lib/bulk/common').BulkWriteResult;
const Long = require('../../lib/core').BSON.Long;
const Timestamp = require('../../lib/core').BSON.Timestamp;
const common = require('../../lib/bulk/common');
const BulkWriteResult = common.BulkWriteResult;
const mergeBatchResults = common.mergeBatchResults;

describe('Bulk Writes', function() {
const test = {};
Expand Down Expand Up @@ -131,4 +135,126 @@ describe('Bulk Writes', function() {

expect(() => result.insertedIds).to.not.throw();
});

describe('#mergeBatchResults', function() {
let opTime;
let lastOp;
const bulkResult = {
ok: 1,
writeErrors: [],
writeConcernErrors: [],
insertedIds: [],
nInserted: 0,
nUpserted: 0,
nMatched: 0,
nModified: 0,
nRemoved: 1,
upserted: []
};
const result = {
n: 8,
nModified: 8,
electionId: '7fffffff0000000000000028',
ok: 1,
$clusterTime: {
clusterTime: '7020546605669417498',
signature: {
hash: 'AAAAAAAAAAAAAAAAAAAAAAAAAAA=',
keyId: 0
}
},
operationTime: '7020546605669417498'
};
const batch = [];

context('when lastOp is an object', function() {
context('when the opTime is a Timestamp', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = Timestamp.fromNumber(8020546605669417496);
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('replaces the lastOp with the properly formatted object', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal({ ts: opTime, t: Long.ZERO });
});
});

context('when the opTime is an object', function() {
context('when the ts is greater', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417497, t: 10 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('replaces the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(opTime);
});
});

context('when the ts is equal', function() {
context('when the t is greater', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 20 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('replaces the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(opTime);
});
});

context('when the t is equal', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 10 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('does not replace the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(lastOp);
});
});

context('when the t is less', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 5 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('does not replace the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(lastOp);
});
});
});

context('when the ts is less', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417495, t: 10 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('does not replace the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(lastOp);
});
});
});
});
});
});