diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts index 6191e457be..02317264c7 100644 --- a/src/gridfs/upload.ts +++ b/src/gridfs/upload.ts @@ -223,7 +223,8 @@ export class GridFSBucketWriteStream extends Writable { const remainingTimeMS = this.timeoutContext?.getRemainingTimeMSOrThrow( `Upload timed out after ${this.timeoutContext?.timeoutMS}ms` ); - await this.chunks.deleteMany({ files_id: this.id, timeoutMS: remainingTimeMS }); + + await this.chunks.deleteMany({ files_id: this.id }, { timeoutMS: remainingTimeMS }); } } diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index 51bd834a20..64f5e9ad23 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -1,13 +1,13 @@ /* Specification prose tests */ import { type ChildProcess, spawn } from 'node:child_process'; +import { Readable } from 'node:stream'; import { expect } from 'chai'; import * as os from 'os'; import * as path from 'path'; import * as semver from 'semver'; import * as sinon from 'sinon'; -import { Readable } from 'stream'; import { pipeline } from 'stream/promises'; import { type CommandStartedEvent } from '../../../mongodb'; @@ -22,10 +22,15 @@ import { MongoServerSelectionError, now, ObjectId, - promiseWithResolvers, squashError } from '../../mongodb'; -import { type FailPoint, makeMultiBatchWrite, measureDuration } from '../../tools/utils'; +import { + clearFailPoint, + configureFailPoint, + type FailPoint, + makeMultiBatchWrite, + measureDuration +} from '../../tools/utils'; import { filterForCommands } from '../shared'; describe('CSOT spec prose tests', function () { @@ -168,7 +173,7 @@ describe('CSOT spec prose tests', function () { /** * Each test under this category MUST only be run against server versions 4.4 and higher. In these tests, * `LOCAL_MASTERKEY` refers to the following base64: - * ```txt + * ```text * Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk * ``` * For each test, perform the following setup: @@ -629,14 +634,16 @@ describe('CSOT spec prose tests', function () { }); context('6. GridFS - Upload', () => { + /** Tests in this section MUST only be run against server versions 4.4 and higher. */ const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4' } }; - let internalClient: MongoClient; let client: MongoClient; beforeEach(async function () { - internalClient = this.configuration.newClient(); + const internalClient = this.configuration.newClient( + this.configuration.url({ useMultipleMongoses: false }) + ); await internalClient .db('db') .dropCollection('files') @@ -645,127 +652,113 @@ describe('CSOT spec prose tests', function () { .db('db') .dropCollection('chunks') .catch(() => null); + await internalClient.close(); - client = this.configuration.newClient(undefined, { timeoutMS: 100 }); + client = this.configuration.newClient( + this.configuration.url({ useMultipleMongoses: false }), + { timeoutMS: 150 } + ); }); afterEach(async function () { - if (internalClient) { - await internalClient - .db() - .admin() - .command({ configureFailPoint: 'failCommand', mode: 'off' }); - await internalClient.close(); - } - if (client) { - await client.close(); - } + await clearFailPoint( + this.configuration, + this.configuration.url({ useMultipleMongoses: false }) + ); + await client?.close(); }); - /** Tests in this section MUST only be run against server versions 4.4 and higher. */ it('uploads via openUploadStream can be timed out', metadata, async function () { - /** - * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: "failCommand", - * mode: { times: 1 }, - * data: { - * failCommands: ["insert"], - * blockConnection: true, - * blockTimeMS: 15 - * } - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=10`. - * 1. Using `client`, create a GridFS bucket (referred to as `bucket`) that wraps the `db` database. - * 1. Call `bucket.open_upload_stream()` with the filename `filename` to create an upload stream (referred to as `uploadStream`). - * - Expect this to succeed and return a non-null stream. - * 1. Using `uploadStream`, upload a single `0x12` byte. - * 1. Call `uploadStream.close()` to flush the stream and insert chunks. - * - Expect this to fail with a timeout error. - */ - const failpoint: FailPoint = { - configureFailPoint: 'failCommand', - mode: { times: 1 }, - data: { - failCommands: ['insert'], - blockConnection: true, - blockTimeMS: 150 - } - }; - await internalClient.db().admin().command(failpoint); + // 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. + // 2. Using `internalClient`, set the following fail point: + // ```javascript + // { + // configureFailPoint: "failCommand", + // mode: { times: 1 }, + // data: { + // failCommands: ["insert"], + // blockConnection: true, + // blockTimeMS: 200 + // } + // } + // ``` + // 3. Create a new MongoClient (referred to as `client`) with `timeoutMS=150`. + // 4. Using `client`, create a GridFS bucket (referred to as `bucket`) that wraps the `db` database. + // 5. Call `bucket.open_upload_stream()` with the filename `filename` to create an upload stream (referred to as + // `uploadStream`). + // - Expect this to succeed and return a non-null stream. + // 6. Using `uploadStream`, upload a single `0x12` byte. + // 7. Call `uploadStream.close()` to flush the stream and insert chunks. + // - Expect this to fail with a timeout error. + await configureFailPoint( + this.configuration, + { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 200 + } + }, + this.configuration.url({ useMultipleMongoses: false }) + ); const bucket = new GridFSBucket(client.db('db')); const stream = bucket.openUploadStream('filename'); - const data = Buffer.from('13', 'hex'); - const fileStream = Readable.from(data); - const maybeError = await pipeline(fileStream, stream).then( - () => null, + const maybeError = await pipeline(Readable.from(Buffer.from('13', 'hex')), stream).catch( error => error ); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); }); - it('Aborting an upload stream can be timed out', metadata, async function () { - /** - * This test only applies to drivers that provide an API to abort a GridFS upload stream. - * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: "failCommand", - * mode: { times: 1 }, - * data: { - * failCommands: ["delete"], - * blockConnection: true, - * blockTimeMS: 15 - * } - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=10`. - * 1. Using `client`, create a GridFS bucket (referred to as `bucket`) that wraps the `db` database with `chunkSizeBytes=2`. - * 1. Call `bucket.open_upload_stream()` with the filename `filename` to create an upload stream (referred to as `uploadStream`). - * - Expect this to succeed and return a non-null stream. - * 1. Using `uploadStream`, upload the bytes `[0x01, 0x02, 0x03, 0x04]`. - * 1. Call `uploadStream.abort()`. - * - Expect this to fail with a timeout error. - */ - const failpoint: FailPoint = { - configureFailPoint: 'failCommand', - mode: { times: 1 }, - data: { - failCommands: ['delete'], - blockConnection: true, - blockTimeMS: 200 - } - }; + // 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. + // 2. Using `internalClient`, set the following fail point: + // ```javascript + // { + // configureFailPoint: "failCommand", + // mode: { times: 1 }, + // data: { + // failCommands: ["delete"], + // blockConnection: true, + // blockTimeMS: 200 + // } + // } + // ``` + // 3. Create a new MongoClient (referred to as `client`) with `timeoutMS=150`. + // 4. Using `client`, create a GridFS bucket (referred to as `bucket`) that wraps the `db` database with + // `chunkSizeBytes=2`. + // 5. Call `bucket.open_upload_stream()` with the filename `filename` to create an upload stream (referred to as + // `uploadStream`). + // - Expect this to succeed and return a non-null stream. + // 6. Using `uploadStream`, upload the bytes `[0x01, 0x02, 0x03, 0x04]`. + // 7. Call `uploadStream.abort()`. + // - Expect this to fail with a timeout error. + await configureFailPoint( + this.configuration, + { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['delete'], + blockConnection: true, + blockTimeMS: 200 + } + }, + this.configuration.url({ useMultipleMongoses: false }) + ); - await internalClient.db().admin().command(failpoint); const bucket = new GridFSBucket(client.db('db'), { chunkSizeBytes: 2 }); - const uploadStream = bucket.openUploadStream('filename', { timeoutMS: 300 }); + const uploadStream = bucket.openUploadStream('filename'); - const data = Buffer.from('01020304', 'hex'); - - const { promise: writePromise, resolve, reject } = promiseWithResolvers(); - uploadStream.on('error', error => uploadStream.destroy(error)); - uploadStream.write(data, error => { - if (error) reject(error); - else resolve(); + await pipeline(Readable.from(Buffer.from('01020304', 'hex')), uploadStream, { + end: false }); - let maybeError = await writePromise.then( - () => null, - e => e - ); - expect(maybeError).to.be.null; - maybeError = await uploadStream.abort().then( - () => null, - error => error - ); - expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + const timeoutError = await uploadStream.abort().catch(error => error); + expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); + uploadStream.destroy(); }); }); @@ -819,47 +812,49 @@ describe('CSOT spec prose tests', function () { } }); - /** - * This test MUST only be run against server versions 4.4 and higher. - * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. - * 1. Using `internalClient`, insert the following document into the `db.fs.files` collection: - * ```js - * { - * "_id": { - * "$oid": "000000000000000000000005" - * }, - * "length": 10, - * "chunkSize": 4, - * "uploadDate": { - * "$date": "1970-01-01T00:00:00.000Z" - * }, - * "md5": "57d83cd477bfb1ccd975ab33d827a92b", - * "filename": "length-10", - * "contentType": "application/octet-stream", - * "aliases": [], - * "metadata": {} - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) with `timeoutMS=10`. - * 1. Using `client`, create a GridFS bucket (referred to as `bucket`) that wraps the `db` database. - * 1. Call `bucket.open_download_stream` with the id `{ "$oid": "000000000000000000000005" }` to create a download stream (referred to as `downloadStream`). - * - Expect this to succeed and return a non-null stream. - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: "failCommand", - * mode: { times: 1 }, - * data: { - * failCommands: ["find"], - * blockConnection: true, - * blockTimeMS: 15 - * } - * } - * ``` - * 1. Read from the `downloadStream`. - * - Expect this to fail with a timeout error. - * 1. Verify that two `find` commands were executed during the read: one against `db.fs.files` and another against `db.fs.chunks`. - */ + // This test MUST only be run against server versions 4.4 and higher. Drivers SHOULD apply + // [useMultipleMongoses=false](../../unified-test-format/unified-test-format.md#entity) as described in the unified test + // format when testing on sharded clusters to ensure failpoint are hit by only using one mongos. + // 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. + // 2. Using `internalClient`, insert the following document into the `db.fs.files` collection: + // ```javascript + // { + // "_id": { + // "$oid": "000000000000000000000005" + // }, + // "length": 10, + // "chunkSize": 4, + // "uploadDate": { + // "$date": "1970-01-01T00:00:00.000Z" + // }, + // "md5": "57d83cd477bfb1ccd975ab33d827a92b", + // "filename": "length-10", + // "contentType": "application/octet-stream", + // "aliases": [], + // "metadata": {} + // } + // ``` + // 3. Create a new MongoClient (referred to as `client`) with `timeoutMS=150`. + // 4. Using `client`, create a GridFS bucket (referred to as `bucket`) that wraps the `db` database. + // 5. Call `bucket.open_download_stream` with the id `{ "$oid": "000000000000000000000005" }` to create a download stream + // (referred to as `downloadStream`). + // - Expect this to succeed and return a non-null stream. + // 6. Using `internalClient`, set the following fail point: + // ```javascript + // { + // configureFailPoint: "failCommand", + // mode: { times: 1 }, + // data: { + // failCommands: ["find"], + // blockConnection: true, + // blockTimeMS: 200 + // } + // } + // ``` + // 7. Read from the `downloadStream`. + // - Expect this to fail with a timeout error. + // 8. Verify that two `find` commands were executed during the read: one against `db.fs.files` and another against + // `db.fs.chunks`. it('download streams can be timed out', metadata, async function () { const bucket = new GridFSBucket(client.db('db')); const downloadStream = bucket.openDownloadStream(new ObjectId('000000000000000000000005')); @@ -1134,35 +1129,33 @@ describe('CSOT spec prose tests', function () { const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: ['replicaset', 'sharded'] } }; - /** - * This test MUST only be run against replica sets and sharded clusters with server version 4.4 or higher. It MUST be - * run three times: once with the timeout specified via the MongoClient `timeoutMS` option, once with the timeout - * specified via the ClientSession `defaultTimeoutMS` option, and once more with the timeout specified via the - * `timeoutMS` option for the `endSession` operation. In all cases, the timeout MUST be set to 10 milliseconds. - * - * 1. Using `internalClient`, drop the `db.coll` collection. - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: failCommand, - * mode: { times: 1 }, - * data: { - * failCommands: ["abortTransaction"], - * blockConnection: true, - * blockTimeMS: 15 - * } - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) and an explicit ClientSession derived from that MongoClient (referred to as `session`). - * 1. Execute the following code: - * ```ts - * coll = client.database("db").collection("coll") - * session.start_transaction() - * coll.insert_one({x: 1}, session=session) - * ``` - * 1. Using `session`, execute `session.end_session` - * - Expect this to fail with a timeout error after no more than 15ms. - */ + // This test MUST only be run against replica sets and sharded clusters with server version 4.4 or higher. It MUST be run + // three times: once with the timeout specified via the MongoClient `timeoutMS` option, once with the timeout specified via + // the ClientSession `defaultTimeoutMS` option, and once more with the timeout specified via the `timeoutMS` option for the + // `endSession` operation. In all cases, the timeout MUST be set to 150 milliseconds. + // 1. Using `internalClient`, drop the `db.coll` collection. + // 2. Using `internalClient`, set the following fail point: + // ```javascript + // { + // configureFailPoint: failCommand, + // mode: { times: 1 }, + // data: { + // failCommands: ["abortTransaction"], + // blockConnection: true, + // blockTimeMS: 200 + // } + // } + // ``` + // 3. Create a new MongoClient (referred to as `client`) and an explicit ClientSession derived from that MongoClient + // (referred to as `session`). + // 4. Execute the following code: + // ```typescript + // coll = client.database("db").collection("coll") + // session.start_transaction() + // coll.insert_one({x: 1}, session=session) + // ``` + // 5. Using `session`, execute `session.end_session` + // - Expect this to fail with a timeout error after no more than 150ms. const failpoint: FailPoint = { configureFailPoint: 'failCommand', mode: { times: 1 }, @@ -1249,34 +1242,32 @@ describe('CSOT spec prose tests', function () { }; describe('when an operation fails inside withTransaction callback', () => { - /** - * 1. Using `internalClient`, drop the `db.coll` collection. - * 1. Using `internalClient`, set the following fail point: - * ```js - * { - * configureFailPoint: failCommand, - * mode: { times: 2 }, - * data: { - * failCommands: ["insert", "abortTransaction"], - * blockConnection: true, - * blockTimeMS: 200 - * } - * } - * ``` - * 1. Create a new MongoClient (referred to as `client`) configured with `timeoutMS=10` and an explicit ClientSession derived from that MongoClient (referred to as `session`). - * 1. Using `session`, execute a `withTransaction` operation with the following callback: - * ```js - * function callback() { - * coll = client.database("db").collection("coll") - * coll.insert_one({ _id: 1 }, session=session) - * } - * ``` - * 1. Expect the previous `withTransaction` call to fail with a timeout error. - * 1. Verify that the following events were published during the `withTransaction` call: - * 1. `command_started` and `command_failed` events for an `insert` command. - * 1. `command_started` and `command_failed` events for an `abortTransaction` command. - */ - + // 1. Using `internalClient`, drop the `db.coll` collection. + // 2. Using `internalClient`, set the following fail point: + // ```javascript + // { + // configureFailPoint: failCommand, + // mode: { times: 2 }, + // data: { + // failCommands: ["insert", "abortTransaction"], + // blockConnection: true, + // blockTimeMS: 200 + // } + // } + // ``` + // 3. Create a new MongoClient (referred to as `client`) configured with `timeoutMS=150` and an explicit ClientSession + // derived from that MongoClient (referred to as `session`). + // 4. Using `session`, execute a `withTransaction` operation with the following callback: + // ```typescript + // def callback() { + // coll = client.database("db").collection("coll") + // coll.insert_one({ _id: 1 }, session=session) + // } + // ``` + // 5. Expect the previous `withTransaction` call to fail with a timeout error. + // 6. Verify that the following events were published during the `withTransaction` call: + // 1. `command_started` and `command_failed` events for an `insert` command. + // 2. `command_started` and `command_failed` events for an `abortTransaction` command. const failpoint: FailPoint = { configureFailPoint: 'failCommand', mode: { times: 2 },