Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(NODE-6411): AbstractCursor accepts an external timeout context #4264

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
if (!options.omitMaxTimeMS) {
const maxTimeMS = options.timeoutContext?.maxTimeMS;
if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}

const message = this.supportsOpMsg
Expand Down
134 changes: 103 additions & 31 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { TimeoutContext } from '../timeout';
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
import { type MongoDBNamespace, squashError } from '../utils';

/**
Expand Down Expand Up @@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
timeoutMS?: number;
/** @internal TODO(NODE-5688): make this public */
timeoutMode?: CursorTimeoutMode;

/**
* @internal
*
* A timeout context to govern the total time the cursor can live. If provided, the cursor
* cannot be used in ITERATION mode.
*/
timeoutContext?: CursorTimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -171,7 +179,7 @@ export abstract class AbstractCursor<
/** @internal */
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;
protected timeoutContext?: CursorTimeoutContext;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -205,20 +213,12 @@ export abstract class AbstractCursor<
};
this.cursorOptions.timeoutMS = options.timeoutMS;
if (this.cursorOptions.timeoutMS != null) {
if (options.timeoutMode == null) {
if (options.tailable) {
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
} else {
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
}
} else {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError(
"Cannot set tailable cursor's timeoutMode to LIFETIME"
);
}
this.cursorOptions.timeoutMode = options.timeoutMode;
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a bit of cleanup, happy to revert.

throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
}
this.cursorOptions.timeoutMode =
options.timeoutMode ??
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
} else {
if (options.timeoutMode != null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
Expand Down Expand Up @@ -264,6 +264,17 @@ export abstract class AbstractCursor<
utf8: options?.enableUtf8Validation === false ? false : true
}
};

if (
options.timeoutContext != null &&
options.timeoutMS != null &&
this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME
) {
throw new MongoAPIError(
`cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.`
);
}
this.timeoutContext = options.timeoutContext;
}

/**
Expand Down Expand Up @@ -721,6 +732,9 @@ export abstract class AbstractCursor<
* if the resultant data has already been retrieved by this cursor.
*/
rewind(): void {
if (this.timeoutContext && this.timeoutContext.owner !== this) {
throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`);
}
if (!this.initialized) {
return;
}
Expand Down Expand Up @@ -790,10 +804,13 @@ export abstract class AbstractCursor<
*/
private async cursorInit(): Promise<void> {
if (this.cursorOptions.timeoutMS != null) {
this.timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
});
this.timeoutContext ??= new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
}),
this
);
}
try {
const state = await this._initialize(this.cursorSession);
Expand Down Expand Up @@ -872,6 +889,20 @@ export abstract class AbstractCursor<
private async cleanup(timeoutMS?: number, error?: Error) {
this.isClosed = true;
const session = this.cursorSession;
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
if (timeoutMS != null) {
this.timeoutContext?.clear();
return new CursorTimeoutContext(
TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
}),
this
);
} else {
return this.timeoutContext?.refreshed();
}
};
try {
if (
!this.isKilled &&
Expand All @@ -884,23 +915,13 @@ export abstract class AbstractCursor<
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;
let timeoutContext: TimeoutContext | undefined;
if (timeoutMS != null) {
this.timeoutContext?.clear();
timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
});
} else {
this.timeoutContext?.refresh();
timeoutContext = this.timeoutContext;
}

await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
}),
timeoutContext
timeoutContextForKillCursors()
);
}
} catch (error) {
Expand Down Expand Up @@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable {
}

configureResourceManagement(AbstractCursor.prototype);

/**
* @internal
* The cursor timeout context is a wrapper around a timeout context
* that keeps track of the "owner" of the cursor. For timeout contexts
* instantiated inside a cursor, the owner will be the cursor.
*
* All timeout behavior is exactly the same as the wrapped timeout context's.
*/
export class CursorTimeoutContext extends TimeoutContext {
constructor(
public timeoutContext: TimeoutContext,
public owner: symbol | AbstractCursor
) {
super();
}
override get serverSelectionTimeout(): Timeout | null {
return this.timeoutContext.serverSelectionTimeout;
}
override get connectionCheckoutTimeout(): Timeout | null {
return this.timeoutContext.connectionCheckoutTimeout;
}
override get clearServerSelectionTimeout(): boolean {
return this.timeoutContext.clearServerSelectionTimeout;
}
override get clearConnectionCheckoutTimeout(): boolean {
return this.timeoutContext.clearConnectionCheckoutTimeout;
}
override get timeoutForSocketWrite(): Timeout | null {
return this.timeoutContext.timeoutForSocketWrite;
}
override get timeoutForSocketRead(): Timeout | null {
return this.timeoutContext.timeoutForSocketRead;
}
override csotEnabled(): this is CSOTTimeoutContext {
return this.timeoutContext.csotEnabled();
}
override refresh(): void {
return this.timeoutContext.refresh();
}
override clear(): void {
return this.timeoutContext.clear();
}
override get maxTimeMS(): number | null {
return this.timeoutContext.maxTimeMS;
}

override refreshed(): CursorTimeoutContext {
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ export type {
CursorStreamOptions
} from './cursor/abstract_cursor';
export type {
CursorTimeoutContext,
InitialCursorResponse,
InternalAbstractCursorOptions
} from './cursor/abstract_cursor';
Expand Down
5 changes: 3 additions & 2 deletions src/operations/find.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Document } from '../bson';
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { MongoInvalidArgumentError } from '../error';
import { ReadConcern } from '../read_concern';
import type { Server } from '../sdam/server';
Expand All @@ -17,7 +17,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface FindOptions<TSchema extends Document = Document>
extends Omit<CommandOperationOptions, 'writeConcern'> {
extends Omit<CommandOperationOptions, 'writeConcern'>,
AbstractCursorOptions {
/** Sets the limit of documents returned in the query. */
limit?: number;
/** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */
Expand Down
17 changes: 17 additions & 0 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ export abstract class TimeoutContext {
else throw new MongoRuntimeError('Unrecognized options');
}

abstract get maxTimeMS(): number | null;

abstract get serverSelectionTimeout(): Timeout | null;

abstract get connectionCheckoutTimeout(): Timeout | null;
Expand All @@ -195,6 +197,9 @@ export abstract class TimeoutContext {
abstract refresh(): void;

abstract clear(): void;

/** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
abstract refreshed(): TimeoutContext;
}

/** @internal */
Expand Down Expand Up @@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`);
return remainingTimeMS;
}

override refreshed(): CSOTTimeoutContext {
return new CSOTTimeoutContext(this);
}
}

/** @internal */
Expand Down Expand Up @@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
clear(): void {
return;
}

get maxTimeMS() {
return null;
}

override refreshed(): LegacyTimeoutContext {
return new LegacyTimeoutContext(this.options);
}
}
18 changes: 14 additions & 4 deletions test/integration/client-side-operations-timeout/node_csot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
MongoServerError,
ObjectId
} from '../../mongodb';
import { type FailPoint } from '../../tools/utils';
import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils';

const metadata = { requires: { mongodb: '>=4.4' } };

Expand Down Expand Up @@ -362,7 +362,7 @@ describe('CSOT driver tests', metadata, () => {
};

beforeEach(async function () {
internalClient = this.configuration.newClient();
internalClient = this.configuration.newClient({});
await internalClient
.db('db')
.dropCollection('coll')
Expand All @@ -378,7 +378,11 @@ describe('CSOT driver tests', metadata, () => {

await internalClient.db().admin().command(failpoint);

client = this.configuration.newClient(undefined, { monitorCommands: true });
client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize: 10 });

// wait for a handful of connections to have been established
await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);

commandStarted = [];
commandSucceeded = [];
client.on('commandStarted', ev => commandStarted.push(ev));
Expand Down Expand Up @@ -492,7 +496,13 @@ describe('CSOT driver tests', metadata, () => {

await internalClient.db().admin().command(failpoint);

client = this.configuration.newClient(undefined, { monitorCommands: true });
client = this.configuration.newClient(undefined, {
monitorCommands: true,
minPoolSize: 10
});
// wait for a handful of connections to have been established
await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);

commandStarted = [];
commandSucceeded = [];
client.on('commandStarted', ev => commandStarted.push(ev));
Expand Down
Loading