From c71a45005313095f555e5c292397a92b6cd8e8c4 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 4 Oct 2024 10:52:21 -0400 Subject: [PATCH] feat(NODE-6275): Add CSOT support to GridFS (#4246) Co-authored-by: Neal Beeken Co-authored-by: Bailey Pearson --- package-lock.json | 9 +- package.json | 2 +- src/collection.ts | 10 +- src/gridfs/download.ts | 44 +++- src/gridfs/index.ts | 74 +++++-- src/gridfs/upload.ts | 191 ++++++++++++++---- src/timeout.ts | 12 ++ ...ient_side_operations_timeout.prose.test.ts | 171 +++++++++++++++- ...lient_side_operations_timeout.spec.test.ts | 5 - .../node_csot.test.ts | 167 ++++++++++++++- test/tools/unified-spec-runner/operations.ts | 37 +++- 11 files changed, 634 insertions(+), 88 deletions(-) diff --git a/package-lock.json b/package-lock.json index 612c6170e2b..728deda4932 100644 --- a/package-lock.json +++ b/package-lock.json @@ -49,7 +49,7 @@ "mocha": "^10.4.0", "mocha-sinon": "^2.1.2", "mongodb-client-encryption": "^6.1.0", - "mongodb-legacy": "^6.1.1", + "mongodb-legacy": "^6.1.2", "nyc": "^15.1.0", "prettier": "^3.3.3", "semver": "^7.6.3", @@ -6440,10 +6440,11 @@ } }, "node_modules/mongodb-legacy": { - "version": "6.1.1", - "resolved": "https://registry.npmjs.org/mongodb-legacy/-/mongodb-legacy-6.1.1.tgz", - "integrity": "sha512-u9Cl8UEzdtf7mhWrAEHHhfU0OCqahaOB5midwtyudWIuEz5t18DJFXfqJq3cbEypVfLkfF3zi6rkolKMU9uPjQ==", + "version": "6.1.2", + "resolved": "https://registry.npmjs.org/mongodb-legacy/-/mongodb-legacy-6.1.2.tgz", + "integrity": "sha512-oj+LLtvhhi8XuAQ8dll2BVjrnKxOo/7ylyQu0LsKmzyGcbrvzcyvFUOLC6rPhuJPOvnezh3MZ3/Sk9Tl1jpUpg==", "dev": true, + "license": "Apache-2.0", "dependencies": { "mongodb": "^6.0.0" }, diff --git a/package.json b/package.json index b2f627801c8..3339b1df34d 100644 --- a/package.json +++ b/package.json @@ -97,7 +97,7 @@ "mocha": "^10.4.0", "mocha-sinon": "^2.1.2", "mongodb-client-encryption": "^6.1.0", - "mongodb-legacy": "^6.1.1", + "mongodb-legacy": "^6.1.2", "nyc": "^15.1.0", "prettier": "^3.3.3", "semver": "^7.6.3", diff --git a/src/collection.ts b/src/collection.ts index a73a5276f5f..62fa5bd4cba 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -501,12 +501,18 @@ export class Collection { */ async findOne(): Promise | null>; async findOne(filter: Filter): Promise | null>; - async findOne(filter: Filter, options: FindOptions): Promise | null>; + async findOne( + filter: Filter, + options: Omit + ): Promise | null>; // allow an override of the schema. async findOne(): Promise; async findOne(filter: Filter): Promise; - async findOne(filter: Filter, options?: FindOptions): Promise; + async findOne( + filter: Filter, + options?: Omit + ): Promise; async findOne( filter: Filter = {}, diff --git a/src/gridfs/download.ts b/src/gridfs/download.ts index 06dda0a92ba..19651b885ea 100644 --- a/src/gridfs/download.ts +++ b/src/gridfs/download.ts @@ -2,6 +2,7 @@ import { Readable } from 'stream'; import type { Document, ObjectId } from '../bson'; import type { Collection } from '../collection'; +import { CursorTimeoutMode } from '../cursor/abstract_cursor'; import type { FindCursor } from '../cursor/find_cursor'; import { MongoGridFSChunkError, @@ -12,6 +13,7 @@ import { import type { FindOptions } from '../operations/find'; import type { ReadPreference } from '../read_preference'; import type { Sort } from '../sort'; +import { CSOTTimeoutContext } from '../timeout'; import type { Callback } from '../utils'; import type { GridFSChunk } from './upload'; @@ -28,7 +30,7 @@ export interface GridFSBucketReadStreamOptions { * to be returned by the stream. `end` is non-inclusive */ end?: number; - /** @internal TODO(NODE-5688): make this public */ + /** @public */ timeoutMS?: number; } @@ -98,8 +100,10 @@ export interface GridFSBucketReadStreamPrivate { skip?: number; start: number; end: number; + timeoutMS?: number; }; readPreference?: ReadPreference; + timeoutContext?: CSOTTimeoutContext; } /** @@ -148,7 +152,11 @@ export class GridFSBucketReadStream extends Readable { end: 0, ...options }, - readPreference + readPreference, + timeoutContext: + options?.timeoutMS != null + ? new CSOTTimeoutContext({ timeoutMS: options.timeoutMS, serverSelectionTimeoutMS: 0 }) + : undefined }; } @@ -196,7 +204,8 @@ export class GridFSBucketReadStream extends Readable { async abort(): Promise { this.push(null); this.destroy(); - await this.s.cursor?.close(); + const remainingTimeMS = this.s.timeoutContext?.getRemainingTimeMSOrThrow(); + await this.s.cursor?.close({ timeoutMS: remainingTimeMS }); } } @@ -352,7 +361,22 @@ function init(stream: GridFSBucketReadStream): void { filter['n'] = { $gte: skip }; } } - stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 }); + + let remainingTimeMS: number | undefined; + try { + remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow( + `Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms` + ); + } catch (error) { + return stream.destroy(error); + } + + stream.s.cursor = stream.s.chunks + .find(filter, { + timeoutMode: stream.s.options.timeoutMS != null ? CursorTimeoutMode.LIFETIME : undefined, + timeoutMS: remainingTimeMS + }) + .sort({ n: 1 }); if (stream.s.readPreference) { stream.s.cursor.withReadPreference(stream.s.readPreference); @@ -371,6 +395,18 @@ function init(stream: GridFSBucketReadStream): void { return; }; + let remainingTimeMS: number | undefined; + try { + remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow( + `Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms` + ); + } catch (error) { + if (!stream.destroyed) stream.destroy(error); + return; + } + + findOneOptions.timeoutMS = remainingTimeMS; + stream.s.files.findOne(stream.s.filter, findOneOptions).then(handleReadResult, error => { if (stream.destroyed) return; stream.destroy(error); diff --git a/src/gridfs/index.ts b/src/gridfs/index.ts index 51c32b7a01c..de114e5e597 100644 --- a/src/gridfs/index.ts +++ b/src/gridfs/index.ts @@ -2,10 +2,12 @@ import type { ObjectId } from '../bson'; import type { Collection } from '../collection'; import type { FindCursor } from '../cursor/find_cursor'; import type { Db } from '../db'; -import { MongoRuntimeError } from '../error'; +import { MongoOperationTimeoutError, MongoRuntimeError } from '../error'; import { type Filter, TypedEventEmitter } from '../mongo_types'; import type { ReadPreference } from '../read_preference'; import type { Sort } from '../sort'; +import { CSOTTimeoutContext } from '../timeout'; +import { resolveOptions } from '../utils'; import { WriteConcern, type WriteConcernOptions } from '../write_concern'; import type { FindOptions } from './../operations/find'; import { @@ -48,6 +50,7 @@ export interface GridFSBucketPrivate { chunkSizeBytes: number; readPreference?: ReadPreference; writeConcern: WriteConcern | undefined; + timeoutMS?: number; }; _chunksCollection: Collection; _filesCollection: Collection; @@ -81,11 +84,11 @@ export class GridFSBucket extends TypedEventEmitter { constructor(db: Db, options?: GridFSBucketOptions) { super(); this.setMaxListeners(0); - const privateOptions = { + const privateOptions = resolveOptions(db, { ...DEFAULT_GRIDFS_BUCKET_OPTIONS, ...options, writeConcern: WriteConcern.fromOptions(options) - }; + }); this.s = { db, options: privateOptions, @@ -109,7 +112,10 @@ export class GridFSBucket extends TypedEventEmitter { filename: string, options?: GridFSBucketWriteStreamOptions ): GridFSBucketWriteStream { - return new GridFSBucketWriteStream(this, filename, options); + return new GridFSBucketWriteStream(this, filename, { + timeoutMS: this.s.options.timeoutMS, + ...options + }); } /** @@ -122,7 +128,11 @@ export class GridFSBucket extends TypedEventEmitter { filename: string, options?: GridFSBucketWriteStreamOptions ): GridFSBucketWriteStream { - return new GridFSBucketWriteStream(this, filename, { ...options, id }); + return new GridFSBucketWriteStream(this, filename, { + timeoutMS: this.s.options.timeoutMS, + ...options, + id + }); } /** Returns a readable stream (GridFSBucketReadStream) for streaming file data from GridFS. */ @@ -135,7 +145,7 @@ export class GridFSBucket extends TypedEventEmitter { this.s._filesCollection, this.s.options.readPreference, { _id: id }, - options + { timeoutMS: this.s.options.timeoutMS, ...options } ); } @@ -144,11 +154,27 @@ export class GridFSBucket extends TypedEventEmitter { * * @param id - The id of the file doc */ - async delete(id: ObjectId): Promise { - const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id }); + async delete(id: ObjectId, options?: { timeoutMS: number }): Promise { + const { timeoutMS } = resolveOptions(this.s.db, options); + let timeoutContext: CSOTTimeoutContext | undefined = undefined; + + if (timeoutMS) { + timeoutContext = new CSOTTimeoutContext({ + timeoutMS, + serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS + }); + } + const { deletedCount } = await this.s._filesCollection.deleteOne( + { _id: id }, + { timeoutMS: timeoutContext?.remainingTimeMS } + ); + + const remainingTimeMS = timeoutContext?.remainingTimeMS; + if (remainingTimeMS != null && remainingTimeMS <= 0) + throw new MongoOperationTimeoutError(`Timed out after ${timeoutMS}ms`); // Delete orphaned chunks before returning FileNotFound - await this.s._chunksCollection.deleteMany({ files_id: id }); + await this.s._chunksCollection.deleteMany({ files_id: id }, { timeoutMS: remainingTimeMS }); if (deletedCount === 0) { // TODO(NODE-3483): Replace with more appropriate error @@ -188,7 +214,7 @@ export class GridFSBucket extends TypedEventEmitter { this.s._filesCollection, this.s.options.readPreference, { filename }, - { ...options, sort, skip } + { timeoutMS: this.s.options.timeoutMS, ...options, sort, skip } ); } @@ -198,18 +224,36 @@ export class GridFSBucket extends TypedEventEmitter { * @param id - the id of the file to rename * @param filename - new name for the file */ - async rename(id: ObjectId, filename: string): Promise { + async rename(id: ObjectId, filename: string, options?: { timeoutMS: number }): Promise { const filter = { _id: id }; const update = { $set: { filename } }; - const { matchedCount } = await this.s._filesCollection.updateOne(filter, update); + const { matchedCount } = await this.s._filesCollection.updateOne(filter, update, options); if (matchedCount === 0) { throw new MongoRuntimeError(`File with id ${id} not found`); } } /** Removes this bucket's files collection, followed by its chunks collection. */ - async drop(): Promise { - await this.s._filesCollection.drop(); - await this.s._chunksCollection.drop(); + async drop(options?: { timeoutMS: number }): Promise { + const { timeoutMS } = resolveOptions(this.s.db, options); + let timeoutContext: CSOTTimeoutContext | undefined = undefined; + + if (timeoutMS) { + timeoutContext = new CSOTTimeoutContext({ + timeoutMS, + serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS + }); + } + + if (timeoutContext) { + await this.s._filesCollection.drop({ timeoutMS: timeoutContext.remainingTimeMS }); + const remainingTimeMS = timeoutContext.getRemainingTimeMSOrThrow( + `Timed out after ${timeoutMS}ms` + ); + await this.s._chunksCollection.drop({ timeoutMS: remainingTimeMS }); + } else { + await this.s._filesCollection.drop(); + await this.s._chunksCollection.drop(); + } } } diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts index f54d5131f66..c7544b715d8 100644 --- a/src/gridfs/upload.ts +++ b/src/gridfs/upload.ts @@ -2,7 +2,14 @@ import { Writable } from 'stream'; import { type Document, ObjectId } from '../bson'; import type { Collection } from '../collection'; -import { MongoAPIError, MONGODB_ERROR_CODES, MongoError } from '../error'; +import { CursorTimeoutMode } from '../cursor/abstract_cursor'; +import { + MongoAPIError, + MONGODB_ERROR_CODES, + MongoError, + MongoOperationTimeoutError +} from '../error'; +import { CSOTTimeoutContext } from '../timeout'; import { type Callback, squashError } from '../utils'; import type { WriteConcernOptions } from '../write_concern'; import { WriteConcern } from './../write_concern'; @@ -35,7 +42,7 @@ export interface GridFSBucketWriteStreamOptions extends WriteConcernOptions { * @deprecated Will be removed in the next major version. Add an aliases field to the metadata document instead. */ aliases?: string[]; - /** @internal TODO(NODE-5688): make this public */ + /** @public */ timeoutMS?: number; } @@ -97,6 +104,8 @@ export class GridFSBucketWriteStream extends Writable { * ``` */ gridFSFile: GridFSFile | null = null; + /** @internal */ + timeoutContext?: CSOTTimeoutContext; /** * @param bucket - Handle for this stream's corresponding bucket @@ -131,14 +140,11 @@ export class GridFSBucketWriteStream extends Writable { aborted: false }; - if (!this.bucket.s.calledOpenUploadStream) { - this.bucket.s.calledOpenUploadStream = true; - - checkIndexes(this).then(() => { - this.bucket.s.checkedIndexes = true; - this.bucket.emit('index'); - }, squashError); - } + if (options.timeoutMS != null) + this.timeoutContext = new CSOTTimeoutContext({ + timeoutMS: options.timeoutMS, + serverSelectionTimeoutMS: this.bucket.s.db.client.options.serverSelectionTimeoutMS + }); } /** @@ -147,10 +153,26 @@ export class GridFSBucketWriteStream extends Writable { * The stream is considered constructed when the indexes are done being created */ override _construct(callback: (error?: Error | null) => void): void { - if (this.bucket.s.checkedIndexes) { + if (!this.bucket.s.calledOpenUploadStream) { + this.bucket.s.calledOpenUploadStream = true; + + checkIndexes(this).then( + () => { + this.bucket.s.checkedIndexes = true; + this.bucket.emit('index'); + callback(); + }, + error => { + if (error instanceof MongoOperationTimeoutError) { + return handleError(this, error, callback); + } + squashError(error); + callback(); + } + ); + } else { return process.nextTick(callback); } - this.bucket.once('index', callback); } /** @@ -194,7 +216,10 @@ export class GridFSBucketWriteStream extends Writable { } this.state.aborted = true; - await this.chunks.deleteMany({ files_id: this.id }); + const remainingTimeMS = this.timeoutContext?.getRemainingTimeMSOrThrow( + `Upload timed out after ${this.timeoutContext?.timeoutMS}ms` + ); + await this.chunks.deleteMany({ files_id: this.id, timeoutMS: remainingTimeMS }); } } @@ -219,9 +244,19 @@ function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise { const index = { files_id: 1, n: 1 }; + let remainingTimeMS; + remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ); + let indexes; try { - indexes = await stream.chunks.listIndexes().toArray(); + indexes = await stream.chunks + .listIndexes({ + timeoutMode: remainingTimeMS != null ? CursorTimeoutMode.LIFETIME : undefined, + timeoutMS: remainingTimeMS + }) + .toArray(); } catch (error) { if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { indexes = []; @@ -239,10 +274,14 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise }); if (!hasChunksIndex) { + remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ); await stream.chunks.createIndex(index, { ...stream.writeConcern, background: true, - unique: true + unique: true, + timeoutMS: remainingTimeMS }); } } @@ -270,13 +309,28 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void { return; } - stream.files.insertOne(gridFSFile, { writeConcern: stream.writeConcern }).then( - () => { - stream.gridFSFile = gridFSFile; - callback(); - }, - error => handleError(stream, error, callback) - ); + const remainingTimeMS = stream.timeoutContext?.remainingTimeMS; + if (remainingTimeMS != null && remainingTimeMS <= 0) { + return handleError( + stream, + new MongoOperationTimeoutError( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ), + callback + ); + } + + stream.files + .insertOne(gridFSFile, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS }) + .then( + () => { + stream.gridFSFile = gridFSFile; + callback(); + }, + error => { + return handleError(stream, error, callback); + } + ); return; } @@ -284,7 +338,16 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void { } async function checkIndexes(stream: GridFSBucketWriteStream): Promise { - const doc = await stream.files.findOne({}, { projection: { _id: 1 } }); + let remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ); + const doc = await stream.files.findOne( + {}, + { + projection: { _id: 1 }, + timeoutMS: remainingTimeMS + } + ); if (doc != null) { // If at least one document exists assume the collection has the required index return; @@ -293,8 +356,15 @@ async function checkIndexes(stream: GridFSBucketWriteStream): Promise { const index = { filename: 1, uploadDate: 1 }; let indexes; + remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ); + const listIndexesOptions = { + timeoutMode: remainingTimeMS != null ? CursorTimeoutMode.LIFETIME : undefined, + timeoutMS: remainingTimeMS + }; try { - indexes = await stream.files.listIndexes().toArray(); + indexes = await stream.files.listIndexes(listIndexesOptions).toArray(); } catch (error) { if (error instanceof MongoError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound) { indexes = []; @@ -312,7 +382,11 @@ async function checkIndexes(stream: GridFSBucketWriteStream): Promise { }); if (!hasFileIndex) { - await stream.files.createIndex(index, { background: false }); + remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ); + + await stream.files.createIndex(index, { background: false, timeoutMS: remainingTimeMS }); } await checkChunksIndex(stream); @@ -386,6 +460,18 @@ function doWrite( let doc: GridFSChunk; if (spaceRemaining === 0) { doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore)); + + const remainingTimeMS = stream.timeoutContext?.remainingTimeMS; + if (remainingTimeMS != null && remainingTimeMS <= 0) { + return handleError( + stream, + new MongoOperationTimeoutError( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ), + callback + ); + } + ++stream.state.outstandingRequests; ++outstandingRequests; @@ -393,17 +479,21 @@ function doWrite( return; } - stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then( - () => { - --stream.state.outstandingRequests; - --outstandingRequests; - - if (!outstandingRequests) { - checkDone(stream, callback); + stream.chunks + .insertOne(doc, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS }) + .then( + () => { + --stream.state.outstandingRequests; + --outstandingRequests; + + if (!outstandingRequests) { + checkDone(stream, callback); + } + }, + error => { + return handleError(stream, error, callback); } - }, - error => handleError(stream, error, callback) - ); + ); spaceRemaining = stream.chunkSizeBytes; stream.pos = 0; @@ -420,8 +510,6 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void return checkDone(stream, callback); } - ++stream.state.outstandingRequests; - // Create a new buffer to make sure the buffer isn't bigger than it needs // to be. const remnant = Buffer.alloc(stream.pos); @@ -433,13 +521,28 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void return; } - stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then( - () => { - --stream.state.outstandingRequests; - checkDone(stream, callback); - }, - error => handleError(stream, error, callback) - ); + const remainingTimeMS = stream.timeoutContext?.remainingTimeMS; + if (remainingTimeMS != null && remainingTimeMS <= 0) { + return handleError( + stream, + new MongoOperationTimeoutError( + `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms` + ), + callback + ); + } + ++stream.state.outstandingRequests; + stream.chunks + .insertOne(doc, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS }) + .then( + () => { + --stream.state.outstandingRequests; + checkDone(stream, callback); + }, + error => { + return handleError(stream, error, callback); + } + ); } function isAborted(stream: GridFSBucketWriteStream, callback: Callback): boolean { diff --git a/src/timeout.ts b/src/timeout.ts index f7fb3d0daa5..f694b5f4f4f 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -305,6 +305,18 @@ export class CSOTTimeoutContext extends TimeoutContext { this._serverSelectionTimeout?.clear(); this._connectionCheckoutTimeout?.clear(); } + + /** + * @internal + * Throws a MongoOperationTimeoutError if the context has expired. + * If the context has not expired, returns the `remainingTimeMS` + **/ + getRemainingTimeMSOrThrow(message?: string): number { + const { remainingTimeMS } = this; + if (remainingTimeMS <= 0) + throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`); + return remainingTimeMS; + } } /** @internal */ diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index e276c9bbafd..1b8c34633b4 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -3,15 +3,20 @@ import { expect } from 'chai'; import * as semver from 'semver'; import * as sinon from 'sinon'; +import { Readable } from 'stream'; +import { pipeline } from 'stream/promises'; import { type CommandStartedEvent } from '../../../mongodb'; import { type CommandSucceededEvent, + GridFSBucket, MongoBulkWriteError, MongoClient, MongoOperationTimeoutError, MongoServerSelectionError, - now + now, + ObjectId, + promiseWithResolvers } from '../../mongodb'; import { type FailPoint } from '../../tools/utils'; @@ -398,10 +403,42 @@ describe('CSOT spec prose tests', function () { }); }); - context.skip('6. GridFS - Upload', () => { + context('6. GridFS - Upload', () => { + const metadata: MongoDBMetadataUI = { + requires: { mongodb: '>=4.4' } + }; + let internalClient: MongoClient; + let client: MongoClient; + + beforeEach(async function () { + internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .dropCollection('files') + .catch(() => null); + await internalClient + .db('db') + .dropCollection('chunks') + .catch(() => null); + + client = this.configuration.newClient(undefined, { timeoutMS: 100 }); + }); + + afterEach(async function () { + if (internalClient) { + await internalClient + .db() + .admin() + .command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + } + if (client) { + await client.close(); + } + }); /** Tests in this section MUST only be run against server versions 4.4 and higher. */ - context('uploads via openUploadStream can be timed out', () => { + it('uploads via openUploadStream can be timed out', metadata, async function () { /** * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. * 1. Using `internalClient`, set the following fail point: @@ -424,9 +461,30 @@ describe('CSOT spec prose tests', function () { * 1. Call `uploadStream.close()` to flush the stream and insert chunks. * - Expect this to fail with a timeout error. */ + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS: 150 + } + }; + await internalClient.db().admin().command(failpoint); + + const bucket = new GridFSBucket(client.db('db')); + const stream = bucket.openUploadStream('filename'); + const data = Buffer.from('13', 'hex'); + + const fileStream = Readable.from(data); + const maybeError = await pipeline(fileStream, stream).then( + () => null, + error => error + ); + expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); }); - context('Aborting an upload stream can be timed out', () => { + it('Aborting an upload stream can be timed out', metadata, async function () { /** * This test only applies to drivers that provide an API to abort a GridFS upload stream. * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. @@ -450,10 +508,92 @@ describe('CSOT spec prose tests', function () { * 1. Call `uploadStream.abort()`. * - Expect this to fail with a timeout error. */ + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['delete'], + blockConnection: true, + blockTimeMS: 200 + } + }; + + await internalClient.db().admin().command(failpoint); + const bucket = new GridFSBucket(client.db('db'), { chunkSizeBytes: 2 }); + const uploadStream = bucket.openUploadStream('filename', { timeoutMS: 300 }); + + const data = Buffer.from('01020304', 'hex'); + + const { promise: writePromise, resolve, reject } = promiseWithResolvers(); + uploadStream.on('error', error => uploadStream.destroy(error)); + uploadStream.write(data, error => { + if (error) reject(error); + else resolve(); + }); + let maybeError = await writePromise.then( + () => null, + e => e + ); + expect(maybeError).to.be.null; + + maybeError = await uploadStream.abort().then( + () => null, + error => error + ); + expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + uploadStream.destroy(); }); }); - context.skip('7. GridFS - Download', () => { + context('7. GridFS - Download', () => { + let internalClient: MongoClient; + let client: MongoClient; + const metadata: MongoDBMetadataUI = { + requires: { mongodb: '>=4.4' } + }; + + beforeEach(async function () { + internalClient = this.configuration.newClient(); + await internalClient + .db('db') + .dropCollection('files') + .catch(() => null); + await internalClient + .db('db') + .dropCollection('chunks') + .catch(() => null); + + const files = await internalClient.db('db').createCollection('files'); + + await files.insertOne({ + _id: new ObjectId('000000000000000000000005'), + length: 10, + chunkSize: 4, + uploadDate: new Date('1970-01-01T00:00:00.000Z'), + md5: '57d83cd477bfb1ccd975ab33d827a92b', + filename: 'length-10', + contentType: 'application/octet-stream', + aliases: [], + metadata: {} + }); + + client = this.configuration.newClient(undefined, { timeoutMS: 100 }); + }); + + afterEach(async function () { + if (internalClient) { + await internalClient + .db() + .admin() + .command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + } + + if (client) { + await client.close(); + } + }); + /** * This test MUST only be run against server versions 4.4 and higher. * 1. Using `internalClient`, drop and re-create the `db.fs.files` and `db.fs.chunks` collections. @@ -495,6 +635,27 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a timeout error. * 1. Verify that two `find` commands were executed during the read: one against `db.fs.files` and another against `db.fs.chunks`. */ + it('download streams can be timed out', metadata, async function () { + const bucket = new GridFSBucket(client.db('db')); + const downloadStream = bucket.openDownloadStream(new ObjectId('000000000000000000000005')); + + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['find'], + blockConnection: true, + blockTimeMS: 150 + } + }; + await internalClient.db().admin().command(failpoint); + + const maybeError = await downloadStream.toArray().then( + () => null, + e => e + ); + expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + }); }); context('8. Server Selection', () => { diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts index c2e08cfc80a..49ddabc924b 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts @@ -8,11 +8,6 @@ const skippedSpecs = { 'change-streams': 'TODO(NODE-6035)', 'convenient-transactions': 'TODO(NODE-5687)', 'deprecated-options': 'TODO(NODE-5689)', - 'gridfs-advanced': 'TODO(NODE-6275)', - 'gridfs-delete': 'TODO(NODE-6275)', - 'gridfs-download': 'TODO(NODE-6275)', - 'gridfs-find': 'TODO(NODE-6275)', - 'gridfs-upload': 'TODO(NODE-6275)', 'tailable-awaitData': 'TODO(NODE-6035)', 'tailable-non-awaitData': 'TODO(NODE-6035)' }; diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index 56127cc8ace..b2011ee2e73 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -1,4 +1,7 @@ /* Anything javascript specific relating to timeouts */ +import { once } from 'node:events'; +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; import { setTimeout } from 'node:timers/promises'; import { expect } from 'chai'; @@ -15,11 +18,13 @@ import { Connection, type Db, type FindCursor, + GridFSBucket, LEGACY_HELLO_COMMAND, type MongoClient, MongoInvalidArgumentError, MongoOperationTimeoutError, - MongoServerError + MongoServerError, + ObjectId } from '../../mongodb'; import { type FailPoint } from '../../tools/utils'; @@ -576,6 +581,166 @@ describe('CSOT driver tests', metadata, () => { }); }); + describe('GridFSBucket', () => { + const blockTimeMS = 200; + let internalClient: MongoClient; + let client: MongoClient; + let bucket: GridFSBucket; + + beforeEach(async function () { + client = this.configuration.newClient(undefined, { timeoutMS: 1000 }); + internalClient = this.configuration.newClient(undefined); + }); + + afterEach(async function () { + await client.close(); + await internalClient.db().admin().command({ configureFailPoint: 'failCommand', mode: 'off' }); + await internalClient.close(); + }); + + context('upload', function () { + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['insert'], + blockConnection: true, + blockTimeMS + } + }; + + beforeEach(async function () { + await internalClient + .db('db') + .dropDatabase() + .catch(() => null); + await internalClient.db().admin().command(failpoint); + + const db = client.db('db'); + expect(db.timeoutMS).to.equal(1000); + + bucket = new GridFSBucket(client.db('db'), { chunkSizeBytes: 2 }); + }); + + describe('openUploadStream', function () { + it('can override db timeoutMS settings', metadata, async function () { + const data = Buffer.from('01020304', 'hex'); + const uploadStream = bucket.openUploadStream('filename', { timeoutMS: 175 }); + uploadStream.on('error', error => { + uploadStream.destroy(error); + }); + + uploadStream.write(data, error => { + uploadStream.destroy(error); + }); + + const maybeError = await once(uploadStream, 'error'); + expect(maybeError[0]).to.be.instanceOf(MongoOperationTimeoutError); + }); + + it('only emits index event once per bucket', metadata, async function () { + let numEventsSeen = 0; + bucket.on('index', () => numEventsSeen++); + + const uploadStream0 = bucket + .openUploadStream('filename') + .on('error', error => uploadStream0.destroy(error)); + const uploadStream1 = bucket + .openUploadStream('filename') + .on('error', error => uploadStream1.destroy(error)); + + const data = Buffer.from('test', 'utf-8'); + await pipeline(Readable.from(data), uploadStream0); + await pipeline(Readable.from(data), uploadStream1); + + expect(numEventsSeen).to.equal(1); + }); + }); + + describe('openUploadStreamWithId', function () { + it('can override db timeoutMS settings', metadata, async function () { + const data = Buffer.from('01020304', 'hex'); + const uploadStream = bucket.openUploadStreamWithId(new ObjectId(), 'filename', { + timeoutMS: 175 + }); + uploadStream.on('error', error => { + uploadStream.destroy(error); + }); + + uploadStream.write(data, error => { + uploadStream.destroy(error); + }); + + const maybeError = await once(uploadStream, 'error'); + expect(maybeError[0]).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + }); + + context('download', function () { + const failpoint: FailPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: ['find'], + blockConnection: true, + blockTimeMS + } + }; + const _id = new ObjectId('000000000000000000000005'); + + beforeEach(async function () { + await internalClient + .db('db') + .dropDatabase() + .catch(() => null); + + const files = await internalClient.db('db').createCollection('files'); + await files.insertOne({ + _id, + length: 10, + chunkSize: 4, + uploadDate: new Date('1970-01-01T00:00:00.000Z'), + md5: '57d83cd477bfb1ccd975ab33d827a92b', + filename: 'length-10', + contentType: 'application/octet-stream', + aliases: [], + metadata: {} + }); + + await internalClient.db().admin().command(failpoint); + + const db = client.db('db'); + expect(db.timeoutMS).to.equal(1000); + + bucket = new GridFSBucket(db); + }); + + describe('openDownloadStream', function () { + it('can override db timeoutMS settings', metadata, async function () { + const downloadStream = bucket.openDownloadStream(_id, { timeoutMS: 80 }); + const maybeError = await downloadStream.toArray().then( + () => null, + e => e + ); + + expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + + describe('openDownloadStreamByName', function () { + it('can override db timeoutMS settings', metadata, async function () { + const downloadStream = bucket.openDownloadStreamByName('length-10', { timeoutMS: 80 }); + const maybeError = await downloadStream.toArray().then( + () => null, + e => e + ); + expect(maybeError).to.be.instanceOf(MongoOperationTimeoutError); + }); + }); + }); + }); + describe('when using an explicit session', () => { const metadata: MongoDBMetadataUI = { requires: { topology: ['replicaset'], mongodb: '>=4.4' } diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index 31414fa4664..a9f79842c31 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -11,6 +11,7 @@ import { CommandStartedEvent, Db, type Document, + GridFSBucket, type MongoClient, MongoError, ReadConcern, @@ -311,7 +312,7 @@ operations.set('dropCollection', async ({ entities, operation }) => { operations.set('drop', async ({ entities, operation }) => { const bucket = entities.getEntity('bucket', operation.object); - return bucket.drop(); + return bucket.drop(operation.arguments); }); operations.set('dropIndexes', async ({ entities, operation }) => { @@ -529,7 +530,8 @@ operations.set('targetedFailPoint', async ({ entities, operation }) => { operations.set('delete', async ({ entities, operation }) => { const bucket = entities.getEntity('bucket', operation.object); - return bucket.delete(operation.arguments!.id); + const { id, ...opts } = operation.arguments; + return bucket.delete(id, opts); }); operations.set('download', async ({ entities, operation }) => { @@ -537,7 +539,8 @@ operations.set('download', async ({ entities, operation }) => { const { id, ...options } = operation.arguments ?? {}; const stream = bucket.openDownloadStream(id, options); - return Buffer.concat(await stream.toArray()); + const data = Buffer.concat(await stream.toArray()); + return data; }); operations.set('downloadByName', async ({ entities, operation }) => { @@ -552,7 +555,6 @@ operations.set('downloadByName', async ({ entities, operation }) => { operations.set('upload', async ({ entities, operation }) => { const bucket = entities.getEntity('bucket', operation.object); const { filename, source, ...options } = operation.arguments ?? {}; - const stream = bucket.openUploadStream(filename, options); const fileStream = Readable.from(Buffer.from(source.$$hexBytes, 'hex')); @@ -832,9 +834,30 @@ operations.set('updateOne', async ({ entities, operation }) => { }); operations.set('rename', async ({ entities, operation }) => { - const collection = entities.getEntity('collection', operation.object); - const { to, ...options } = operation.arguments!; - return collection.rename(to, options); + let entity: GridFSBucket | Collection | undefined; + try { + entity = entities.getEntity('collection', operation.object, false); + } catch { + // Ignore wrong type error + } + + if (entity instanceof Collection) { + const { to, ...options } = operation.arguments!; + return entity.rename(to, options); + } + + try { + entity = entities.getEntity('bucket', operation.object, false); + } catch { + // Ignore wrong type error + } + + if (entity instanceof GridFSBucket) { + const { id, newFilename, ...opts } = operation.arguments!; + return entity.rename(id, newFilename, opts as any); + } + + expect.fail(`No collection or bucket with name '${operation.object}' found`); }); operations.set('createDataKey', async ({ entities, operation }) => {