diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 8c71e4b0bf5..92d49e56abe 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -427,9 +427,9 @@ export class Connection extends TypedEventEmitter { ...options }; - if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) { - const { maxTimeMS } = options.timeoutContext; - if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS; + if (!options.omitMaxTimeMS) { + const maxTimeMS = options.timeoutContext?.maxTimeMS; + if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS; } const message = this.supportsOpMsg diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index d0f386923ad..f7e488d24b2 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import { type AsyncDisposable, configureResourceManagement } from '../resource_management'; import type { Server } from '../sdam/server'; import { ClientSession, maybeClearPinnedConnection } from '../sessions'; -import { TimeoutContext } from '../timeout'; +import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout'; import { type MongoDBNamespace, squashError } from '../utils'; /** @@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { timeoutMS?: number; /** @internal TODO(NODE-5688): make this public */ timeoutMode?: CursorTimeoutMode; + + /** + * @internal + * + * A timeout context to govern the total time the cursor can live. If provided, the cursor + * cannot be used in ITERATION mode. + */ + timeoutContext?: CursorTimeoutContext; } /** @internal */ @@ -171,7 +179,7 @@ export abstract class AbstractCursor< /** @internal */ protected readonly cursorOptions: InternalAbstractCursorOptions; /** @internal */ - protected timeoutContext?: TimeoutContext; + protected timeoutContext?: CursorTimeoutContext; /** @event */ static readonly CLOSE = 'close' as const; @@ -205,20 +213,12 @@ export abstract class AbstractCursor< }; this.cursorOptions.timeoutMS = options.timeoutMS; if (this.cursorOptions.timeoutMS != null) { - if (options.timeoutMode == null) { - if (options.tailable) { - this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION; - } else { - this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME; - } - } else { - if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { - throw new MongoInvalidArgumentError( - "Cannot set tailable cursor's timeoutMode to LIFETIME" - ); - } - this.cursorOptions.timeoutMode = options.timeoutMode; + if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) { + throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME"); } + this.cursorOptions.timeoutMode = + options.timeoutMode ?? + (options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME); } else { if (options.timeoutMode != null) throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS'); @@ -264,6 +264,17 @@ export abstract class AbstractCursor< utf8: options?.enableUtf8Validation === false ? false : true } }; + + if ( + options.timeoutContext != null && + options.timeoutMS != null && + this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME + ) { + throw new MongoAPIError( + `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.` + ); + } + this.timeoutContext = options.timeoutContext; } /** @@ -721,6 +732,9 @@ export abstract class AbstractCursor< * if the resultant data has already been retrieved by this cursor. */ rewind(): void { + if (this.timeoutContext && this.timeoutContext.owner !== this) { + throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`); + } if (!this.initialized) { return; } @@ -790,10 +804,13 @@ export abstract class AbstractCursor< */ private async cursorInit(): Promise { if (this.cursorOptions.timeoutMS != null) { - this.timeoutContext = TimeoutContext.create({ - serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, - timeoutMS: this.cursorOptions.timeoutMS - }); + this.timeoutContext ??= new CursorTimeoutContext( + TimeoutContext.create({ + serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, + timeoutMS: this.cursorOptions.timeoutMS + }), + this + ); } try { const state = await this._initialize(this.cursorSession); @@ -872,6 +889,20 @@ export abstract class AbstractCursor< private async cleanup(timeoutMS?: number, error?: Error) { this.isClosed = true; const session = this.cursorSession; + const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => { + if (timeoutMS != null) { + this.timeoutContext?.clear(); + return new CursorTimeoutContext( + TimeoutContext.create({ + serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, + timeoutMS + }), + this + ); + } else { + return this.timeoutContext?.refreshed(); + } + }; try { if ( !this.isKilled && @@ -884,23 +915,13 @@ export abstract class AbstractCursor< this.isKilled = true; const cursorId = this.cursorId; this.cursorId = Long.ZERO; - let timeoutContext: TimeoutContext | undefined; - if (timeoutMS != null) { - this.timeoutContext?.clear(); - timeoutContext = TimeoutContext.create({ - serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS, - timeoutMS - }); - } else { - this.timeoutContext?.refresh(); - timeoutContext = this.timeoutContext; - } + await executeOperation( this.cursorClient, new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, { session }), - timeoutContext + timeoutContextForKillCursors() ); } } catch (error) { @@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable { } configureResourceManagement(AbstractCursor.prototype); + +/** + * @internal + * The cursor timeout context is a wrapper around a timeout context + * that keeps track of the "owner" of the cursor. For timeout contexts + * instantiated inside a cursor, the owner will be the cursor. + * + * All timeout behavior is exactly the same as the wrapped timeout context's. + */ +export class CursorTimeoutContext extends TimeoutContext { + constructor( + public timeoutContext: TimeoutContext, + public owner: symbol | AbstractCursor + ) { + super(); + } + override get serverSelectionTimeout(): Timeout | null { + return this.timeoutContext.serverSelectionTimeout; + } + override get connectionCheckoutTimeout(): Timeout | null { + return this.timeoutContext.connectionCheckoutTimeout; + } + override get clearServerSelectionTimeout(): boolean { + return this.timeoutContext.clearServerSelectionTimeout; + } + override get clearConnectionCheckoutTimeout(): boolean { + return this.timeoutContext.clearConnectionCheckoutTimeout; + } + override get timeoutForSocketWrite(): Timeout | null { + return this.timeoutContext.timeoutForSocketWrite; + } + override get timeoutForSocketRead(): Timeout | null { + return this.timeoutContext.timeoutForSocketRead; + } + override csotEnabled(): this is CSOTTimeoutContext { + return this.timeoutContext.csotEnabled(); + } + override refresh(): void { + return this.timeoutContext.refresh(); + } + override clear(): void { + return this.timeoutContext.clear(); + } + override get maxTimeMS(): number | null { + return this.timeoutContext.maxTimeMS; + } + + override refreshed(): CursorTimeoutContext { + return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner); + } +} diff --git a/src/index.ts b/src/index.ts index 18d34e3e129..419ddc2e692 100644 --- a/src/index.ts +++ b/src/index.ts @@ -359,6 +359,7 @@ export type { CursorStreamOptions } from './cursor/abstract_cursor'; export type { + CursorTimeoutContext, InitialCursorResponse, InternalAbstractCursorOptions } from './cursor/abstract_cursor'; diff --git a/src/operations/find.ts b/src/operations/find.ts index e50b2762449..10453d141da 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -1,6 +1,6 @@ import type { Document } from '../bson'; import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses'; -import { type CursorTimeoutMode } from '../cursor/abstract_cursor'; +import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor'; import { MongoInvalidArgumentError } from '../error'; import { type ExplainOptions } from '../explain'; import { ReadConcern } from '../read_concern'; @@ -18,7 +18,8 @@ import { Aspect, defineAspects, type Hint } from './operation'; */ // eslint-disable-next-line @typescript-eslint/no-unused-vars export interface FindOptions - extends Omit { + extends Omit, + AbstractCursorOptions { /** Sets the limit of documents returned in the query. */ limit?: number; /** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */ diff --git a/src/timeout.ts b/src/timeout.ts index f694b5f4f4f..9041ce4b88d 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -178,6 +178,8 @@ export abstract class TimeoutContext { else throw new MongoRuntimeError('Unrecognized options'); } + abstract get maxTimeMS(): number | null; + abstract get serverSelectionTimeout(): Timeout | null; abstract get connectionCheckoutTimeout(): Timeout | null; @@ -195,6 +197,9 @@ export abstract class TimeoutContext { abstract refresh(): void; abstract clear(): void; + + /** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */ + abstract refreshed(): TimeoutContext; } /** @internal */ @@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext { throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`); return remainingTimeMS; } + + override refreshed(): CSOTTimeoutContext { + return new CSOTTimeoutContext(this); + } } /** @internal */ @@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext { clear(): void { return; } + + get maxTimeMS() { + return null; + } + + override refreshed(): LegacyTimeoutContext { + return new LegacyTimeoutContext(this.options); + } } diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index b2011ee2e73..f4cfc7d882c 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -26,7 +26,7 @@ import { MongoServerError, ObjectId } from '../../mongodb'; -import { type FailPoint } from '../../tools/utils'; +import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils'; const metadata = { requires: { mongodb: '>=4.4' } }; @@ -362,7 +362,7 @@ describe('CSOT driver tests', metadata, () => { }; beforeEach(async function () { - internalClient = this.configuration.newClient(); + internalClient = this.configuration.newClient({}); await internalClient .db('db') .dropCollection('coll') @@ -378,7 +378,11 @@ describe('CSOT driver tests', metadata, () => { await internalClient.db().admin().command(failpoint); - client = this.configuration.newClient(undefined, { monitorCommands: true }); + client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize: 10 }); + + // wait for a handful of connections to have been established + await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5); + commandStarted = []; commandSucceeded = []; client.on('commandStarted', ev => commandStarted.push(ev)); @@ -492,7 +496,13 @@ describe('CSOT driver tests', metadata, () => { await internalClient.db().admin().command(failpoint); - client = this.configuration.newClient(undefined, { monitorCommands: true }); + client = this.configuration.newClient(undefined, { + monitorCommands: true, + minPoolSize: 10 + }); + // wait for a handful of connections to have been established + await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5); + commandStarted = []; commandSucceeded = []; client.on('commandStarted', ev => commandStarted.push(ev)); diff --git a/test/integration/crud/find_cursor_methods.test.js b/test/integration/crud/find_cursor_methods.test.js index 42eeda3e816..21a6649bf0b 100644 --- a/test/integration/crud/find_cursor_methods.test.js +++ b/test/integration/crud/find_cursor_methods.test.js @@ -1,7 +1,13 @@ 'use strict'; const { expect } = require('chai'); const { filterForCommands } = require('../shared'); -const { promiseWithResolvers, MongoCursorExhaustedError } = require('../../mongodb'); +const { + promiseWithResolvers, + MongoCursorExhaustedError, + CursorTimeoutContext, + TimeoutContext, + MongoAPIError +} = require('../../mongodb'); describe('Find Cursor', function () { let client; @@ -246,23 +252,45 @@ describe('Find Cursor', function () { }); context('#rewind', function () { - it('should rewind a cursor', function (done) { + it('should rewind a cursor', async function () { const coll = client.db().collection('abstract_cursor'); const cursor = coll.find({}); - this.defer(() => cursor.close()); - cursor.toArray((err, docs) => { - expect(err).to.not.exist; - expect(docs).to.have.length(6); + try { + let docs = await cursor.toArray(); + expect(docs).to.have.lengthOf(6); cursor.rewind(); - cursor.toArray((err, docs) => { - expect(err).to.not.exist; - expect(docs).to.have.length(6); + docs = await cursor.toArray(); + expect(docs).to.have.lengthOf(6); + } finally { + await cursor.close(); + } + }); - done(); - }); - }); + it('throws if the cursor does not own its timeoutContext', async function () { + const coll = client.db().collection('abstract_cursor'); + const cursor = coll.find( + {}, + { + timeoutContext: new CursorTimeoutContext( + TimeoutContext.create({ + timeoutMS: 1000, + serverSelectionTimeoutMS: 1000 + }), + Symbol() + ) + } + ); + + try { + cursor.rewind(); + expect.fail(`rewind should have thrown.`); + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + } finally { + await cursor.close(); + } }); it('should end an implicit session on rewind', { diff --git a/test/integration/node-specific/abstract_cursor.test.ts b/test/integration/node-specific/abstract_cursor.test.ts index a5e7fba13dd..136e72a3499 100644 --- a/test/integration/node-specific/abstract_cursor.test.ts +++ b/test/integration/node-specific/abstract_cursor.test.ts @@ -7,12 +7,17 @@ import { inspect } from 'util'; import { AbstractCursor, type Collection, + CursorTimeoutContext, + CursorTimeoutMode, type FindCursor, MongoAPIError, type MongoClient, MongoCursorExhaustedError, - MongoServerError + MongoOperationTimeoutError, + MongoServerError, + TimeoutContext } from '../../mongodb'; +import { type FailPoint } from '../../tools/utils'; describe('class AbstractCursor', function () { describe('regression tests NODE-5372', function () { @@ -395,4 +400,114 @@ describe('class AbstractCursor', function () { expect(nextSpy.callCount).to.be.lessThan(numDocuments); }); }); + + describe('externally provided timeout contexts', function () { + let client: MongoClient; + let collection: Collection; + let context: CursorTimeoutContext; + + beforeEach(async function () { + client = this.configuration.newClient(); + + collection = client.db('abstract_cursor_integration').collection('test'); + + context = new CursorTimeoutContext( + TimeoutContext.create({ timeoutMS: 1000, serverSelectionTimeoutMS: 2000 }), + Symbol() + ); + + await collection.insertMany([{ a: 1 }, { b: 2 }, { c: 3 }]); + }); + + afterEach(async function () { + await collection.deleteMany({}); + await client.close(); + }); + + describe('when timeoutMode != LIFETIME', function () { + it('an error is thrown', function () { + expect(() => + collection.find( + {}, + { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.ITERATION } + ) + ).to.throw( + `cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME` + ); + }); + }); + + describe('when timeoutMode is omitted', function () { + it('stores timeoutContext as the timeoutContext on the cursor', function () { + const cursor = collection.find({}, { timeoutContext: context, timeoutMS: 1000 }); + + // @ts-expect-error Private access. + expect(cursor.timeoutContext).to.equal(context); + }); + }); + + describe('when timeoutMode is LIFETIME', function () { + it('stores timeoutContext as the timeoutContext on the cursor', function () { + const cursor = collection.find( + {}, + { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.LIFETIME } + ); + + // @ts-expect-error Private access. + expect(cursor.timeoutContext).to.equal(context); + }); + }); + + describe('when the cursor is initialized', function () { + it('the provided timeoutContext is not overwritten', async function () { + const cursor = collection.find( + {}, + { timeoutContext: context, timeoutMS: 1000, timeoutMode: CursorTimeoutMode.LIFETIME } + ); + + await cursor.toArray(); + + // @ts-expect-error Private access. + expect(cursor.timeoutContext).to.equal(context); + }); + }); + + describe('when the cursor refreshes the timeout for killCursors', function () { + it( + 'the provided timeoutContext is not modified', + { + requires: { + mongodb: '>=4.4' + } + }, + async function () { + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + blockConnection: true, + blockTimeMS: 5000 + } + } as FailPoint); + + const cursor = collection.find( + {}, + { + timeoutContext: context, + timeoutMS: 1000, + timeoutMode: CursorTimeoutMode.LIFETIME, + batchSize: 1 + } + ); + + const error = await cursor.toArray().catch(e => e); + + expect(error).to.be.instanceof(MongoOperationTimeoutError); + // @ts-expect-error We know we have a CSOT timeout context but TS does not. + expect(context.timeoutContext.remainingTimeMS).to.be.lessThan(0); + } + ); + }); + }); }); diff --git a/test/integration/server-selection/server_selection.prose.operation_count.test.ts b/test/integration/server-selection/server_selection.prose.operation_count.test.ts index fec6d24e61c..b4a7d9bf47b 100644 --- a/test/integration/server-selection/server_selection.prose.operation_count.test.ts +++ b/test/integration/server-selection/server_selection.prose.operation_count.test.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import { on } from 'events'; import { type Collection, @@ -7,7 +6,7 @@ import { HostAddress, type MongoClient } from '../../mongodb'; -import { sleep } from '../../tools/utils'; +import { waitUntilPoolsFilled } from '../../tools/utils'; const failPoint = { configureFailPoint: 'failCommand', @@ -28,17 +27,6 @@ async function runTaskGroup(collection: Collection, count: 10 | 100 | 1000) { } } -async function ensurePoolIsFull(client: MongoClient): Promise { - let connectionCount = 0; - - for await (const _event of on(client, 'connectionCreated')) { - connectionCount++; - if (connectionCount === POOL_SIZE * 2) { - break; - } - } -} - // Step 1: Configure a sharded cluster with two mongoses. Use a 4.2.9 or newer server version. const TEST_METADATA: MongoDBMetadataUI = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } }; @@ -75,15 +63,8 @@ describe('operationCount-based Selection Within Latency Window - Prose Test', fu client.on('commandStarted', updateCount); - const poolIsFullPromise = ensurePoolIsFull(client); - - await client.connect(); - // Step 4: Using CMAP events, ensure the client's connection pools for both mongoses have been saturated - const poolIsFull = Promise.race([poolIsFullPromise, sleep(30 * 1000)]); - if (!poolIsFull) { - throw new Error('Timed out waiting for connection pool to fill to minPoolSize'); - } + await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), POOL_SIZE * 2); seeds = client.topology.s.seedlist.map(address => address.toString()); diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 3cb50d2cd51..8614bd7d64c 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -1,5 +1,5 @@ import * as child_process from 'node:child_process'; -import { once } from 'node:events'; +import { on, once } from 'node:events'; import * as fs from 'node:fs/promises'; import * as path from 'node:path'; @@ -568,3 +568,33 @@ export async function itInNodeProcess( } }); } + +/** + * Connects the client and waits until `client` has emitted `count` connectionCreated events. + * + * **This will hang if the client does not have a maxPoolSizeSet!** + * + * This is useful when you want to ensure that the client has pools that are full of connections. + * + * This does not guarantee that all pools that the client has are completely full unless + * count = number of servers to which the client is connected * maxPoolSize. But it can + * serve as a way to ensure that some connections have been established and are in the pools. + */ +export async function waitUntilPoolsFilled( + client: MongoClient, + signal: AbortSignal, + count: number = client.s.options.maxPoolSize +): Promise { + let connectionCount = 0; + + async function wait$() { + for await (const _event of on(client, 'connectionCreated', { signal })) { + connectionCount++; + if (connectionCount >= count) { + break; + } + } + } + + await Promise.all([wait$(), client.connect()]); +}