Skip to content

Commit

Permalink
feat(NODE-5019): add runCursorCommand API (#3655)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored May 24, 2023
1 parent bf413e5 commit 4da926e
Show file tree
Hide file tree
Showing 20 changed files with 3,146 additions and 32 deletions.
3 changes: 2 additions & 1 deletion .evergreen/run-serverless-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ npx mocha \
test/integration/transactions/transactions.test.ts \
test/integration/versioned-api/versioned_api.spec.test.js \
test/integration/load-balancers/load_balancers.spec.test.js \
test/integration/client-side-encryption/client_side_encryption.spec.test.ts
test/integration/client-side-encryption/client_side_encryption.spec.test.ts \
test/integration/run-command/run_command.spec.test.ts
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ export abstract class AbstractCursor<
abstract clone(): AbstractCursor<TSchema>;

/** @internal */
abstract _initialize(
protected abstract _initialize(
session: ClientSession | undefined,
callback: Callback<ExecutionResult>
): void;
Expand Down
140 changes: 140 additions & 0 deletions src/cursor/run_command_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import type { BSONSerializeOptions, Document, Long } from '../bson';
import type { Db } from '../db';
import { MongoAPIError, MongoUnexpectedServerResponseError } from '../error';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { RunCommandOperation } from '../operations/run_command';
import type { ReadConcernLike } from '../read_concern';
import type { ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Callback, ns } from '../utils';
import { AbstractCursor } from './abstract_cursor';

/** @public */
export type RunCursorCommandOptions = {
readPreference?: ReadPreferenceLike;
session?: ClientSession;
} & BSONSerializeOptions;

/** @internal */
type RunCursorCommandResponse = {
cursor: { id: bigint | Long | number; ns: string; firstBatch: Document[] };
ok: 1;
};

/** @public */
export class RunCommandCursor extends AbstractCursor {
public readonly command: Readonly<Record<string, any>>;
public readonly getMoreOptions: {
comment?: any;
maxAwaitTimeMS?: number;
batchSize?: number;
} = {};

/**
* Controls the `getMore.comment` field
* @param comment - any BSON value
*/
public setComment(comment: any): this {
this.getMoreOptions.comment = comment;
return this;
}

/**
* Controls the `getMore.maxTimeMS` field. Only valid when cursor is tailable await
* @param maxTimeMS - the number of milliseconds to wait for new data
*/
public setMaxTimeMS(maxTimeMS: number): this {
this.getMoreOptions.maxAwaitTimeMS = maxTimeMS;
return this;
}

/**
* Controls the `getMore.batchSize` field
* @param maxTimeMS - the number documents to return in the `nextBatch`
*/
public setBatchSize(batchSize: number): this {
this.getMoreOptions.batchSize = batchSize;
return this;
}

/** Unsupported for RunCommandCursor */
public override clone(): never {
throw new MongoAPIError('Clone not supported, create a new cursor with db.runCursorCommand');
}

/** Unsupported for RunCommandCursor: readConcern must be configured directly on command document */
public override withReadConcern(_: ReadConcernLike): never {
throw new MongoAPIError(
'RunCommandCursor does not support readConcern it must be attached to the command being run'
);
}

/** Unsupported for RunCommandCursor: various cursor flags must be configured directly on command document */
public override addCursorFlag(_: string, __: boolean): never {
throw new MongoAPIError(
'RunCommandCursor does not support cursor flags, they must be attached to the command being run'
);
}

/** Unsupported for RunCommandCursor: maxTimeMS must be configured directly on command document */
public override maxTimeMS(_: number): never {
throw new MongoAPIError(
'maxTimeMS must be configured on the command document directly, to configure getMore.maxTimeMS use cursor.setMaxTimeMS()'
);
}

/** Unsupported for RunCommandCursor: batchSize must be configured directly on command document */
public override batchSize(_: number): never {
throw new MongoAPIError(
'batchSize must be configured on the command document directly, to configure getMore.batchSize use cursor.setBatchSize()'
);
}

/** @internal */
private db: Db;

/** @internal */
constructor(db: Db, command: Document, options: RunCursorCommandOptions = {}) {
super(db.s.client, ns(db.namespace), options);
this.db = db;
this.command = Object.freeze({ ...command });
}

/** @internal */
protected _initialize(session: ClientSession, callback: Callback<ExecutionResult>) {
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
...this.cursorOptions,
session: session,
readPreference: this.cursorOptions.readPreference
});
executeOperation(this.client, operation).then(
response => {
if (response.cursor == null) {
callback(
new MongoUnexpectedServerResponseError('Expected server to respond with cursor')
);
return;
}
callback(undefined, {
server: operation.server,
session,
response
});
},
err => callback(err)
);
}

/** @internal */
override _getMore(_batchSize: number, callback: Callback<Document>) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
...this.cursorOptions,
session: this.session,
...this.getMoreOptions
});

executeOperation(this.client, getMoreOperation, callback);
}
}
14 changes: 14 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Collection, CollectionOptions } from './collection';
import * as CONSTANTS from './constants';
import { AggregationCursor } from './cursor/aggregation_cursor';
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor';
import { MongoAPIError, MongoInvalidArgumentError } from './error';
import type { MongoClient, PkFactory } from './mongo_client';
import type { TODO_NODE_3286 } from './mongo_types';
Expand Down Expand Up @@ -523,6 +524,19 @@ export class Db {

return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
}

/**
* A low level cursor API providing basic driver functionality:
* - ClientSession management
* - ReadPreference for server selection
* - Running getMores automatically when a local batch is exhausted
*
* @param command - The command that will start a cursor on the server.
* @param options - Configurations for running the command, bson options will apply to getMores
*/
runCursorCommand(command: Document, options?: RunCursorCommandOptions): RunCommandCursor {
return new RunCommandCursor(this, command, options);
}
}

// TODO(NODE-3484): Refactor into MongoDBNamespace
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AggregationCursor } from './cursor/aggregation_cursor';
import { FindCursor } from './cursor/find_cursor';
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
import type { RunCommandCursor } from './cursor/run_command_cursor';
import { Db } from './db';
import { GridFSBucket } from './gridfs';
import { GridFSBucketReadStream } from './gridfs/download';
Expand Down Expand Up @@ -87,6 +88,7 @@ export {
ListIndexesCursor,
MongoClient,
OrderedBulkOperation,
RunCommandCursor,
UnorderedBulkOperation
};

Expand Down Expand Up @@ -275,6 +277,7 @@ export type {
ChangeStreamAggregateRawResult,
ChangeStreamCursorOptions
} from './cursor/change_stream_cursor';
export type { RunCursorCommandOptions } from './cursor/run_command_cursor';
export type { DbOptions, DbPrivate } from './db';
export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps';
export type { Encrypter, EncrypterOptions } from './encrypter';
Expand Down
Empty file.
7 changes: 6 additions & 1 deletion test/integration/run-command/run_command.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,10 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('RunCommand spec', () => {
runUnifiedSuite(loadSpecTests('run-command'));
runUnifiedSuite(loadSpecTests('run-command'), test => {
if (test.description === 'does not attach $readPreference to given command on standalone') {
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
}
return false;
});
});
50 changes: 50 additions & 0 deletions test/integration/run-command/run_cursor_command.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { expect } from 'chai';

import { Db, MongoClient } from '../../mongodb';

describe('runCursorCommand API', () => {
let client: MongoClient;
let db: Db;

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
db = client.db();
await db.dropDatabase().catch(() => null);
await db
.collection<{ _id: number }>('collection')
.insertMany([{ _id: 0 }, { _id: 1 }, { _id: 2 }]);
});

afterEach(async function () {
await client.close();
});

it('returns each document only once across multiple iterators', async () => {
const cursor = db.runCursorCommand({ find: 'collection', filter: {}, batchSize: 1 });
cursor.setBatchSize(1);

const a = cursor[Symbol.asyncIterator]();
const b = cursor[Symbol.asyncIterator]();

// Interleaving calls to A and B
const results = [
await a.next(), // find, first doc
await b.next(), // getMore, second doc

await a.next(), // getMore, third doc
await b.next(), // getMore, no doc & exhausted id, a.k.a. done

await a.next(), // done
await b.next() // done
];

expect(results).to.deep.equal([
{ value: { _id: 0 }, done: false },
{ value: { _id: 1 }, done: false },
{ value: { _id: 2 }, done: false },
{ value: undefined, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true }
]);
});
});
1 change: 1 addition & 0 deletions test/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export * from '../src/cursor/change_stream_cursor';
export * from '../src/cursor/find_cursor';
export * from '../src/cursor/list_collections_cursor';
export * from '../src/cursor/list_indexes_cursor';
export * from '../src/cursor/run_command_cursor';
export * from '../src/db';
export * from '../src/deps';
export * from '../src/encrypter';
Expand Down
Loading

0 comments on commit 4da926e

Please sign in to comment.