Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: pull CursorStream out of Cursor #2543

Merged
merged 42 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ee06e82
refactor: pull CursorStream out of Cursor
emadum Sep 12, 2020
4d25f65
fix
emadum Sep 12, 2020
3367d29
fix tests
emadum Sep 12, 2020
2e03d42
fix lint
emadum Sep 12, 2020
111eea6
make Cursor an EventEmitter
emadum Sep 15, 2020
4b5e096
refactor: extract ChangeStreamStream from ChangeStream
emadum Sep 16, 2020
7816e3c
fix lint and es2017 examples
emadum Sep 16, 2020
b2b93bb
refactor: use symbols
emadum Sep 16, 2020
aa838f1
fix isClosed in cursor tests
emadum Sep 16, 2020
8bd8679
add integration_tests_2 to setupDatabase
emadum Sep 16, 2020
d7cc008
Revert "refactor: use symbols"
emadum Sep 22, 2020
05ef816
Revert "fix lint and es2017 examples"
emadum Sep 22, 2020
34f350d
Revert "refactor: extract ChangeStreamStream from ChangeStream"
emadum Sep 22, 2020
ec08cdc
review feedback
emadum Sep 24, 2020
8290023
fix lint
emadum Sep 24, 2020
50037c4
added new ChangeStream stream resumability test
emadum Sep 28, 2020
6911aa2
skip piped resume test
emadum Sep 28, 2020
ef24fa7
fix tests
emadum Sep 28, 2020
ddf019c
fix for close after error test
emadum Sep 28, 2020
a25ff54
refactor: more consistent handling of final errors via `closeWithError`
emadum Sep 28, 2020
157ef99
refactor: use Symbol for ChangeStreamCursor's internal CursorStream
emadum Sep 28, 2020
f684bf1
Merge remote-tracking branch 'origin/master' into NODE-2820/remove-re…
emadum Sep 30, 2020
19117f0
fix merge
emadum Sep 30, 2020
2ccf60c
Apply suggestions from code review
emadum Sep 30, 2020
14b14fa
review feedback, round one
emadum Sep 30, 2020
2ed1705
remove unnecessary close override
emadum Sep 30, 2020
7714317
review feedback, round 2
emadum Sep 30, 2020
aa5fa57
Merge remote-tracking branch 'origin/master' into NODE-2820/remove-re…
emadum Oct 1, 2020
13bd377
review feedback
emadum Oct 1, 2020
af4c41d
fix test
emadum Oct 1, 2020
237b221
matt's fixes
emadum Oct 2, 2020
d6ac058
hacky fix for test
emadum Oct 6, 2020
34254be
better fix
emadum Oct 6, 2020
12d0901
fix lint
emadum Oct 6, 2020
05b919a
fix
emadum Oct 6, 2020
2684620
revert and skip async iterator test
emadum Oct 6, 2020
486b4ac
skip all async iterator tests pending NODE-2590
emadum Oct 6, 2020
94f08e0
Merge remote-tracking branch 'origin/master' into NODE-2820/remove-re…
emadum Oct 7, 2020
174a475
add comment explaining skipped test
emadum Oct 8, 2020
25cf9fd
add comment explaining skipped test
emadum Oct 8, 2020
cdd1397
remove triggerError on public API
mbroadst Oct 13, 2020
175ed39
revert test and style changes
mbroadst Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 66 additions & 117 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Denque = require('denque');
import { EventEmitter } from 'events';
import { MongoError, AnyError, isResumableError } from './error';
import { Cursor } from './cursor';
import { Cursor, CursorOptions, CursorStream } from './cursor/cursor';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { loadCollection, loadDb, loadMongoClient } from './dynamic_loaders';
import {
Expand All @@ -13,15 +13,14 @@ import {
MongoDBNamespace,
Callback
} from './utils';
import type { CursorOptions } from './cursor/cursor';
import type { ReadPreference } from './read_preference';
import type { Timestamp, Document } from './bson';
import type { Topology } from './sdam/topology';
import type { Writable } from 'stream';
import type { StreamOptions } from './cursor/core_cursor';
import type { OperationParent } from './operations/command';
import type { CollationOptions } from './cmap/wire_protocol/write_command';
import type { CursorStreamOptions } from './cursor/core_cursor';
const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand All @@ -34,6 +33,11 @@ const CHANGE_DOMAIN_TYPES = {
CLUSTER: Symbol('Cluster')
};

const NO_RESUME_TOKEN_ERROR = new MongoError(
'A change stream document has been received that lacks a resume token (_id).'
);
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');

/** @public */
export interface ResumeOptions {
startAtOperationTime?: Timestamp;
Expand Down Expand Up @@ -155,6 +159,12 @@ interface UpdateDescription {
removedFields: string[];
}

export class ChangeStreamStream extends CursorStream {
constructor(cursor: ChangeStreamCursor) {
super(cursor);
}
}

/**
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @public
Expand All @@ -168,9 +178,9 @@ export class ChangeStream extends EventEmitter {
topology: Topology;
cursor?: ChangeStreamCursor;
closed: boolean;
pipeDestinations: Writable[] = [];
streamOptions?: StreamOptions;
streamOptions?: CursorStreamOptions;
[kResumeQueue]: Denque;
[kCursorStream]?: CursorStream;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -239,20 +249,24 @@ export class ChangeStream extends EventEmitter {
this.closed = false;

// Listen for any `change` listeners being added to ChangeStream
mbroadst marked this conversation as resolved.
Show resolved Hide resolved
this.on('newListener', (eventName: string) => {
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
this.cursor.on('data', change => processNewChange(this, change));
streamEvents(this, this.cursor);
}
});

// Listen for all `change` listeners being removed from ChangeStream
this.on('removeListener', (eventName: string) => {
this.on('removeListener', eventName => {
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
this.cursor.removeAllListeners('data');
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
}
});
}

/** @internal */
get cursorStream(): CursorStream | undefined {
return this[kCursorStream];
}

/** The cached resume token that is used to resume after the most recently returned change. */
get resumeToken(): ResumeToken {
return this.cursor?.resumeToken;
Expand Down Expand Up @@ -305,83 +319,24 @@ export class ChangeStream extends EventEmitter {
const cursor = this.cursor;

return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
endStream(this);
this.cursor = undefined;

return cb(err);
});
});
}

/**
* This method pulls all the data out of a readable stream, and writes it to the supplied destination,
* automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
*
* @param destination - The destination for writing data
* @param options - {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options| NodeJS Pipe options}
* @throws MongoError if this.cursor is undefined
*/
pipe(destination: Writable, options?: PipeOptions): Writable {
if (!this.pipeDestinations) {
this.pipeDestinations = [];
}
this.pipeDestinations.push(destination);
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to pipe');
}
return this.cursor.pipe(destination, options);
}

/**
* This method will remove the hooks set up for a previous pipe() call.
*
* @param destination - The destination for writing data
* @throws MongoError if this.cursor is undefined
*/
unpipe(destination?: Writable): ChangeStreamCursor {
const destinationIndex = destination ? this.pipeDestinations.indexOf(destination) : -1;
if (this.pipeDestinations && destinationIndex > -1) {
this.pipeDestinations.splice(destinationIndex, 1);
}
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to unpipe');
}
return this.cursor.unpipe(destination);
}

/**
* Return a modified Readable stream including a possible transform method.
* @throws MongoError if this.cursor is undefined
*/
stream(options?: StreamOptions): ChangeStreamCursor {
stream(options?: CursorStreamOptions): ChangeStreamStream {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
}
return this.cursor.stream(options);
}

/**
* This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
* @throws MongoError if this.cursor is undefined
*/
pause(): ChangeStreamCursor {
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to pause');
}
return this.cursor.pause();
}

/**
* This method will cause the readable stream to resume emitting data events.
* @throws MongoError if this.cursor is undefined
*/
resume(): ChangeStreamCursor {
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to resume');
}
return this.cursor.resume();
}
}

/** @public */
Expand Down Expand Up @@ -524,7 +479,6 @@ function createChangeStreamCursor(

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, options),
Expand All @@ -533,23 +487,7 @@ function createChangeStreamCursor(

relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);

if (self.listenerCount(ChangeStream.CHANGE) > 0) {
changeStreamCursor.on(ChangeStreamCursor.DATA, function (change) {
processNewChange(self, change);
});
}

changeStreamCursor.on(ChangeStream.ERROR, function (error) {
processError(self, error);
});

if (self.pipeDestinations) {
const cursorStream = changeStreamCursor.stream(self.streamOptions);
for (const pipeDestination of self.pipeDestinations) {
cursorStream.pipe(pipeDestination);
}
}

if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
return changeStreamCursor;
}

Expand Down Expand Up @@ -595,28 +533,48 @@ function waitForTopologyConnected(
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
changeStream.close(() => callback && callback(error));
}

function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
}

function endStream(changeStream: ChangeStream): void {
const cursorStream = changeStream[kCursorStream];
if (cursorStream) {
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
cursorStream.removeAllListeners(event)
);

cursorStream.destroy();
}

changeStream[kCursorStream] = undefined;
}

function processNewChange(
changeStream: ChangeStream,
change: ChangeStreamDocument,
callback?: Callback
) {
// a null change means the cursor has been notified, implicitly closing the change stream
if (change == null) {
changeStream.closed = true;
}

if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
return;
}

if (change && !change._id) {
const noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);
// a null change means the cursor has been notified, implicitly closing the change stream
if (change == null) {
return closeWithError(changeStream, CHANGESTREAM_CLOSED_ERROR, callback);
}

if (!callback) return changeStream.emit(ChangeStream.ERROR, noResumeTokenError);
return callback(noResumeTokenError);
if (change && !change._id) {
return closeWithError(changeStream, NO_RESUME_TOKEN_ERROR, callback);
}

// cache the resume token
Expand All @@ -631,7 +589,7 @@ function processNewChange(
return callback(undefined, change);
}

function processError(changeStream: ChangeStream, error?: AnyError, callback?: Callback) {
function processError(changeStream: ChangeStream, error: AnyError, callback?: Callback) {
const topology = changeStream.topology;
const cursor = changeStream.cursor;

Expand All @@ -649,24 +607,15 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C

// otherwise, raise an error and close the change stream
function unresumableError(err: AnyError) {
if (!callback) {
changeStream.emit(ChangeStream.ERROR, err);
changeStream.emit(ChangeStream.CLOSE);
}
processResumeQueue(changeStream, err);
changeStream.closed = true;
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
changeStream.close(() => processResumeQueue(changeStream, err));
}

if (cursor && isResumableError(error as MongoError, maxWireVersion(cursor.server))) {
changeStream.cursor = undefined;

// stop listening to all events from old cursor
[
ChangeStreamCursor.DATA,
ChangeStreamCursor.CLOSE,
ChangeStreamCursor.END,
ChangeStreamCursor.ERROR
].forEach(event => cursor.removeAllListeners(event));
endStream(changeStream);

// close internal cursor, ignore errors
cursor.close();
Expand All @@ -691,8 +640,8 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C
return;
}

if (!callback) return changeStream.emit(ChangeStream.ERROR, error);
return callback(error);
// if initial error wasn't resumable, raise an error and close the change stream
return closeWithError(changeStream, error, callback);
}

/**
Expand Down
Loading