Skip to content

Commit

Permalink
feat(NODE-6275): Add CSOT support to GridFS (#4246)
Browse files Browse the repository at this point in the history
Co-authored-by: Neal Beeken <neal.beeken@mongodb.com>
Co-authored-by: Bailey Pearson <bailey.pearson@mongodb.com>
  • Loading branch information
3 people committed Nov 1, 2024
1 parent d210d31 commit c71a450
Show file tree
Hide file tree
Showing 11 changed files with 634 additions and 88 deletions.
9 changes: 5 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
"mocha": "^10.4.0",
"mocha-sinon": "^2.1.2",
"mongodb-client-encryption": "^6.1.0",
"mongodb-legacy": "^6.1.1",
"mongodb-legacy": "^6.1.2",
"nyc": "^15.1.0",
"prettier": "^3.3.3",
"semver": "^7.6.3",
Expand Down
10 changes: 8 additions & 2 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,18 @@ export class Collection<TSchema extends Document = Document> {
*/
async findOne(): Promise<WithId<TSchema> | null>;
async findOne(filter: Filter<TSchema>): Promise<WithId<TSchema> | null>;
async findOne(filter: Filter<TSchema>, options: FindOptions): Promise<WithId<TSchema> | null>;
async findOne(
filter: Filter<TSchema>,
options: Omit<FindOptions, 'timeoutMode'>
): Promise<WithId<TSchema> | null>;

// allow an override of the schema.
async findOne<T = TSchema>(): Promise<T | null>;
async findOne<T = TSchema>(filter: Filter<TSchema>): Promise<T | null>;
async findOne<T = TSchema>(filter: Filter<TSchema>, options?: FindOptions): Promise<T | null>;
async findOne<T = TSchema>(
filter: Filter<TSchema>,
options?: Omit<FindOptions, 'timeoutMode'>
): Promise<T | null>;

async findOne(
filter: Filter<TSchema> = {},
Expand Down
44 changes: 40 additions & 4 deletions src/gridfs/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Readable } from 'stream';

import type { Document, ObjectId } from '../bson';
import type { Collection } from '../collection';
import { CursorTimeoutMode } from '../cursor/abstract_cursor';
import type { FindCursor } from '../cursor/find_cursor';
import {
MongoGridFSChunkError,
Expand All @@ -12,6 +13,7 @@ import {
import type { FindOptions } from '../operations/find';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import type { Callback } from '../utils';
import type { GridFSChunk } from './upload';

Expand All @@ -28,7 +30,7 @@ export interface GridFSBucketReadStreamOptions {
* to be returned by the stream. `end` is non-inclusive
*/
end?: number;
/** @internal TODO(NODE-5688): make this public */
/** @public */
timeoutMS?: number;
}

Expand Down Expand Up @@ -98,8 +100,10 @@ export interface GridFSBucketReadStreamPrivate {
skip?: number;
start: number;
end: number;
timeoutMS?: number;
};
readPreference?: ReadPreference;
timeoutContext?: CSOTTimeoutContext;
}

/**
Expand Down Expand Up @@ -148,7 +152,11 @@ export class GridFSBucketReadStream extends Readable {
end: 0,
...options
},
readPreference
readPreference,
timeoutContext:
options?.timeoutMS != null
? new CSOTTimeoutContext({ timeoutMS: options.timeoutMS, serverSelectionTimeoutMS: 0 })
: undefined
};
}

Expand Down Expand Up @@ -196,7 +204,8 @@ export class GridFSBucketReadStream extends Readable {
async abort(): Promise<void> {
this.push(null);
this.destroy();
await this.s.cursor?.close();
const remainingTimeMS = this.s.timeoutContext?.getRemainingTimeMSOrThrow();
await this.s.cursor?.close({ timeoutMS: remainingTimeMS });
}
}

Expand Down Expand Up @@ -352,7 +361,22 @@ function init(stream: GridFSBucketReadStream): void {
filter['n'] = { $gte: skip };
}
}
stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });

let remainingTimeMS: number | undefined;
try {
remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow(
`Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms`
);
} catch (error) {
return stream.destroy(error);
}

stream.s.cursor = stream.s.chunks
.find(filter, {
timeoutMode: stream.s.options.timeoutMS != null ? CursorTimeoutMode.LIFETIME : undefined,
timeoutMS: remainingTimeMS
})
.sort({ n: 1 });

if (stream.s.readPreference) {
stream.s.cursor.withReadPreference(stream.s.readPreference);
Expand All @@ -371,6 +395,18 @@ function init(stream: GridFSBucketReadStream): void {
return;
};

let remainingTimeMS: number | undefined;
try {
remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow(
`Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms`
);
} catch (error) {
if (!stream.destroyed) stream.destroy(error);
return;
}

findOneOptions.timeoutMS = remainingTimeMS;

stream.s.files.findOne(stream.s.filter, findOneOptions).then(handleReadResult, error => {
if (stream.destroyed) return;
stream.destroy(error);
Expand Down
74 changes: 59 additions & 15 deletions src/gridfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import type { ObjectId } from '../bson';
import type { Collection } from '../collection';
import type { FindCursor } from '../cursor/find_cursor';
import type { Db } from '../db';
import { MongoRuntimeError } from '../error';
import { MongoOperationTimeoutError, MongoRuntimeError } from '../error';
import { type Filter, TypedEventEmitter } from '../mongo_types';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import { resolveOptions } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { FindOptions } from './../operations/find';
import {
Expand Down Expand Up @@ -48,6 +50,7 @@ export interface GridFSBucketPrivate {
chunkSizeBytes: number;
readPreference?: ReadPreference;
writeConcern: WriteConcern | undefined;
timeoutMS?: number;
};
_chunksCollection: Collection<GridFSChunk>;
_filesCollection: Collection<GridFSFile>;
Expand Down Expand Up @@ -81,11 +84,11 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
constructor(db: Db, options?: GridFSBucketOptions) {
super();
this.setMaxListeners(0);
const privateOptions = {
const privateOptions = resolveOptions(db, {
...DEFAULT_GRIDFS_BUCKET_OPTIONS,
...options,
writeConcern: WriteConcern.fromOptions(options)
};
});
this.s = {
db,
options: privateOptions,
Expand All @@ -109,7 +112,10 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
filename: string,
options?: GridFSBucketWriteStreamOptions
): GridFSBucketWriteStream {
return new GridFSBucketWriteStream(this, filename, options);
return new GridFSBucketWriteStream(this, filename, {
timeoutMS: this.s.options.timeoutMS,
...options
});
}

/**
Expand All @@ -122,7 +128,11 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
filename: string,
options?: GridFSBucketWriteStreamOptions
): GridFSBucketWriteStream {
return new GridFSBucketWriteStream(this, filename, { ...options, id });
return new GridFSBucketWriteStream(this, filename, {
timeoutMS: this.s.options.timeoutMS,
...options,
id
});
}

/** Returns a readable stream (GridFSBucketReadStream) for streaming file data from GridFS. */
Expand All @@ -135,7 +145,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
this.s._filesCollection,
this.s.options.readPreference,
{ _id: id },
options
{ timeoutMS: this.s.options.timeoutMS, ...options }
);
}

Expand All @@ -144,11 +154,27 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
*
* @param id - The id of the file doc
*/
async delete(id: ObjectId): Promise<void> {
const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id });
async delete(id: ObjectId, options?: { timeoutMS: number }): Promise<void> {
const { timeoutMS } = resolveOptions(this.s.db, options);
let timeoutContext: CSOTTimeoutContext | undefined = undefined;

if (timeoutMS) {
timeoutContext = new CSOTTimeoutContext({
timeoutMS,
serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS
});
}

const { deletedCount } = await this.s._filesCollection.deleteOne(
{ _id: id },
{ timeoutMS: timeoutContext?.remainingTimeMS }
);

const remainingTimeMS = timeoutContext?.remainingTimeMS;
if (remainingTimeMS != null && remainingTimeMS <= 0)
throw new MongoOperationTimeoutError(`Timed out after ${timeoutMS}ms`);
// Delete orphaned chunks before returning FileNotFound
await this.s._chunksCollection.deleteMany({ files_id: id });
await this.s._chunksCollection.deleteMany({ files_id: id }, { timeoutMS: remainingTimeMS });

if (deletedCount === 0) {
// TODO(NODE-3483): Replace with more appropriate error
Expand Down Expand Up @@ -188,7 +214,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
this.s._filesCollection,
this.s.options.readPreference,
{ filename },
{ ...options, sort, skip }
{ timeoutMS: this.s.options.timeoutMS, ...options, sort, skip }
);
}

Expand All @@ -198,18 +224,36 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
* @param id - the id of the file to rename
* @param filename - new name for the file
*/
async rename(id: ObjectId, filename: string): Promise<void> {
async rename(id: ObjectId, filename: string, options?: { timeoutMS: number }): Promise<void> {
const filter = { _id: id };
const update = { $set: { filename } };
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update);
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update, options);
if (matchedCount === 0) {
throw new MongoRuntimeError(`File with id ${id} not found`);
}
}

/** Removes this bucket's files collection, followed by its chunks collection. */
async drop(): Promise<void> {
await this.s._filesCollection.drop();
await this.s._chunksCollection.drop();
async drop(options?: { timeoutMS: number }): Promise<void> {
const { timeoutMS } = resolveOptions(this.s.db, options);
let timeoutContext: CSOTTimeoutContext | undefined = undefined;

if (timeoutMS) {
timeoutContext = new CSOTTimeoutContext({
timeoutMS,
serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS
});
}

if (timeoutContext) {
await this.s._filesCollection.drop({ timeoutMS: timeoutContext.remainingTimeMS });
const remainingTimeMS = timeoutContext.getRemainingTimeMSOrThrow(
`Timed out after ${timeoutMS}ms`
);
await this.s._chunksCollection.drop({ timeoutMS: remainingTimeMS });
} else {
await this.s._filesCollection.drop();
await this.s._chunksCollection.drop();
}
}
}
Loading

0 comments on commit c71a450

Please sign in to comment.