Skip to content

Commit 514f8b3

Browse files
committed
feat(NODE-5019): add runCursorCommand API
1 parent adef3f7 commit 514f8b3

17 files changed

+2259
-17
lines changed

etc/sync-wip-spec.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#! /usr/bin/env bash
2+
3+
set -o xtrace
4+
set -o errexit
5+
6+
pushd "$HOME/code/drivers/specifications/source"
7+
make
8+
popd
9+
10+
SOURCE="$HOME/code/drivers/specifications/source/run-command/tests/unified"
11+
12+
for file in $SOURCE/*; do
13+
cp "$file" "test/spec/run-command/$(basename "$file")"
14+
done
15+
16+
# cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/invalid/entity-createRunCursorCommand.yml" test/spec/unified-test-format/invalid/entity-createRunCursorCommand.yml
17+
# cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/invalid/entity-createRunCursorCommand.json" test/spec/unified-test-format/invalid/entity-createRunCursorCommand.json
18+
19+
cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/valid-pass/entity-commandCursor.yml" test/spec/unified-test-format/valid-pass/entity-commandCursor.yml
20+
cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/valid-pass/entity-commandCursor.json" test/spec/unified-test-format/valid-pass/entity-commandCursor.json
21+
22+
23+
if [ -z "$MONGODB_URI" ]; then echo "must set uri" && exit 1; fi
24+
export MONGODB_URI=$MONGODB_URI
25+
npm run check:test -- -g '(RunCommand spec)|(Unified test format runner runCursorCommand)'

src/cursor/abstract_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ export abstract class AbstractCursor<
608608
abstract clone(): AbstractCursor<TSchema>;
609609

610610
/** @internal */
611-
abstract _initialize(
611+
protected abstract _initialize(
612612
session: ClientSession | undefined,
613613
callback: Callback<ExecutionResult>
614614
): void;

src/cursor/run_command_cursor.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import type { BSONSerializeOptions, Document, Long } from '../bson';
2+
import type { Db } from '../db';
3+
import { MongoAPIError, MongoUnexpectedServerResponseError } from '../error';
4+
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
5+
import { GetMoreOperation } from '../operations/get_more';
6+
import { RunCommandOperation } from '../operations/run_command';
7+
import type { ReadConcernLike } from '../read_concern';
8+
import type { ReadPreferenceLike } from '../read_preference';
9+
import type { ClientSession } from '../sessions';
10+
import { Callback, ns } from '../utils';
11+
import { AbstractCursor } from './abstract_cursor';
12+
13+
/** @public */
14+
export type RunCommandCursorOptions = {
15+
readPreference?: ReadPreferenceLike;
16+
session?: ClientSession;
17+
} & BSONSerializeOptions;
18+
19+
/** @internal */
20+
type RunCursorCommandResponse = {
21+
cursor: { id: bigint | Long | number; ns: string; firstBatch: Document[] };
22+
ok: 1;
23+
};
24+
25+
/** @public */
26+
export class RunCommandCursor extends AbstractCursor {
27+
public readonly command: Readonly<Record<string, any>>;
28+
public readonly getMoreOptions: {
29+
comment?: any;
30+
maxAwaitTimeMS?: number;
31+
batchSize?: number;
32+
} = {};
33+
34+
/**
35+
* Controls the `getMore.comment` field
36+
* @param comment - any BSON value
37+
*/
38+
public setComment(comment: any): this {
39+
this.getMoreOptions.comment = comment;
40+
return this;
41+
}
42+
43+
/**
44+
* Controls the `getMore.maxTimeMS` field. Only valid when cursor is tailable await
45+
* @param maxTimeMS - the number of milliseconds to wait for new data
46+
*/
47+
public setMaxTimeMS(maxTimeMS: number): this {
48+
this.getMoreOptions.maxAwaitTimeMS = maxTimeMS;
49+
return this;
50+
}
51+
52+
/**
53+
* Controls the `getMore.batchSize` field
54+
* @param maxTimeMS - the number documents to return in the `nextBatch`
55+
*/
56+
public setBatchSize(batchSize: number): this {
57+
this.getMoreOptions.batchSize = batchSize;
58+
return this;
59+
}
60+
61+
public clone(): never {
62+
throw new MongoAPIError('RunCommandCursor cannot be cloned');
63+
}
64+
65+
public override withReadConcern(_: ReadConcernLike): never {
66+
throw new MongoAPIError(
67+
'RunCommandCursor does not support readConcern it must be attached to the command being run'
68+
);
69+
}
70+
71+
public override addCursorFlag(_: string, __: boolean): never {
72+
throw new MongoAPIError(
73+
'RunCommandCursor does not support cursor flags, they must be attached to the command being run'
74+
);
75+
}
76+
77+
public override maxTimeMS(_: number): never {
78+
throw new MongoAPIError(
79+
'RunCommandCursor does not support maxTimeMS, it must be attached to the command being run'
80+
);
81+
}
82+
83+
public override batchSize(_: number): never {
84+
throw new MongoAPIError(
85+
'RunCommandCursor does not support batchSize, it must be attached to the command being run'
86+
);
87+
}
88+
89+
/** @internal */
90+
private db: Db | undefined;
91+
92+
/** @internal */
93+
constructor(db: Db, command: Document, options: RunCommandCursorOptions = {}) {
94+
super(db.s.client, ns(db.namespace), options);
95+
this.db = db;
96+
this.command = Object.freeze({ ...command });
97+
}
98+
99+
/** @internal */
100+
protected _initialize(session: ClientSession, callback: Callback<ExecutionResult>) {
101+
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
102+
...this.cursorOptions,
103+
session: session,
104+
readPreference: this.cursorOptions.readPreference
105+
});
106+
executeOperation(this.client, operation).then(
107+
response => {
108+
if (!response.cursor) {
109+
callback(
110+
new MongoUnexpectedServerResponseError('Expected server to respond with cursor')
111+
);
112+
return;
113+
}
114+
callback(undefined, {
115+
server: operation.server,
116+
session,
117+
response
118+
});
119+
},
120+
err => callback(err)
121+
);
122+
}
123+
124+
/** @internal */
125+
override _getMore(_batchSize: number, callback: Callback<Document>) {
126+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
127+
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
128+
...this.cursorOptions,
129+
session: this.session,
130+
...this.getMoreOptions
131+
});
132+
133+
executeOperation(this.client, getMoreOperation, callback);
134+
}
135+
}

src/db.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Collection, CollectionOptions } from './collection';
55
import * as CONSTANTS from './constants';
66
import { AggregationCursor } from './cursor/aggregation_cursor';
77
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
8+
import { RunCommandCursor, type RunCommandCursorOptions } from './cursor/run_command_cursor';
89
import { MongoAPIError, MongoInvalidArgumentError } from './error';
910
import type { MongoClient, PkFactory } from './mongo_client';
1011
import type { TODO_NODE_3286 } from './mongo_types';
@@ -523,6 +524,19 @@ export class Db {
523524

524525
return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
525526
}
527+
528+
/**
529+
* A low level cursor API providing basic driver functionality.
530+
* - ClientSession management
531+
* - ReadPreference for server selection
532+
* - Running getMores automatically when a local batch is exhausted
533+
*
534+
* @param command - The command that will start a cursor on the server.
535+
* @param options - Configurations for running the command, bson options will apply to getMores
536+
*/
537+
runCursorCommand(command: Document, options?: RunCommandCursorOptions): RunCommandCursor {
538+
return new RunCommandCursor(this, command, options);
539+
}
526540
}
527541

528542
// TODO(NODE-3484): Refactor into MongoDBNamespace

src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { AggregationCursor } from './cursor/aggregation_cursor';
88
import { FindCursor } from './cursor/find_cursor';
99
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
1010
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
11+
import type { RunCommandCursor } from './cursor/run_command_cursor';
1112
import { Db } from './db';
1213
import { GridFSBucket } from './gridfs';
1314
import { GridFSBucketReadStream } from './gridfs/download';
@@ -87,6 +88,7 @@ export {
8788
ListIndexesCursor,
8889
MongoClient,
8990
OrderedBulkOperation,
91+
RunCommandCursor,
9092
UnorderedBulkOperation
9193
};
9294

@@ -275,6 +277,7 @@ export type {
275277
ChangeStreamAggregateRawResult,
276278
ChangeStreamCursorOptions
277279
} from './cursor/change_stream_cursor';
280+
export type { RunCommandCursorOptions } from './cursor/run_command_cursor';
278281
export type { DbOptions, DbPrivate } from './db';
279282
export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps';
280283
export type { Encrypter, EncrypterOptions } from './encrypter';

test/integration/run-command/run_command.spec.test.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,13 @@ import { loadSpecTests } from '../../spec';
22
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
33

44
describe('RunCommand spec', () => {
5-
runUnifiedSuite(loadSpecTests('run-command'));
5+
runUnifiedSuite(loadSpecTests('run-command'), test => {
6+
if (test.description.includes('timeoutMS') || test.description.includes('timeoutMode')) {
7+
return 'CSOT not implemented in Node.js yet';
8+
}
9+
if (test.description === 'does not attach $readPreference to given command on standalone') {
10+
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
11+
}
12+
return false;
13+
});
614
});
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { expect } from 'chai';
2+
3+
import { CommandStartedEvent, Db, MongoClient } from '../../mongodb';
4+
5+
describe('class RunCommandCursor', () => {
6+
let client: MongoClient;
7+
let db: Db;
8+
let commandsStarted: CommandStartedEvent[];
9+
10+
beforeEach(async function () {
11+
client = this.configuration.newClient({}, { monitorCommands: true });
12+
db = client.db();
13+
await db.dropDatabase().catch(() => null);
14+
await db
15+
.collection<{ _id: number }>('collection')
16+
.insertMany([{ _id: 0 }, { _id: 1 }, { _id: 2 }]);
17+
commandsStarted = [];
18+
client.on('commandStarted', started => commandsStarted.push(started));
19+
});
20+
21+
afterEach(async function () {
22+
commandsStarted = [];
23+
await client.close();
24+
});
25+
26+
it('should only run init command once', async () => {
27+
const cursor = db.runCursorCommand({ find: 'collection', filter: {}, batchSize: 1 });
28+
cursor.setBatchSize(1);
29+
const it0 = cursor[Symbol.asyncIterator]();
30+
const it1 = cursor[Symbol.asyncIterator]();
31+
32+
const next0it0 = await it0.next(); // find, 1 doc
33+
const next0it1 = await it1.next(); // getMore, 1 doc
34+
35+
expect(next0it0).to.deep.equal({ value: { _id: 0 }, done: false });
36+
expect(next0it1).to.deep.equal({ value: { _id: 1 }, done: false });
37+
expect(commandsStarted.map(c => c.commandName)).to.have.lengthOf(2);
38+
39+
const next1it0 = await it0.next(); // getMore, 1 doc
40+
const next1it1 = await it1.next(); // getMore, 0 doc & exhausted id
41+
42+
expect(next1it0).to.deep.equal({ value: { _id: 2 }, done: false });
43+
expect(next1it1).to.deep.equal({ value: undefined, done: true });
44+
expect(commandsStarted.map(c => c.commandName)).to.have.lengthOf(4);
45+
46+
const next2it0 = await it0.next();
47+
const next2it1 = await it1.next();
48+
49+
expect(next2it0).to.deep.equal({ value: undefined, done: true });
50+
expect(next2it1).to.deep.equal({ value: undefined, done: true });
51+
expect(commandsStarted.map(c => c.commandName)).to.have.lengthOf(4);
52+
});
53+
});

test/mongodb.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ export * from '../src/cursor/change_stream_cursor';
138138
export * from '../src/cursor/find_cursor';
139139
export * from '../src/cursor/list_collections_cursor';
140140
export * from '../src/cursor/list_indexes_cursor';
141+
export * from '../src/cursor/run_command_cursor';
141142
export * from '../src/db';
142143
export * from '../src/deps';
143144
export * from '../src/encrypter';

0 commit comments

Comments
 (0)