Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -648,21 +648,21 @@ export class ChangeStream<
hasNext(callback?: Callback): Promise<boolean> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
for (;;) {
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand All @@ -675,23 +675,22 @@ export class ChangeStream<
next(callback?: Callback<TChange>): Promise<TChange> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
for (;;) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand All @@ -706,21 +705,21 @@ export class ChangeStream<
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
for (;;) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand Down
50 changes: 0 additions & 50 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1037,56 +1037,6 @@ describe('Change Streams', function () {
});

describe('Change Stream Resume Error Tests', function () {
describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () {
let client: MongoClient;
let changeStream: ChangeStream;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
});

afterEach(async function () {
await changeStream.close();
await client.close();
});

it('should support consecutive resumes', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } },
async test() {
const failCommand: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['getMore'],
closeConnection: true
}
};

await client.db('admin').command(failCommand);

const collection = client.db('test_consecutive_resume').collection('collection');

changeStream = collection.watch([], { batchSize: 1 });

await initIteratorMode(changeStream);

await collection.insertOne({ name: 'bumpy' });
await collection.insertOne({ name: 'bumpy' });
await collection.insertOne({ name: 'bumpy' });

await sleep(1000);

for (let i = 0; i < 3; ++i) {
const change = await changeStream.next();
expect(change).not.to.be.null;
}
}
});
});

it.skip('should continue piping changes after a resumable error', {
metadata: { requires: { topology: 'replicaset' } },
test: done => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('Change Streams Spec - Unified', function () {
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test =>
test.description === 'Test consecutive resume'
? 'TODO(NODE-4670): fix consecutive resume change stream test'
: false
);
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')));
});
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => {
return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0';
}

if (description === 'Test consecutive resume') {
return 'TODO(NODE-4670): fix consecutive resume change stream test';
}

if (
process.env.AUTH === 'auth' &&
[
Expand Down