diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index ec5cac5705f..55cdecfcdc2 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -33,15 +33,18 @@ class ChangeStream extends EventEmitter { ); } - // This wrapper is necessary because of buffering. - changeStreamThunk((err, driverChangeStream) => { - if (err != null) { - this.emit('error', err); - return; - } + this.$driverChangeStreamPromise = new Promise((resolve, reject) => { + // This wrapper is necessary because of buffering. + changeStreamThunk((err, driverChangeStream) => { + if (err != null) { + this.emit('error', err); + return reject(err); + } - this.driverChangeStream = driverChangeStream; - this.emit('ready'); + this.driverChangeStream = driverChangeStream; + this.emit('ready'); + resolve(); + }); }); } @@ -53,20 +56,23 @@ class ChangeStream extends EventEmitter { this.bindedEvents = true; if (this.driverChangeStream == null) { - this.once('ready', () => { - this.driverChangeStream.on('close', () => { - this.closed = true; - }); + this.$driverChangeStreamPromise.then( + () => { + this.driverChangeStream.on('close', () => { + this.closed = true; + }); - driverChangeStreamEvents.forEach(ev => { - this.driverChangeStream.on(ev, data => { - if (data != null && data.fullDocument != null && this.options && this.options.hydrate) { - data.fullDocument = this.options.model.hydrate(data.fullDocument); - } - this.emit(ev, data); + driverChangeStreamEvents.forEach(ev => { + this.driverChangeStream.on(ev, data => { + if (data != null && data.fullDocument != null && this.options && this.options.hydrate) { + data.fullDocument = this.options.model.hydrate(data.fullDocument); + } + this.emit(ev, data); + }); }); - }); - }); + }, + () => {} // No need to register events if opening change stream failed + ); return; } @@ -142,8 +148,12 @@ class ChangeStream extends EventEmitter { this.closed = true; if (this.driverChangeStream) { return this.driverChangeStream.close(); + } else { + return this.$driverChangeStreamPromise.then( + () => this.driverChangeStream.close(), + () => {} // No need to close if opening the change stream failed + ); } - return Promise.resolve(); } } diff --git a/lib/cursor/queryCursor.js b/lib/cursor/queryCursor.js index 2c908de50d0..6f00a316794 100644 --- a/lib/cursor/queryCursor.js +++ b/lib/cursor/queryCursor.js @@ -43,6 +43,7 @@ function QueryCursor(query) { this.cursor = null; this.skipped = false; this.query = query; + this._closed = false; const model = query.model; this._mongooseOptions = {}; this._transforms = []; @@ -229,6 +230,7 @@ QueryCursor.prototype.close = async function close() { } try { await this.cursor.close(); + this._closed = true; this.emit('close'); } catch (error) { this.listeners('error').length > 0 && this.emit('error', error); @@ -266,6 +268,9 @@ QueryCursor.prototype.next = async function next() { if (typeof arguments[0] === 'function') { throw new MongooseError('QueryCursor.prototype.next() no longer accepts a callback'); } + if (this._closed) { + throw new MongooseError('Cannot call `next()` on a closed cursor'); + } return new Promise((resolve, reject) => { _next(this, function(error, doc) { if (error) { diff --git a/package.json b/package.json index 013c7325c21..9c9097adc18 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "dependencies": { "bson": "^6.7.0", "kareem": "2.6.3", - "mongodb": "6.7.0", + "mongodb": "6.8.0", "mpath": "0.9.0", "mquery": "5.0.0", "ms": "2.1.3", diff --git a/scripts/tsc-diagnostics-check.js b/scripts/tsc-diagnostics-check.js index 3e1f6c66282..c23da8c1f55 100644 --- a/scripts/tsc-diagnostics-check.js +++ b/scripts/tsc-diagnostics-check.js @@ -3,7 +3,7 @@ const fs = require('fs'); const stdin = fs.readFileSync(0).toString('utf8'); -const maxInstantiations = isNaN(process.argv[2]) ? 127500 : parseInt(process.argv[2], 10); +const maxInstantiations = isNaN(process.argv[2]) ? 135000 : parseInt(process.argv[2], 10); console.log(stdin); diff --git a/test/connection.test.js b/test/connection.test.js index d395be5511b..03f87b40f3d 100644 --- a/test/connection.test.js +++ b/test/connection.test.js @@ -1032,6 +1032,8 @@ describe('connections:', function() { await nextChange; assert.equal(changes.length, 1); assert.equal(changes[0].operationType, 'insert'); + + await changeStream.close(); await conn.close(); }); diff --git a/test/model.test.js b/test/model.test.js index b73757e4721..f2d88586e63 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -7,6 +7,7 @@ const sinon = require('sinon'); const start = require('./common'); const assert = require('assert'); +const { once } = require('events'); const random = require('./util').random; const util = require('./util'); @@ -3508,6 +3509,9 @@ describe('Model', function() { } changeStream.removeListener('change', listener); listener = null; + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); changeStream.close(); changeStream = null; }); @@ -3560,14 +3564,21 @@ describe('Model', function() { it('fullDocument (gh-11936)', async function() { const MyModel = db.model('Test', new Schema({ name: String })); + const doc = await MyModel.create({ name: 'Ned Stark' }); const changeStream = await MyModel.watch([], { fullDocument: 'updateLookup', hydrate: true }); + await changeStream.$driverChangeStreamPromise; - const doc = await MyModel.create({ name: 'Ned Stark' }); - - const p = changeStream.next(); + const p = new Promise((resolve) => { + changeStream.once('change', change => { + resolve(change); + }); + }); + // Need to wait for resume token to be set after the event listener, + // otherwise change stream might not pick up the update. + await once(changeStream.driverChangeStream, 'resumeTokenChanged'); await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' }); const changeData = await p; @@ -3576,6 +3587,8 @@ describe('Model', function() { doc._id.toHexString()); assert.ok(changeData.fullDocument.$__); assert.equal(changeData.fullDocument.get('name'), 'Tony Stark'); + + await changeStream.close(); }); it('fullDocument with immediate watcher and hydrate (gh-14049)', async function() { @@ -3583,15 +3596,22 @@ describe('Model', function() { const doc = await MyModel.create({ name: 'Ned Stark' }); + let changeStream = null; const p = new Promise((resolve) => { - MyModel.watch([], { + changeStream = MyModel.watch([], { fullDocument: 'updateLookup', hydrate: true - }).on('change', change => { + }); + + changeStream.on('change', change => { resolve(change); }); }); + // Need to wait for cursor to be initialized and for resume token to + // be set, otherwise change stream might not pick up the update. + await changeStream.$driverChangeStreamPromise; + await once(changeStream.driverChangeStream, 'resumeTokenChanged'); await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' }); const changeData = await p; @@ -3600,6 +3620,8 @@ describe('Model', function() { doc._id.toHexString()); assert.ok(changeData.fullDocument.$__); assert.equal(changeData.fullDocument.get('name'), 'Tony Stark'); + + await changeStream.close(); }); it('respects discriminators (gh-11007)', async function() { @@ -3639,6 +3661,9 @@ describe('Model', function() { assert.equal(changeData.operationType, 'insert'); assert.equal(changeData.fullDocument.name, 'Ned Stark'); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); await changeStream.close(); await db.close(); }); @@ -3654,11 +3679,16 @@ describe('Model', function() { setTimeout(resolve, 500, false); }); - changeStream.close(); - await db; + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); + + const close = changeStream.close(); + await db.asPromise(); const readyCalled = await ready; assert.strictEqual(readyCalled, false); + await close; await db.close(); }); @@ -3675,6 +3705,10 @@ describe('Model', function() { await MyModel.create({ name: 'Hodor' }); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); + changeStream.close(); const closedData = await closed; assert.strictEqual(closedData, true); diff --git a/test/model.watch.test.js b/test/model.watch.test.js index 84d41dc5b14..3e2ad733a24 100644 --- a/test/model.watch.test.js +++ b/test/model.watch.test.js @@ -37,18 +37,22 @@ describe('model: watch: ', function() { const changeData = await changed; assert.equal(changeData.operationType, 'insert'); assert.equal(changeData.fullDocument.name, 'Ned Stark'); + await changeStream.close(); }); it('watch() close() prevents buffered watch op from running (gh-7022)', async function() { const MyModel = db.model('Test', new Schema({})); const changeStream = MyModel.watch(); - const ready = new global.Promise(resolve => { + const ready = new Promise(resolve => { changeStream.once('data', () => { resolve(true); }); setTimeout(resolve, 500, false); }); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); const close = changeStream.close(); await db.asPromise(); const readyCalled = await ready; @@ -64,12 +68,16 @@ describe('model: watch: ', function() { await MyModel.init(); const changeStream = MyModel.watch(); - const closed = new global.Promise(resolve => { + const closed = new Promise(resolve => { changeStream.once('close', () => resolve(true)); }); await MyModel.create({ name: 'Hodor' }); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); + await changeStream.close(); const closedData = await closed; diff --git a/test/query.cursor.test.js b/test/query.cursor.test.js index a5f7afc2027..d80264c5f2d 100644 --- a/test/query.cursor.test.js +++ b/test/query.cursor.test.js @@ -415,7 +415,8 @@ describe('QueryCursor', function() { await cursor.next(); assert.ok(false); } catch (error) { - assert.equal(error.name, 'MongoCursorExhaustedError'); + assert.equal(error.name, 'MongooseError'); + assert.ok(error.message.includes('closed cursor'), error.message); } }); });