From 4588ff2fa68cffb5f6769390d8c3eabe6926d483 Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 29 Oct 2024 14:09:25 -0400 Subject: [PATCH] feat(NODE-6387): Add CSOT support to change streams (#4256) --- src/change_stream.ts | 177 +++++++++---- src/cmap/connection.ts | 8 +- src/cursor/abstract_cursor.ts | 47 ++-- src/cursor/change_stream_cursor.ts | 2 +- ...ient_side_operations_timeout.prose.test.ts | 52 ++-- ...lient_side_operations_timeout.spec.test.ts | 14 +- .../node_csot.test.ts | 233 +++++++++++++++++- .../change-streams.json | 135 ++++++++++ .../tailable-awaitData.json | 83 +++++++ .../node-specific/abstract_cursor.test.ts | 52 ++-- 10 files changed, 676 insertions(+), 127 deletions(-) create mode 100644 test/integration/client-side-operations-timeout/unified-csot-node-specs/change-streams.json diff --git a/src/change_stream.ts b/src/change_stream.ts index 34f92a4477c..ae57fb45f95 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -3,7 +3,7 @@ import type { Readable } from 'stream'; import type { Binary, Document, Timestamp } from './bson'; import { Collection } from './collection'; import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants'; -import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor'; +import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor'; import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor'; import { Db } from './db'; import { @@ -11,6 +11,7 @@ import { isResumableError, MongoAPIError, MongoChangeStreamError, + MongoOperationTimeoutError, MongoRuntimeError } from './error'; import { MongoClient } from './mongo_client'; @@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command'; import type { ReadPreference } from './read_preference'; import { type AsyncDisposable, configureResourceManagement } from './resource_management'; import type { ServerSessionId } from './sessions'; +import { CSOTTimeoutContext, type TimeoutContext } from './timeout'; import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils'; /** @internal */ @@ -538,7 +540,12 @@ export type ChangeStreamEvents< end(): void; error(error: Error): void; change(change: TChange): void; -} & AbstractCursorEvents; + /** + * @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor` + * instance is closed, which can occur multiple times for a given `ChangeStream` instance. + */ + close(): void; +}; /** * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}. @@ -609,6 +616,13 @@ export class ChangeStream< */ static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED; + private timeoutContext?: TimeoutContext; + /** + * Note that this property is here to uniquely identify a ChangeStream instance as the owner of + * the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure + * that {@link AbstractCursor.close} does not mutate the timeoutContext. + */ + private contextOwner: symbol; /** * @internal * @@ -624,20 +638,25 @@ export class ChangeStream< this.pipeline = pipeline; this.options = { ...options }; + let serverSelectionTimeoutMS: number; delete this.options.writeConcern; if (parent instanceof Collection) { this.type = CHANGE_DOMAIN_TYPES.COLLECTION; + serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS; } else if (parent instanceof Db) { this.type = CHANGE_DOMAIN_TYPES.DATABASE; + serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS; } else if (parent instanceof MongoClient) { this.type = CHANGE_DOMAIN_TYPES.CLUSTER; + serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS; } else { throw new MongoChangeStreamError( 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient' ); } + this.contextOwner = Symbol(); this.parent = parent; this.namespace = parent.s.namespace; if (!this.options.readPreference && parent.readPreference) { @@ -662,6 +681,13 @@ export class ChangeStream< this[kCursorStream]?.removeAllListeners('data'); } }); + + if (this.options.timeoutMS != null) { + this.timeoutContext = new CSOTTimeoutContext({ + timeoutMS: this.options.timeoutMS, + serverSelectionTimeoutMS + }); + } } /** @internal */ @@ -681,22 +707,30 @@ export class ChangeStream< // This loop continues until either a change event is received or until a resume attempt // fails. - while (true) { - try { - const hasNext = await this.cursor.hasNext(); - return hasNext; - } catch (error) { + this.timeoutContext?.refresh(); + try { + while (true) { try { - await this._processErrorIteratorMode(error); + const hasNext = await this.cursor.hasNext(); + return hasNext; } catch (error) { try { - await this.close(); + await this._processErrorIteratorMode(error, this.cursor.id != null); } catch (error) { - squashError(error); + if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) { + throw error; + } + try { + await this.close(); + } catch (error) { + squashError(error); + } + throw error; } - throw error; } } + } finally { + this.timeoutContext?.clear(); } } @@ -706,24 +740,32 @@ export class ChangeStream< // Change streams must resume indefinitely while each resume event succeeds. // This loop continues until either a change event is received or until a resume attempt // fails. + this.timeoutContext?.refresh(); - while (true) { - try { - const change = await this.cursor.next(); - const processedChange = this._processChange(change ?? null); - return processedChange; - } catch (error) { + try { + while (true) { try { - await this._processErrorIteratorMode(error); + const change = await this.cursor.next(); + const processedChange = this._processChange(change ?? null); + return processedChange; } catch (error) { try { - await this.close(); + await this._processErrorIteratorMode(error, this.cursor.id != null); } catch (error) { - squashError(error); + if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) { + throw error; + } + try { + await this.close(); + } catch (error) { + squashError(error); + } + throw error; } - throw error; } } + } finally { + this.timeoutContext?.clear(); } } @@ -735,23 +777,29 @@ export class ChangeStream< // Change streams must resume indefinitely while each resume event succeeds. // This loop continues until either a change event is received or until a resume attempt // fails. + this.timeoutContext?.refresh(); - while (true) { - try { - const change = await this.cursor.tryNext(); - return change ?? null; - } catch (error) { + try { + while (true) { try { - await this._processErrorIteratorMode(error); + const change = await this.cursor.tryNext(); + return change ?? null; } catch (error) { try { - await this.close(); + await this._processErrorIteratorMode(error, this.cursor.id != null); } catch (error) { - squashError(error); + if (error instanceof MongoOperationTimeoutError && this.cursor.id == null) throw error; + try { + await this.close(); + } catch (error) { + squashError(error); + } + throw error; } - throw error; } } + } finally { + this.timeoutContext?.clear(); } } @@ -784,6 +832,8 @@ export class ChangeStream< * Frees the internal resources used by the change stream. */ async close(): Promise { + this.timeoutContext?.clear(); + this.timeoutContext = undefined; this[kClosed] = true; const cursor = this.cursor; @@ -866,7 +916,12 @@ export class ChangeStream< client, this.namespace, pipeline, - options + { + ...options, + timeoutContext: this.timeoutContext + ? new CursorTimeoutContext(this.timeoutContext, this.contextOwner) + : undefined + } ); for (const event of CHANGE_STREAM_EVENTS) { @@ -899,8 +954,9 @@ export class ChangeStream< } catch (error) { this.emit(ChangeStream.ERROR, error); } + this.timeoutContext?.refresh(); }); - stream.on('error', error => this._processErrorStreamMode(error)); + stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null)); } /** @internal */ @@ -942,24 +998,30 @@ export class ChangeStream< } /** @internal */ - private _processErrorStreamMode(changeStreamError: AnyError) { + private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) { // If the change stream has been closed explicitly, do not process error. if (this[kClosed]) return; - if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + if ( + cursorInitialized && + (isResumableError(changeStreamError, this.cursor.maxWireVersion) || + changeStreamError instanceof MongoOperationTimeoutError) + ) { this._endStream(); - this.cursor.close().then(undefined, squashError); - - const topology = getTopology(this.parent); - topology - .selectServer(this.cursor.readPreference, { - operationName: 'reconnect topology in change stream' - }) - + this.cursor + .close() + .then( + () => this._resume(changeStreamError), + e => { + squashError(e); + return this._resume(changeStreamError); + } + ) .then( () => { - this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); + if (changeStreamError instanceof MongoOperationTimeoutError) + this.emit(ChangeStream.ERROR, changeStreamError); }, () => this._closeEmitterModeWithError(changeStreamError) ); @@ -969,33 +1031,44 @@ export class ChangeStream< } /** @internal */ - private async _processErrorIteratorMode(changeStreamError: AnyError) { + private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } if ( - this.cursor.id == null || - !isResumableError(changeStreamError, this.cursor.maxWireVersion) + cursorInitialized && + (isResumableError(changeStreamError, this.cursor.maxWireVersion) || + changeStreamError instanceof MongoOperationTimeoutError) ) { + try { + await this.cursor.close(); + } catch (error) { + squashError(error); + } + + await this._resume(changeStreamError); + + if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError; + } else { try { await this.close(); } catch (error) { squashError(error); } + throw changeStreamError; } + } - try { - await this.cursor.close(); - } catch (error) { - squashError(error); - } + private async _resume(changeStreamError: AnyError) { + this.timeoutContext?.refresh(); const topology = getTopology(this.parent); try { await topology.selectServer(this.cursor.readPreference, { - operationName: 'reconnect topology in change stream' + operationName: 'reconnect topology in change stream', + timeoutContext: this.timeoutContext }); this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); } catch { diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 6b1d3c24171..ca7c86a0bad 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -505,7 +505,6 @@ export class Connection extends TypedEventEmitter { responseType?: MongoDBResponseConstructor ) { const message = this.prepareCommand(ns.db, command, options); - let started = 0; if (this.shouldEmitAndLogCommand) { started = now(); @@ -717,8 +716,10 @@ export class Connection extends TypedEventEmitter { try { return await Promise.race([drainEvent, timeout]); } catch (error) { + let err = error; if (TimeoutError.is(error)) { - throw new MongoOperationTimeoutError('Timed out at socket write'); + err = new MongoOperationTimeoutError('Timed out at socket write'); + this.cleanup(err); } throw error; } finally { @@ -753,6 +754,7 @@ export class Connection extends TypedEventEmitter { } } } catch (readError) { + const err = readError; if (TimeoutError.is(readError)) { const error = new MongoOperationTimeoutError( `Timed out during socket read (${readError.duration}ms)` @@ -761,7 +763,7 @@ export class Connection extends TypedEventEmitter { this.onError(error); throw error; } - throw readError; + throw err; } finally { this.dataEvents = null; this.messageStream.pause(); diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 4188c1e943e..4eb5904f433 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -171,7 +171,11 @@ export abstract class AbstractCursor< private cursorClient: MongoClient; /** @internal */ private transform?: (doc: TSchema) => any; - /** @internal */ + /** + * @internal + * This is true whether or not the first command fails. It only indicates whether or not the first + * command has been run. + */ private initialized: boolean; /** @internal */ private isClosed: boolean; @@ -211,15 +215,16 @@ export abstract class AbstractCursor< ? options.readPreference : ReadPreference.primary, ...pluckBSONSerializeOptions(options), - timeoutMS: options.timeoutMS, + timeoutMS: options?.timeoutContext?.csotEnabled() + ? options.timeoutContext.timeoutMS + : options.timeoutMS, tailable: options.tailable, awaitData: options.awaitData }; + if (this.cursorOptions.timeoutMS != null) { if (options.timeoutMode == null) { if (options.tailable) { - this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; - if (options.awaitData) { if ( options.maxAwaitTimeMS != null && @@ -229,22 +234,21 @@ export abstract class AbstractCursor< 'Cannot specify maxAwaitTimeMS >= timeoutMS for a tailable awaitData cursor' ); } + + this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; } else { this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME; } } else { - if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { + if (options.tailable && options.timeoutMode === CursorTimeoutMode.LIFETIME) { throw new MongoInvalidArgumentError( "Cannot set tailable cursor's timeoutMode to LIFETIME" ); } this.cursorOptions.timeoutMode = options.timeoutMode; } - this.cursorOptions.timeoutMode = - options.timeoutMode ?? - (options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME); } else { - if (options.timeoutMode != null && options.timeoutContext == null) + if (options.timeoutMode != null) throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS'); } @@ -291,15 +295,6 @@ export abstract class AbstractCursor< } }; - if ( - options.timeoutContext != null && - options.timeoutMS != null && - this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME - ) { - throw new MongoAPIError( - `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.` - ); - } this.timeoutContext = options.timeoutContext; } @@ -490,7 +485,7 @@ export abstract class AbstractCursor< await this.fetchBatch(); } while (!this.isDead || (this.documents?.length ?? 0) !== 0); } finally { - if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION) { this.timeoutContext?.clear(); } } @@ -503,6 +498,7 @@ export abstract class AbstractCursor< if (this.cursorId === Long.ZERO) { throw new MongoCursorExhaustedError(); } + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { this.timeoutContext?.refresh(); } @@ -517,7 +513,7 @@ export abstract class AbstractCursor< await this.fetchBatch(); } while (!this.isDead || (this.documents?.length ?? 0) !== 0); } finally { - if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION) { this.timeoutContext?.clear(); } } @@ -551,7 +547,7 @@ export abstract class AbstractCursor< return doc; } } finally { - if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) { + if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION) { this.timeoutContext?.clear(); } } @@ -888,7 +884,6 @@ export abstract class AbstractCursor< // otherwise need to call getMore const batchSize = this.cursorOptions.batchSize || 1000; - this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null; try { const response = await this.getMore(batchSize); @@ -1132,15 +1127,19 @@ export class CursorTimeoutContext extends TimeoutContext { return this.timeoutContext.csotEnabled(); } override refresh(): void { - return this.timeoutContext.refresh(); + if (typeof this.owner !== 'symbol') return this.timeoutContext.refresh(); } override clear(): void { - return this.timeoutContext.clear(); + if (typeof this.owner !== 'symbol') return this.timeoutContext.clear(); } override get maxTimeMS(): number | null { return this.timeoutContext.maxTimeMS; } + get timeoutMS(): number | null { + return this.timeoutContext.csotEnabled() ? this.timeoutContext.timeoutMS : null; + } + override refreshed(): CursorTimeoutContext { return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner); } diff --git a/src/cursor/change_stream_cursor.ts b/src/cursor/change_stream_cursor.ts index 13f58675552..73a256cdeea 100644 --- a/src/cursor/change_stream_cursor.ts +++ b/src/cursor/change_stream_cursor.ts @@ -55,7 +55,7 @@ export class ChangeStreamCursor< pipeline: Document[] = [], options: ChangeStreamCursorOptions = {} ) { - super(client, namespace, options); + super(client, namespace, { ...options, tailable: true, awaitData: true }); this.pipeline = pipeline; this.changeStreamCursorOptions = options; diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index c7d5173a50e..6094e2f8a61 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -444,26 +444,38 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a timeout error. * 1. Verify that an `aggregate` command and two `getMore` commands were executed against the `db.coll` collection during the test. */ - it.skip('sends correct number of aggregate and getMores', metadata, async function () { - const changeStream = client - .db('db') - .collection('coll') - .watch([], { timeoutMS: 20, maxAwaitTimeMS: 19 }); - const maybeError = await changeStream.next().then( - () => null, - e => e - ); - - expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); - const aggregates = commandStarted - .filter(e => e.command.aggregate != null) - .map(e => e.command); - const getMores = commandStarted.filter(e => e.command.getMore != null).map(e => e.command); - // Expect 1 aggregate - expect(aggregates).to.have.lengthOf(1); - // Expect 2 getMores - expect(getMores).to.have.lengthOf(2); - }).skipReason = 'TODO(NODE-6387)'; + it( + 'sends correct number of aggregate and getMores', + { requires: { mongodb: '>=4.4', topology: '!single' } }, + async function () { + // NOTE: we don't check for a non-zero ID since we lazily send the initial aggregate to the + // server. See ChangeStreamCursor._initialize + const changeStream = client + .db('db') + .collection('coll') + .watch([], { timeoutMS: 120, maxAwaitTimeMS: 10 }); + + // @ts-expect-error private method + await changeStream.cursor.cursorInit(); + + const maybeError = await changeStream.next().then( + () => null, + e => e + ); + + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); + const aggregates = commandStarted + .filter(e => e.command.aggregate != null) + .map(e => e.command); + const getMores = commandStarted + .filter(e => e.command.getMore != null) + .map(e => e.command); + // Expect 1 aggregate + expect(aggregates).to.have.lengthOf(1); + // Expect 2 getMores + expect(getMores).to.have.lengthOf(2); + } + ); }); }); diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts index 6708d7da89f..2ea3a38bb79 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts @@ -4,12 +4,7 @@ import * as semver from 'semver'; import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; -const skippedSpecs = { - 'change-streams': 'TODO(NODE-6035)', - 'convenient-transactions': 'TODO(NODE-5687)', - 'tailable-awaitData': 'TODO(NODE-6035)', - 'tailable-non-awaitData': 'TODO(NODE-6035)' -}; +const skippedSpecs = {}; const skippedTests = { 'timeoutMS can be configured on a MongoClient - createChangeStream on client': 'TODO(NODE-6305)', @@ -25,11 +20,10 @@ const skippedTests = { 'TODO(DRIVERS-2965)', 'maxTimeMS value in the command is less than timeoutMS': 'TODO(DRIVERS-2970): see modified test in unified-csot-node-specs', - 'Tailable cursor awaitData iteration timeoutMS is refreshed for getMore - failure': - 'TODO(DRIVERS-2965)', - 'Tailable cursor iteration timeoutMS is refreshed for getMore - failure': 'TODO(DRIVERS-2965)', 'timeoutMS is refreshed for getMore - failure': - 'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs' // Skipping for both tailable awaitData and tailable non-awaitData cursors + 'TODO(DRIVERS-2965): see modified test in unified-csot-node-specs', // Skipping for both tailable awaitData and tailable non-awaitData cursors + 'timeoutMS applies to full resume attempt in a next call': 'TODO(DRIVERS-3006)', + 'timeoutMS is refreshed for getMore if maxAwaitTimeMS is set': 'TODO(DRIVERS-3018)' }; describe('CSOT spec tests', function () { diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 12b380d8f1a..b63ffa8da22 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -1,5 +1,5 @@ /* Anything javascript specific relating to timeouts */ -import { once } from 'node:events'; +import { on, once } from 'node:events'; import { Readable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { setTimeout } from 'node:timers/promises'; @@ -10,6 +10,8 @@ import * as sinon from 'sinon'; import { BSON, + type ChangeStream, + type ChangeStreamDocument, type ClientSession, type Collection, type CommandFailedEvent, @@ -24,7 +26,9 @@ import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoServerError, - ObjectId + ObjectId, + promiseWithResolvers, + TopologyType } from '../../mongodb'; import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils'; @@ -815,6 +819,231 @@ describe('CSOT driver tests', metadata, () => { }); }); + describe('Change Streams', function () { + const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: '!single' } }; + let internalClient: MongoClient; + let client: MongoClient; + let commandsStarted: CommandStartedEvent[]; + + beforeEach(async function () { + this.configuration.url({ useMultipleMongoses: false }); + internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .dropCollection('coll') + .catch(() => null); + commandsStarted = []; + + client = await this.configuration.newClient(undefined, { monitorCommands: true }).connect(); + client.on('commandStarted', ev => { + commandsStarted.push(ev); + }); + }); + + afterEach(async function () { + await internalClient + .db() + .admin() + ?.command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient?.close(); + await client?.close(); + }); + + context('when in stream mode', function () { + let data: any[]; + let cs: ChangeStream; + let errorIter: AsyncIterableIterator; + + afterEach(async function () { + await cs?.close(); + }); + + context('when the initial aggregate times out', function () { + beforeEach(async function () { + data = []; + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, // fail twice to account for executeOperation's retry attempt + data: { + failCommands: ['aggregate'], + blockConnection: true, + blockTimeMS: 130 + } + }; + + await internalClient.db().admin().command(failpoint); + cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 }); + errorIter = on(cs, 'error'); + cs.on('change', () => { + // Add empty listener just to get the change stream running + }); + }); + + it('emits an error event', metadata, async function () { + const err = (await errorIter.next()).value[0]; + + expect(data).to.have.lengthOf(0); + expect(err).to.be.instanceof(MongoOperationTimeoutError); + }); + + it('closes the change stream', metadata, async function () { + const err = (await errorIter.next()).value[0]; + expect(err).to.be.instanceof(MongoOperationTimeoutError); + expect(cs.closed).to.be.true; + }); + }); + + context('when the getMore times out', function () { + let onSharded: boolean; + beforeEach(async function () { + onSharded = + this.configuration.topologyType === TopologyType.LoadBalanced || + this.configuration.topologyType === TopologyType.Sharded; + data = []; + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + blockConnection: true, + blockTimeMS: onSharded ? 5100 : 120 + } + }; + + await internalClient.db().admin().command(failpoint); + cs = client + .db('db') + .collection('coll') + .watch([], { timeoutMS: onSharded ? 5000 : 100 }); + errorIter = on(cs, 'error'); + cs.on('change', () => { + // Add empty listener just to get the change stream running + }); + }); + + it('emits an error event', metadata, async function () { + const [err] = (await errorIter.next()).value; + expect(data).to.have.lengthOf(0); + expect(err).to.be.instanceof(MongoOperationTimeoutError); + }); + + it( + 'continues emitting change events', + { + requires: { + mongodb: '>=8.0', // NOTE: we are only testing on >= 8.0 because this version has increased performance and this test is sensitive to server performance. This feature should continue to work on server versions down to 4.4, but would require a larger value of timeoutMS which would either significantly slow down our CI testing or make the test flaky + topology: '!single', + os: 'linux' + } + }, + async function () { + // NOTE: duplicating setup code here so its particular configuration requirements don't + // affect other tests. + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + blockConnection: true, + blockTimeMS: onSharded ? 5100 : 520 + } + }; + + await internalClient.db().admin().command(failpoint); + const cs = client + .db('db') + .collection('coll') + .watch([], { timeoutMS: onSharded ? 5000 : 500 }); + const errorIter = on(cs, 'error'); + cs.on('change', () => { + // Add empty listener just to get the change stream running + }); + + const err = (await errorIter.next()).value[0]; + expect(err).to.be.instanceof(MongoOperationTimeoutError); + + await once(cs.cursor, 'resumeTokenChanged'); + + const { + promise: changePromise, + resolve, + reject + } = promiseWithResolvers>(); + + cs.once('change', resolve); + + cs.once('error', reject); + + await internalClient.db('db').collection('coll').insertOne({ x: 1 }); + const change = await changePromise; + expect(change).to.have.ownProperty('operationType', 'insert'); + } + ); + + it('does not close the change stream', metadata, async function () { + const [err] = (await errorIter.next()).value; + expect(err).to.be.instanceof(MongoOperationTimeoutError); + + expect(cs.closed).to.be.false; + }); + + it('attempts to create a new change stream cursor', metadata, async function () { + await errorIter.next(); + let aggregates = commandsStarted + .filter(x => x.commandName === 'aggregate') + .map(x => x.command); + expect(aggregates).to.have.lengthOf(1); + + await once(cs, 'resumeTokenChanged'); + + aggregates = commandsStarted + .filter(x => x.commandName === 'aggregate') + .map(x => x.command); + + expect(aggregates).to.have.lengthOf(2); + + expect(aggregates[0].pipeline).to.deep.equal([{ $changeStream: {} }]); + expect(aggregates[1].pipeline).to.deep.equal([ + { $changeStream: { resumeAfter: cs.resumeToken } } + ]); + }); + }); + + context('when the resume attempt times out', function () { + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 2 }, // timeout the getMore, and the aggregate + data: { + failCommands: ['getMore', 'aggregate'], + blockConnection: true, + blockTimeMS: 130 + } + }; + + beforeEach(async function () { + cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 }); + const _changePromise = once(cs, 'change'); + await once(cs.cursor, 'init'); + + await internalClient.db().admin().command(failpoint); + }); + + it('emits an error event', metadata, async function () { + let [err] = await once(cs, 'error'); // getMore failure + expect(err).to.be.instanceof(MongoOperationTimeoutError); + [err] = await once(cs, 'error'); // aggregate failure + expect(err).to.be.instanceof(MongoOperationTimeoutError); + }); + + it('closes the change stream', metadata, async function () { + await once(cs, 'error'); // await the getMore Failure + await once(cs, 'error'); // await the aggregate failure + expect(cs.closed).to.be.true; + }); + }); + }); + }); + describe('GridFSBucket', () => { const blockTimeMS = 200; let internalClient: MongoClient; diff --git a/test/integration/client-side-operations-timeout/unified-csot-node-specs/change-streams.json b/test/integration/client-side-operations-timeout/unified-csot-node-specs/change-streams.json new file mode 100644 index 00000000000..4708939d009 --- /dev/null +++ b/test/integration/client-side-operations-timeout/unified-csot-node-specs/change-streams.json @@ -0,0 +1,135 @@ +{ + "description": "timeoutMS behaves correctly for change streams", + "schemaVersion": "1.9", + "runOnRequirements": [ + { + "minServerVersion": "4.4", + "topologies": [ + "replicaset", + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + }, + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent" + ], + "ignoreCommandMonitoringEvents": [ + "killCursors" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "coll" + } + } + ], + "initialData": [ + { + "collectionName": "coll", + "databaseName": "test", + "documents": [] + } + ], + "tests": [ + { + "description": "timeoutMS is refreshed for getMore if maxAwaitTimeMS is set", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "aggregate", + "getMore" + ], + "blockConnection": true, + "blockTimeMS": 150 + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection", + "arguments": { + "pipeline": [], + "timeoutMS": 200, + "batchSize": 2, + "maxAwaitTimeMS": 10 + }, + "saveResultAsEntity": "changeStream" + }, + { + "name": "iterateOnce", + "object": "changeStream" + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "commandName": "aggregate", + "databaseName": "test", + "command": { + "aggregate": "coll", + "maxTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + } + }, + { + "commandStartedEvent": { + "commandName": "getMore", + "databaseName": "test", + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "coll", + "maxTimeMS": 10 + } + } + } + ] + } + ] + } + ] +} diff --git a/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json b/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json index 17da3e3c0c9..aabc39abb37 100644 --- a/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json +++ b/test/integration/client-side-operations-timeout/unified-csot-node-specs/tailable-awaitData.json @@ -141,6 +141,89 @@ ] } ] + }, + { + "description": "timeoutMS is refreshed for getMore if maxAwaitTimeMS is set", + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "find", + "getMore" + ], + "blockConnection": true, + "blockTimeMS": 150 + } + } + } + }, + { + "name": "createFindCursor", + "object": "collection", + "arguments": { + "filter": {}, + "cursorType": "tailableAwait", + "timeoutMS": 250, + "batchSize": 1, + "maxAwaitTimeMS": 10 + }, + "saveResultAsEntity": "tailableCursor" + }, + { + "name": "iterateUntilDocumentOrError", + "object": "tailableCursor" + }, + { + "name": "iterateUntilDocumentOrError", + "object": "tailableCursor" + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "commandName": "find", + "databaseName": "test", + "command": { + "find": "coll", + "tailable": true, + "awaitData": true, + "maxTimeMS": { + "$$exists": true + } + } + } + }, + { + "commandStartedEvent": { + "commandName": "getMore", + "databaseName": "test", + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "coll", + "maxTimeMS": 10 + } + } + } + ] + } + ] } ] } diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index 8e154e1dc3e..ac060c9d459 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -8,6 +8,7 @@ import { AbstractCursor, type Collection, type CommandStartedEvent, + CSOTTimeoutContext, CursorTimeoutContext, CursorTimeoutMode, type FindCursor, @@ -408,37 +409,58 @@ describe('class AbstractCursor', function () { let collection: Collection; let context: CursorTimeoutContext; const commands: CommandStartedEvent[] = []; + let internalContext: TimeoutContext; beforeEach(async function () { client = this.configuration.newClient({}, { monitorCommands: true }); client.on('commandStarted', filterForCommands('killCursors', commands)); collection = client.db('abstract_cursor_integration').collection('test'); + internalContext = TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }); - context = new CursorTimeoutContext( - TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }), - Symbol() - ); + context = new CursorTimeoutContext(internalContext, Symbol()); await collection.insertMany([{ a: 1 }, { b: 2 }, { c: 3 }]); }); afterEach(async function () { + sinon.restore(); await collection.deleteMany({}); await client.close(); }); - describe('when timeoutMode != LIFETIME', function () { - it('an error is thrown', function () { - expect(() => - collection.find( - {}, - { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.ITERATION } - ) - ).to.throw( - `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME` - ); - }); + it('CursorTimeoutMode.refresh is a no-op', async function () { + const cursorTimeoutRefreshSpy = sinon.spy(CursorTimeoutContext.prototype, 'refresh'); + const csotTimeoutContextRefreshSpy = sinon.spy(CSOTTimeoutContext.prototype, 'refresh'); + const abstractCursorGetMoreSpy = sinon.spy(AbstractCursor.prototype, 'getMore'); + + const cursor = collection.find( + {}, + { timeoutMode: CursorTimeoutMode.ITERATION, timeoutContext: context, batchSize: 1 } + ); + await cursor.toArray(); + + expect(abstractCursorGetMoreSpy).to.have.been.calledThrice; + + expect(cursorTimeoutRefreshSpy.getCalls()).to.have.length(3); + expect(csotTimeoutContextRefreshSpy).to.not.have.been.called; + }); + + it('CursorTimeoutMode.clear is a no-op', async function () { + const cursorTimeoutClearSpy = sinon.spy(CursorTimeoutContext.prototype, 'clear'); + const csotTimeoutContextRefreshSpy = sinon.spy(CSOTTimeoutContext.prototype, 'clear'); + const abstractCursorGetMoreSpy = sinon.spy(AbstractCursor.prototype, 'getMore'); + + const cursor = collection.find( + {}, + { timeoutMode: CursorTimeoutMode.ITERATION, timeoutContext: context, batchSize: 1 } + ); + await cursor.toArray(); + + expect(abstractCursorGetMoreSpy).to.have.been.calledThrice; + + expect(cursorTimeoutClearSpy.getCalls()).to.have.length(4); + expect(csotTimeoutContextRefreshSpy).to.not.have.been.called; }); describe('when timeoutMode is omitted', function () {