Skip to content

Commit

Permalink
feat(connection): add support for Connection.prototype.bulkWrite() wi…
Browse files Browse the repository at this point in the history
…th MongoDB server 8.0

Fix #15028
  • Loading branch information
vkarpov15 committed Nov 24, 2024
1 parent 7aba322 commit bdf2f2a
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 204 deletions.
177 changes: 177 additions & 0 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@ const ChangeStream = require('./cursor/changeStream');
const EventEmitter = require('events').EventEmitter;
const Schema = require('./schema');
const STATES = require('./connectionState');
const MongooseBulkWriteError = require('./error/bulkWriteError');
const MongooseError = require('./error/index');
const ServerSelectionError = require('./error/serverSelection');
const SyncIndexesError = require('./error/syncIndexes');
const applyPlugins = require('./helpers/schema/applyPlugins');
const clone = require('./helpers/clone');
const driver = require('./driver');
const get = require('./helpers/get');
const getDefaultBulkwriteResult = require('./helpers/getDefaultBulkwriteResult');
const immediate = require('./helpers/immediate');
const utils = require('./utils');
const CreateCollectionsError = require('./error/createCollectionsError');
const castBulkWrite = require('./helpers/model/castBulkWrite');
const { modelSymbol } = require('./helpers/symbols');
const isPromise = require('./helpers/isPromise');

const arrayAtomicsSymbol = require('./helpers/symbols').arrayAtomicsSymbol;
const sessionNewDocuments = require('./helpers/symbols').sessionNewDocuments;
Expand Down Expand Up @@ -416,6 +421,178 @@ Connection.prototype.createCollection = async function createCollection(collecti
return this.db.createCollection(collection, options);
};

/**
* _Requires MongoDB Server 8.0 or greater_. Executes bulk write operations across multiple models in a single operation.
* You must specify the `model` for each operation: Mongoose will use `model` for casting and validation, as well as
* determining which collection to apply the operation to.
*
* #### Example:
* const Test = mongoose.model('Test', new Schema({ name: String }));
*
* await db.bulkWrite([
* { model: Test, name: 'insertOne', document: { name: 'test1' } }, // Can specify model as a Model class...
* { model: 'Test', name: 'insertOne', document: { name: 'test2' } } // or as a model name
* ], { ordered: false });
*
* @method bulkWrite
* @param {Array} ops
* @param {Object} [options]
* @param {Boolean} [options.ordered] If false, perform unordered operations. If true, perform ordered operations.
* @param {Session} [options.session] The session to use for the operation.
* @return {Promise}
* @see MongoDB https://www.mongodb.com/docs/manual/reference/command/bulkWrite/#mongodb-dbcommand-dbcmd.bulkWrite
* @api public
*/


Connection.prototype.bulkWrite = async function bulkWrite(ops, options) {
await this._waitForConnect();
options = options || {};

const ordered = options.ordered == null ? true : options.ordered;
const asyncLocalStorage = this.base.transactionAsyncLocalStorage?.getStore();
if ((!options || !options.hasOwnProperty('session')) && asyncLocalStorage?.session != null) {
options = { ...options, session: asyncLocalStorage.session };
}

const now = this.base.now();

let res = null;
if (ordered) {
const opsToSend = [];
for (const op of ops) {
if (typeof op.model !== 'string' && !op.model?.[modelSymbol]) {
throw new MongooseError('Must specify model in Connection.prototype.bulkWrite() operations');
}
const Model = op.model[modelSymbol] ? op.model : this.model(op.model);

if (op.name == null) {
throw new MongooseError('Must specify operation name in Connection.prototype.bulkWrite()');
}
if (!castBulkWrite.cast.hasOwnProperty(op.name)) {
throw new MongooseError(`Unrecognized bulkWrite() operation name ${op.name}`);
}

await castBulkWrite.cast[op.name](Model, op, options, now);
opsToSend.push({ ...op, namespace: Model.namespace() });
}

res = await this.client.bulkWrite(opsToSend, options);
} else {
const validOps = [];
const validOpIndexes = [];
let validationErrors = [];
const asyncValidations = [];
const results = [];
for (let i = 0; i < ops.length; ++i) {
const op = ops[i];
if (typeof op.model !== 'string' && !op.model?.[modelSymbol]) {
const error = new MongooseError('Must specify model in Connection.prototype.bulkWrite() operations');
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}
let Model;
try {
Model = op.model[modelSymbol] ? op.model : this.model(op.model);
} catch (error) {
validationErrors.push({ index: i, error: error });
continue;
}
if (op.name == null) {
const error = new MongooseError('Must specify operation name in Connection.prototype.bulkWrite()');
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}
if (!castBulkWrite.cast.hasOwnProperty(op.name)) {
const error = new MongooseError(`Unrecognized bulkWrite() operation name ${op.name}`);
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}

let maybePromise = null;
try {
maybePromise = castBulkWrite.cast[op.name](Model, op, options, now);
} catch (error) {
validationErrors.push({ index: i, error: error });
results[i] = error;
continue;
}
if (isPromise(maybePromise)) {
asyncValidations.push(
maybePromise.then(
() => {
validOps.push({ ...op, namespace: Model.namespace() });
validOpIndexes.push(i);
},
error => {
validationErrors.push({ index: i, error: error });
results[i] = error;
}
)
);
} else {
validOps.push({ ...op, namespace: Model.namespace() });
validOpIndexes.push(i);
}
}

if (asyncValidations.length > 0) {
await Promise.all(asyncValidations);
}

validationErrors = validationErrors.
sort((v1, v2) => v1.index - v2.index).
map(v => v.error);

if (validOps.length === 0) {
if (options.throwOnValidationError && validationErrors.length) {
throw new MongooseBulkWriteError(
validationErrors,
results,
res,
'bulkWrite'
);
}
return getDefaultBulkwriteResult();
}

let error;
[res, error] = await this.client.bulkWrite(validOps, options).
then(res => ([res, null])).
catch(err => ([null, err]));

if (error) {
if (validationErrors.length > 0) {
error.mongoose = error.mongoose || {};
error.mongoose.validationErrors = validationErrors;
}
}

for (let i = 0; i < validOpIndexes.length; ++i) {
results[validOpIndexes[i]] = null;
}
if (validationErrors.length > 0) {
if (options.throwOnValidationError) {
throw new MongooseBulkWriteError(
validationErrors,
results,
res,
'bulkWrite'
);
} else {
res.mongoose = res.mongoose || {};
res.mongoose.validationErrors = validationErrors;
res.mongoose.results = results;
}
}
}

return res;
};

/**
* Calls `createCollection()` on a models in a series.
*
Expand Down
1 change: 0 additions & 1 deletion lib/drivers/node-mongodb-native/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ function _setClient(conn, client, options, dbName) {
}
}


/*!
* Module exports.
*/
Expand Down
Loading

0 comments on commit bdf2f2a

Please sign in to comment.