Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6305): Add CSOT support to tailable cursors #4218

Merged
merged 141 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 127 commits
Commits
Show all changes
141 commits
Select commit Hold shift + click to select a range
d513fd6
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
7d65aae
test(NODE-6120): Implement Unified test runner changes for CSOT (#4121)
W-A-James Jun 10, 2024
954e713
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
W-A-James Jun 21, 2024
c5969b2
refactor(NODE-6230): executeOperation to use iterative retry mechanis…
nbbeeken Jul 22, 2024
8599cb3
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
7b9fa1f
feat(NODE-6231): Add CSOT behaviour for retryable reads and writes (#…
W-A-James Aug 1, 2024
260d679
feat(NODE-6312): add error transformation for server timeouts (#4192)
nbbeeken Aug 12, 2024
78eb356
add timeoutMode to find, findOne, listIndexes, aggregate, listCollect…
W-A-James Aug 14, 2024
a22784d
unskip spec tests
W-A-James Aug 14, 2024
6c423e0
implement prose tests
W-A-James Aug 14, 2024
978d8dd
Add timeoutMode to abstract cursor and add validation
W-A-James Aug 14, 2024
5317378
Add $out and $merge validation
W-A-James Aug 14, 2024
7c459e0
start csot cursor implementation
W-A-James Aug 15, 2024
0a2e4ce
WIP: int testing
W-A-James Aug 15, 2024
ac97b5a
WIP
W-A-James Aug 16, 2024
02f835e
fix test
W-A-James Aug 16, 2024
1e65e25
correctly propagate timeoutMode
W-A-James Aug 16, 2024
0bc7793
rework test skip logic
W-A-James Aug 16, 2024
f83d3ef
wip
W-A-James Aug 20, 2024
9549fd4
prose tests WIP
W-A-James Aug 20, 2024
173796b
put CSOT options on runCursorCommand
W-A-James Aug 20, 2024
9fc4c4e
finish up int tests
W-A-James Aug 20, 2024
507af55
timeoutContext refresh fix
W-A-James Aug 20, 2024
1f27040
apply CSOT to hasNext and tryNext
W-A-James Aug 20, 2024
af02eb8
sync spec tests
W-A-James Aug 21, 2024
679c02c
add omitMaxTimeMS
W-A-James Aug 21, 2024
abc1cfd
skipping spec tests
W-A-James Aug 21, 2024
b64af7b
pass through timeoutMS arg to cursor.close
W-A-James Aug 21, 2024
628eb55
Fix tests and dropindexes
W-A-James Aug 23, 2024
e90e7ed
add catch
W-A-James Aug 23, 2024
cbbd7e7
ensure that CSOT tests only run against 4.4 or greater
W-A-James Aug 26, 2024
fd43e60
lint
W-A-James Aug 26, 2024
bcb6e70
ignore
W-A-James Aug 26, 2024
2ac4b0f
update export test
W-A-James Aug 26, 2024
ba0dc00
remove TODO
W-A-James Aug 26, 2024
1fb6c7d
use cursorOptions instead of CursorInitializeOptions
W-A-James Aug 28, 2024
343829f
remove CursorInitializeOptions and fix export
W-A-James Aug 28, 2024
441f71d
Review fixes
W-A-James Aug 28, 2024
a9b4f02
fix setting of omitMaxTimeMS opts
W-A-James Aug 28, 2024
041b202
ensure that timeoutMS is provided to cursor.close in UTR
W-A-James Aug 28, 2024
cff3c3c
add modified spec test
W-A-James Aug 28, 2024
0488d50
remove redundant optional chaining
W-A-James Aug 28, 2024
4a6607f
remove cursorTimeoutMode from timeoutContext options
W-A-James Aug 28, 2024
96ecddc
remove types from expected exports
W-A-James Aug 28, 2024
504eec2
remove cursorTimeoutMode from TimeoutContext.create call
W-A-James Aug 29, 2024
cb810e2
review comments
W-A-James Sep 3, 2024
54286ae
lint
W-A-James Sep 3, 2024
9114290
type annotation and bracket reorganization
W-A-James Sep 3, 2024
a47337b
skip flaky test and replace with more consistent one
W-A-James Sep 4, 2024
3236c83
spec test fixes
W-A-James Sep 4, 2024
743b55f
lint
W-A-James Sep 4, 2024
5f4ba4c
unskip tailable cursor spec tests
W-A-James Aug 28, 2024
fe62afb
unskip prose test
W-A-James Aug 28, 2024
f6aa44b
WIP
W-A-James Aug 29, 2024
134259c
WIP
W-A-James Sep 3, 2024
5a30b84
update tests
W-A-James Sep 3, 2024
5eb53ca
WIP
W-A-James Sep 3, 2024
8e6069b
WIP
W-A-James Sep 4, 2024
f997eed
test fixes
W-A-James Sep 4, 2024
095234a
update spec runner
W-A-James Sep 4, 2024
2ec0219
unskip spec tests
W-A-James Sep 4, 2024
f98c095
WIP
W-A-James Sep 6, 2024
b5006de
WIP - updated spec test files
W-A-James Sep 6, 2024
c110bcb
change streams WIP
W-A-James Sep 6, 2024
3c2ec0a
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
909578f
test(NODE-6120): Implement Unified test runner changes for CSOT (#4121)
W-A-James Jun 10, 2024
e101750
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
W-A-James Jun 21, 2024
e4efd3f
refactor(NODE-6230): executeOperation to use iterative retry mechanis…
nbbeeken Jul 22, 2024
22082c9
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
bf95fa4
feat(NODE-6231): Add CSOT behaviour for retryable reads and writes (#…
W-A-James Aug 1, 2024
c63d102
feat(NODE-6312): add error transformation for server timeouts (#4192)
nbbeeken Aug 12, 2024
1eab23d
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
nbbeeken Sep 9, 2024
4c4b0a9
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
W-A-James Sep 12, 2024
558d416
fix(NODE-6374): MongoOperationTimeoutError inherits MongoRuntimeError…
nbbeeken Sep 12, 2024
3ed4a14
test: remove empty skipped context blocks (#4238)
W-A-James Sep 12, 2024
45483b7
Merge branch 'NODE-6090' into NODE-6305
W-A-James Sep 16, 2024
ae83865
Revert "Merge branch 'NODE-6090' into NODE-6305"
W-A-James Sep 16, 2024
5045ec3
Reapply "Merge branch 'NODE-6090' into NODE-6305"
W-A-James Sep 16, 2024
d3438ea
feat(NODE-5844): add iscryptd to ServerDescription (#4239)
nbbeeken Sep 17, 2024
23c9cb1
resolve merge errors
W-A-James Sep 17, 2024
abe7827
resolve merge errors
W-A-James Sep 17, 2024
f7227a7
fixed tests
W-A-James Sep 17, 2024
7a400c7
lint
W-A-James Sep 17, 2024
9e90e13
unskip prose tests
W-A-James Sep 17, 2024
0fdeaa4
add new benchmarks
W-A-James Sep 17, 2024
6c127b9
do not add timeoutMS to create command
W-A-James Sep 17, 2024
fb87725
add todo
W-A-James Sep 17, 2024
2fde74c
Merge branch 'NODE-6090' into NODE-6305
W-A-James Sep 18, 2024
f5634e6
update prose test
W-A-James Sep 18, 2024
acbc170
skip change stream spec tests
W-A-James Sep 18, 2024
2c4aafb
create modified spec tests for coverage
W-A-James Sep 18, 2024
31b2115
update modified spec tests
W-A-James Sep 18, 2024
4ac2ece
cleanup
W-A-James Sep 18, 2024
6b12b6e
lint
W-A-James Sep 18, 2024
3233a66
lint
W-A-James Sep 18, 2024
10cc4bf
comment debug code
W-A-James Sep 18, 2024
588735b
revert change_stream changes
W-A-James Sep 18, 2024
a5d7dcc
revert changes to change_stream_cursor
W-A-James Sep 18, 2024
257540a
remove duplicated logic
W-A-James Sep 18, 2024
733980c
bump timeouts
W-A-James Sep 18, 2024
c66dc7f
change timeoutMS to match spec
W-A-James Sep 18, 2024
db47396
fix flaky tests
W-A-James Sep 19, 2024
77c4014
fix benchmarks
W-A-James Sep 19, 2024
93746b7
fix tests
W-A-James Sep 19, 2024
91b3948
review comments
W-A-James Sep 19, 2024
7a37fd8
review comments
W-A-James Sep 19, 2024
8901b67
enable CSOT by default for benchmarks
W-A-James Sep 19, 2024
88ea7df
lint
W-A-James Sep 19, 2024
eadcff3
skip test on incompatible server versions
W-A-James Sep 20, 2024
8b4c6fe
add test metadata
W-A-James Sep 20, 2024
609a550
lint
W-A-James Sep 20, 2024
999f23d
feat(NODE-6090): Implement CSOT logic for connection checkout and ser…
W-A-James Apr 11, 2024
0355404
test(NODE-6120): Implement Unified test runner changes for CSOT (#4121)
W-A-James Jun 10, 2024
5ef3d69
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
W-A-James Jun 21, 2024
7139b8f
refactor(NODE-6230): executeOperation to use iterative retry mechanis…
nbbeeken Jul 22, 2024
acfb4fc
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
4efff95
feat(NODE-6231): Add CSOT behaviour for retryable reads and writes (#…
W-A-James Aug 1, 2024
1997f81
feat(NODE-6312): add error transformation for server timeouts (#4192)
nbbeeken Aug 12, 2024
cc3ef8f
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
nbbeeken Sep 9, 2024
38affae
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
W-A-James Sep 12, 2024
738188b
fix(NODE-6374): MongoOperationTimeoutError inherits MongoRuntimeError…
nbbeeken Sep 12, 2024
c4a7c2c
test: remove empty skipped context blocks (#4238)
W-A-James Sep 12, 2024
5aa6d4c
feat(NODE-5844): add iscryptd to ServerDescription (#4239)
nbbeeken Sep 17, 2024
17a2fde
chore: allow clientBulkWrite to use TimeoutContext (#4251)
W-A-James Sep 25, 2024
1ed3132
Merge branch 'NODE-6090' into NODE-6305
W-A-James Sep 26, 2024
c29c4ab
lint
W-A-James Sep 26, 2024
1e58f14
lint
W-A-James Sep 26, 2024
2c1cb82
review comments
W-A-James Sep 27, 2024
05c3fcd
remove duplicated logic
W-A-James Sep 27, 2024
6aea417
add catch to dropCollection
W-A-James Sep 30, 2024
20d4b91
skip tests on < 4.4
W-A-James Sep 30, 2024
3cc81c2
Merge branch 'NODE-6090' into NODE-6305
W-A-James Oct 2, 2024
66b3dd5
reskip problematic tests on 4.4
W-A-James Oct 2, 2024
4918283
lint
W-A-James Oct 2, 2024
52227d6
Merge branch 'NODE-6090' into NODE-6305
W-A-James Oct 3, 2024
73c394b
Merge branch 'NODE-6090' into NODE-6305
W-A-James Oct 4, 2024
04e226a
fix merge conflict
W-A-James Oct 4, 2024
737b831
empty
W-A-James Oct 4, 2024
199003a
wait for bulk write to finish
W-A-James Oct 7, 2024
2b843b9
Merge branch 'NODE-6090' into NODE-6305
W-A-James Oct 7, 2024
7225b4e
use separate collection for bulk write test
W-A-James Oct 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion etc/notes/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Children of `MongoError` include:
### `MongoDriverError`

This class represents errors which originate in the driver itself or when the user incorrectly uses the driver. This class should **never** be directly instantiated.
Its children are the main classes of errors that most users will interact with: [**`MongoAPIError`**](#MongoAPIError) and [**`MongoRuntimeError`**](#MongoRuntimeError).
Its children are the main classes of errors that most users will interact with: [**`MongoAPIError`**](#MongoAPIError), [**`MongoRuntimeError`**](#MongoRuntimeError) and [**`MongoOperationTimeoutError`**](#MongoOperationTimeoutError).

### `MongoAPIError`

Expand Down Expand Up @@ -109,6 +109,10 @@ This class should **never** be directly instantiated.
| **MongoGridFSChunkError** | Thrown when a malformed or invalid chunk is encountered when reading from a GridFS Stream. |
| **MongoUnexpectedServerResponseError** | Thrown when the driver receives a **parsable** response it did not expect from the server. |

### `MongoOperationTimeoutError`

- TODO(NODE-5688): Add MongoOperationTimeoutError documentation

### MongoUnexpectedServerResponseError

Intended for the scenario where the MongoDB returns an unexpected response in relation to some state the driver is in.
Expand Down
82 changes: 41 additions & 41 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
"mocha": "^10.4.0",
"mocha-sinon": "^2.1.2",
"mongodb-client-encryption": "^6.1.0",
"mongodb-legacy": "^6.1.0",
"mongodb-legacy": "^6.1.1",
"nyc": "^15.1.0",
"prettier": "^3.3.3",
"semver": "^7.6.3",
Expand Down
8 changes: 6 additions & 2 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ export class Admin {
new RunAdminCommandOperation(command, {
...resolveBSONOptions(options),
session: options?.session,
readPreference: options?.readPreference
readPreference: options?.readPreference,
timeoutMS: options?.timeoutMS ?? this.s.db.timeoutMS
})
);
}
Expand Down Expand Up @@ -154,7 +155,10 @@ export class Admin {
* @param options - Optional settings for the command
*/
async listDatabases(options?: ListDatabasesOptions): Promise<ListDatabasesResult> {
return await executeOperation(this.s.db.client, new ListDatabasesOperation(this.s.db, options));
return await executeOperation(
this.s.db.client,
new ListDatabasesOperation(this.s.db, { timeoutMS: this.s.db.timeoutMS, ...options })
);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import {
applyRetryableWrites,
type Callback,
Expand Down Expand Up @@ -873,6 +874,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
forceServerObjectId?: boolean;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;

/** @internal */
timeoutContext?: TimeoutContext;
}

const executeCommandsAsync = promisify(executeCommands);
Expand Down
104 changes: 97 additions & 7 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import {
} from '../constants';
import {
MongoCompatibilityError,
MONGODB_ERROR_CODES,
MongoMissingDependencyError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoOperationTimeoutError,
MongoParseError,
MongoServerError,
MongoUnexpectedServerResponseError
Expand All @@ -30,6 +32,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import { type TimeoutContext, TimeoutError } from '../timeout';
import {
BufferPool,
calculateDurationInMs,
Expand Down Expand Up @@ -83,6 +86,7 @@ export interface CommandOptions extends BSONSerializeOptions {
documentsReturnedIn?: string;
noResponse?: boolean;
omitReadPreference?: boolean;
omitMaxTimeMS?: boolean;

// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
// from executeOperation that the txnNum should be applied to this command.
Expand All @@ -94,6 +98,9 @@ export interface CommandOptions extends BSONSerializeOptions {
writeConcern?: WriteConcern;

directConnection?: boolean;

/** @internal */
timeoutContext?: TimeoutContext;
}

/** @public */
Expand Down Expand Up @@ -413,6 +420,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}

const message = this.supportsOpMsg
? new OpMsgRequest(db, cmd, commandOptions)
: new OpQueryRequest(db, cmd, commandOptions);
Expand All @@ -427,7 +439,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
): AsyncGenerator<MongoDBResponse> {
this.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
if (options.timeoutContext?.csotEnabled()) {
this.socket.setTimeout(0);
} else if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
Expand All @@ -436,7 +450,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
try {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel
zlibCompressionLevel: this.description.zlibCompressionLevel,
timeoutContext: options.timeoutContext
});

if (options.noResponse || message.moreToCome) {
Expand All @@ -446,7 +461,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

this.throwIfAborted();

for await (const response of this.readMany()) {
if (
options.timeoutContext?.csotEnabled() &&
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new MongoOperationTimeoutError(
'Server roundtrip time is greater than the time remaining'
);
}

for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
this.socket.setTimeout(0);
const bson = response.parse();

Expand Down Expand Up @@ -515,6 +540,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

if (document.ok === 0) {
if (options.timeoutContext?.csotEnabled() && document.isMaxTimeExpiredError) {
throw new MongoOperationTimeoutError('Server reported a timeout error', {
cause: new MongoServerError((object ??= document.toObject(bsonOptions)))
});
}
throw new MongoServerError((object ??= document.toObject(bsonOptions)));
}

Expand Down Expand Up @@ -588,6 +618,28 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
): Promise<Document> {
this.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options, responseType)) {
if (options.timeoutContext?.csotEnabled()) {
if (MongoDBResponse.is(document)) {
if (document.isMaxTimeExpiredError) {
throw new MongoOperationTimeoutError('Server reported a timeout error', {
cause: new MongoServerError(document.toObject())
});
}
} else {
if (
(Array.isArray(document?.writeErrors) &&
document.writeErrors.some(
error => error?.code === MONGODB_ERROR_CODES.MaxTimeMSExpired
)) ||
document?.writeConcernError?.code === MONGODB_ERROR_CODES.MaxTimeMSExpired
) {
throw new MongoOperationTimeoutError('Server reported a timeout error', {
cause: new MongoServerError(document)
});
}
}
}

return document;
}
throw new MongoUnexpectedServerResponseError('Unable to get response from server');
Expand Down Expand Up @@ -623,7 +675,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*/
private async writeCommand(
command: WriteProtocolMessageType,
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
options: {
agreedCompressor?: CompressorName;
zlibCompressionLevel?: number;
timeoutContext?: TimeoutContext;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
Expand All @@ -635,8 +691,32 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

if (options.timeoutContext?.csotEnabled()) {
if (
options.timeoutContext.minRoundTripTime != null &&
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
) {
throw new MongoOperationTimeoutError(
'Server roundtrip time is greater than the time remaining'
);
}
}

if (this.socket.write(buffer)) return;
return await once(this.socket, 'drain');

const drainEvent = once<void>(this.socket, 'drain');
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
if (timeout) {
try {
return await Promise.race([drainEvent, timeout]);
} catch (error) {
if (TimeoutError.is(error)) {
throw new MongoOperationTimeoutError('Timed out at socket write');
}
throw error;
}
}
return await drainEvent;
}

/**
Expand All @@ -648,9 +728,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
private async *readMany(options: {
timeoutContext?: TimeoutContext;
}): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
this.dataEvents = onData(this.messageStream, options);

for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand All @@ -659,6 +742,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
return;
}
}
} catch (readError) {
if (TimeoutError.is(readError)) {
throw new MongoOperationTimeoutError(
`Timed out during socket read (${readError.duration}ms)`
);
}
throw readError;
} finally {
this.dataEvents = null;
this.throwIfAborted();
Expand Down
Loading