From 5ad9fa94d4d3a8921f88d4e22e864d8ef10ad151 Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Tue, 22 Jan 2019 19:04:48 -0500 Subject: [PATCH] fix(changeStream): properly handle changeStream event mid-close (#1902) If a changeStream gets an event while it is in the middle of closing, a race condition can occur where the event is still processed after the stream has closed. This commit adds handling for this edge case by returning an error to callbacks, rejecting promises, and simply ignoring it in emitter mode. Fixes NODE-1831 --- lib/change_stream.js | 14 ++++ test/functional/change_stream_tests.js | 105 +++++++++++++++++++++++++ 2 files changed, 119 insertions(+) diff --git a/lib/change_stream.js b/lib/change_stream.js index 1d2bbe69ea..21f2428542 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -368,6 +368,20 @@ function processNewChange(args) { const change = args.change; const callback = args.callback; const eventEmitter = args.eventEmitter || false; + + // If the changeStream is closed, then it should not process a change. + if (changeStream.isClosed()) { + // We do not error in the eventEmitter case. + if (eventEmitter) { + return; + } + + const error = new MongoError('ChangeStream is closed'); + return typeof callback === 'function' + ? callback(error, null) + : changeStream.promiseLibrary.reject(error); + } + const topology = changeStream.topology; const options = changeStream.cursor.options; diff --git a/test/functional/change_stream_tests.js b/test/functional/change_stream_tests.js index b3dc38b952..114bc0a8b8 100644 --- a/test/functional/change_stream_tests.js +++ b/test/functional/change_stream_tests.js @@ -1677,4 +1677,109 @@ describe('Change Streams', function() { .then(() => finish(), err => finish(err)); } }); + + describe('should properly handle a changeStream event being processed mid-close', function() { + let client, coll; + + function write() { + return Promise.resolve() + .then(() => coll.insertOne({ a: 1 })) + .then(() => coll.insertOne({ b: 2 })) + .then(() => coll.insertOne({ c: 3 })); + } + + beforeEach(function() { + client = this.configuration.newClient(); + return client.connect().then(_client => { + client = _client; + coll = client.db(this.configuration.db).collection('tester'); + }); + }); + + afterEach(function() { + coll = undefined; + if (client) { + return client.close().then(() => { + client = undefined; + }); + } + }); + + it('when invoked with promises', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function() { + function read() { + const changeStream = coll.watch(); + return Promise.resolve() + .then(() => changeStream.next()) + .then(() => changeStream.next()) + .then(() => { + const nextP = changeStream.next(); + + return changeStream.close().then(() => nextP); + }); + } + + return Promise.all([read(), write()]).then( + () => Promise.reject(new Error('Expected operation to fail with error')), + err => expect(err.message).to.equal('ChangeStream is closed') + ); + } + }); + + it('when invoked with callbacks', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function(done) { + const changeStream = coll.watch(); + + changeStream.next(() => { + changeStream.next(() => { + changeStream.next(err => { + let _err = null; + try { + expect(err.message).to.equal('ChangeStream is closed'); + } catch (e) { + _err = e; + } finally { + done(_err); + } + }); + changeStream.close(); + }); + }); + + write().catch(() => {}); + } + }); + + it('when invoked using eventEmitter API', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, + test: function(done) { + let closed = false; + const close = _err => { + if (closed) { + return; + } + closed = true; + return done(_err); + }; + + const changeStream = coll.watch(); + + let counter = 0; + changeStream.on('change', () => { + counter += 1; + if (counter === 2) { + changeStream.close(); + setTimeout(() => close()); + } else if (counter >= 3) { + close(new Error('Should not have received more than 2 events')); + } + }); + changeStream.on('error', err => close(err)); + + setTimeout(() => write().catch(() => {})); + } + }); + }); });