Skip to content

Commit 7851975

Browse files
committed
sync and clean
1 parent 332a396 commit 7851975

File tree

6 files changed

+68
-215
lines changed

6 files changed

+68
-215
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ export abstract class AbstractCursor<
608608
abstract clone(): AbstractCursor<TSchema>;
609609

610610
/** @internal */
611-
abstract _initialize(
611+
protected abstract _initialize(
612612
session: ClientSession | undefined,
613613
callback: Callback<ExecutionResult>
614614
): void;

src/cursor/run_command_cursor.ts

Lines changed: 52 additions & 198 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
11
import type { BSONSerializeOptions, Document, Long } from '../bson';
22
import type { Db } from '../db';
3-
import {
4-
MongoCursorExhaustedError,
5-
MongoError,
6-
MongoRuntimeError,
7-
MongoUnexpectedServerResponseError
8-
} from '../error';
9-
import type { MongoClient } from '../mongo_client';
10-
import { executeOperation } from '../operations/execute_operation';
3+
import { MongoAPIError, MongoUnexpectedServerResponseError } from '../error';
4+
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
115
import { GetMoreOperation } from '../operations/get_more';
12-
import { KillCursorsOperation } from '../operations/kill_cursors';
136
import { RunCommandOperation } from '../operations/run_command';
14-
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
15-
import type { Server } from '../sdam/server';
16-
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
17-
import { List, MongoDBNamespace, ns } from '../utils';
7+
import type { ReadPreferenceLike } from '../read_preference';
8+
import type { ClientSession } from '../sessions';
9+
import { Callback, ns } from '../utils';
10+
import { AbstractCursor } from './abstract_cursor';
1811

1912
/** @public */
2013
export type RunCommandCursorOptions = {
@@ -29,213 +22,74 @@ type RunCursorCommandResponse = {
2922
};
3023

3124
/** @public */
32-
export class RunCommandCursor {
33-
/** @internal */
34-
client: MongoClient;
35-
/** @internal */
36-
db: Db;
37-
/** @internal */
38-
options: RunCommandCursorOptions;
39-
/** @internal */
40-
documents: List<Document>;
41-
/** @internal */
42-
getMoreOptions: { comment?: any; maxAwaitTimeMS?: number; batchSize?: number } = {};
43-
/** @internal */
44-
id: bigint | null = null;
45-
/** @internal */
46-
ns: MongoDBNamespace | null = null;
47-
/** @internal */
48-
server: Server | null = null;
49-
/** @internal */
50-
readPreference: ReadPreference;
51-
/** @internal */
52-
session: ClientSession;
53-
/** @internal */
54-
done = false;
55-
56-
readonly command: Readonly<Record<string, any>>;
57-
58-
set comment(comment: any) {
25+
export class RunCommandCursor extends AbstractCursor {
26+
public readonly command: Readonly<Record<string, any>>;
27+
public readonly getMoreOptions: {
28+
comment?: any;
29+
maxAwaitTimeMS?: number;
30+
batchSize?: number;
31+
} = {};
32+
33+
public setComment(comment: any) {
5934
this.getMoreOptions.comment = comment;
6035
}
6136

62-
get comment() {
63-
return this.getMoreOptions.comment;
64-
}
65-
66-
set maxTimeMS(maxAwaitTimeMS: number) {
67-
this.getMoreOptions.maxAwaitTimeMS = maxAwaitTimeMS;
37+
public setMaxTimeMS(maxTimeMS: number) {
38+
this.getMoreOptions.maxAwaitTimeMS = maxTimeMS;
6839
}
6940

70-
get maxTimeMS() {
71-
return this.getMoreOptions.maxAwaitTimeMS ?? 0;
72-
}
73-
74-
set batchSize(batchSize: number) {
41+
public setBatchSize(batchSize: number) {
7542
this.getMoreOptions.batchSize = batchSize;
7643
}
7744

78-
get batchSize() {
79-
return this.getMoreOptions.batchSize ?? 0;
45+
public clone(): never {
46+
throw new MongoAPIError('RunCommandCursor cannot be cloned');
8047
}
8148

8249
/** @internal */
83-
constructor(
84-
client: MongoClient,
85-
db: Db,
86-
command: Document | Map<string, any>,
87-
options: RunCommandCursorOptions = {}
88-
) {
89-
this.client = client;
90-
this.db = db;
91-
this.command =
92-
Symbol.toStringTag in command && command[Symbol.toStringTag] === 'Map'
93-
? Object.freeze(Object.fromEntries(command.entries()))
94-
: Object.freeze({ ...command });
95-
this.options = options;
96-
this.documents = new List();
97-
this.readPreference = ReadPreference.fromOptions(options) ?? ReadPreference.primary;
98-
this.session = options.session == null ? client.startSession({ owner: this }) : options.session;
99-
}
50+
private db: Db | undefined;
10051

10152
/** @internal */
102-
private static getIdFromResponse(response: { cursor: { id: number | Long | bigint } }): bigint {
103-
return typeof response.cursor.id === 'number'
104-
? BigInt(response.cursor.id)
105-
: typeof response.cursor.id === 'object'
106-
? response.cursor.id.toBigInt()
107-
: response.cursor.id;
53+
constructor(db: Db, command: Document, options: RunCommandCursorOptions = {}) {
54+
super(db.s.client, ns(db.namespace), options);
55+
this.db = db;
56+
this.command = Object.freeze({ ...command });
10857
}
10958

11059
/** @internal */
111-
private async runCommand() {
60+
protected _initialize(session: ClientSession, callback: Callback<ExecutionResult>) {
11261
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
113-
...this.options,
114-
session: this.session,
62+
...this.cursorOptions,
63+
session: session,
11564
readPreference: this.readPreference
11665
});
117-
118-
try {
119-
const commandResponse = await executeOperation(this.client, operation);
120-
121-
if (commandResponse.cursor == null) {
122-
throw new MongoUnexpectedServerResponseError('command must return a cursor document');
123-
}
124-
125-
this.id = RunCommandCursor.getIdFromResponse(commandResponse);
126-
this.ns = ns(commandResponse.cursor.ns);
127-
this.documents.pushMany(commandResponse.cursor.firstBatch);
128-
this.server = operation.server;
129-
} catch (error) {
130-
await this.cleanup(error);
131-
throw error;
132-
}
133-
}
134-
135-
/** @internal */
136-
private async getMore() {
137-
if (this.ns == null || this.id == null || this.server == null) {
138-
throw new MongoRuntimeError('getMore cannot be invoked with null namespace, id, nor server');
139-
}
140-
141-
try {
142-
const getMoreResponse = await executeOperation(
143-
this.client,
144-
new GetMoreOperation(this.ns, this.id, this.server, {
145-
...this.options,
146-
session: this.session,
147-
readPreference: this.readPreference,
148-
...this.getMoreOptions
149-
})
150-
);
151-
152-
this.documents.pushMany(getMoreResponse.cursor.nextBatch);
153-
this.id = RunCommandCursor.getIdFromResponse(getMoreResponse);
154-
155-
if (this.id === 0n) {
156-
await this.close();
157-
}
158-
} catch (error) {
159-
await this.cleanup(error);
160-
throw error;
161-
}
162-
}
163-
164-
async *[Symbol.asyncIterator](): AsyncGenerator<Document> {
165-
if (this.done) {
166-
throw new MongoCursorExhaustedError('This cursor needs a nap');
167-
}
168-
try {
169-
if (this.id == null) {
170-
await this.runCommand();
171-
}
172-
while (this.documents.length !== 0 || this.id !== 0n) {
173-
const doc = this.documents.shift();
174-
if (doc != null) {
175-
yield doc;
176-
}
177-
if (this.documents.length === 0 && this.id !== 0n) {
178-
await this.getMore();
66+
executeOperation(this.client, operation).then(
67+
response => {
68+
if (!response.cursor) {
69+
callback(
70+
new MongoUnexpectedServerResponseError('Expected server to respond with cursor')
71+
);
72+
return;
17973
}
180-
}
181-
} finally {
182-
await this.close();
183-
}
184-
}
185-
186-
async next(): Promise<Document | null> {
187-
if (this.done) {
188-
throw new MongoCursorExhaustedError('This cursor needs a nap');
189-
}
190-
191-
const result = await this[Symbol.asyncIterator]().next();
192-
193-
return result.done ? null : result.value;
194-
}
195-
196-
async tryNext(): Promise<Document | null> {
197-
if (this.done) {
198-
throw new MongoCursorExhaustedError('This cursor needs a nap');
199-
}
200-
201-
if (this.documents.length === 0 && this.id !== 0n) {
202-
await this.getMore();
203-
}
204-
205-
return this.documents.shift();
206-
}
207-
208-
async toArray() {
209-
const documents = new List();
210-
for await (const doc of this) {
211-
documents.push(doc);
212-
}
213-
return documents.toArray();
214-
}
215-
216-
async close() {
217-
if (this.done) {
218-
return;
219-
}
220-
await this.cleanup().catch(() => null);
74+
callback(undefined, {
75+
server: operation.server,
76+
session,
77+
response
78+
});
79+
},
80+
err => callback(err)
81+
);
22182
}
22283

22384
/** @internal */
224-
async cleanup(error?: MongoError) {
225-
if (this.done) {
226-
return;
227-
}
228-
if (this.id != null && this.id !== 0n && this.ns != null && this.server != null) {
229-
const options = this.session.hasEnded ? {} : { session: this.session };
230-
const killCursorsOperation = new KillCursorsOperation(this.id, this.ns, this.server, options);
231-
await executeOperation(this.client, killCursorsOperation).catch(() => null);
232-
}
233-
if (this.session.owner === this) {
234-
await this.session.endSession().catch(() => null);
235-
}
236-
if (!this.session.inTransaction()) {
237-
maybeClearPinnedConnection(this.session, { error });
238-
}
239-
this.done = true;
85+
override _getMore(_batchSize: number, callback: Callback<Document>) {
86+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
87+
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
88+
...this.cursorOptions,
89+
session: this.session,
90+
...this.getMoreOptions
91+
});
92+
93+
executeOperation(this.client, getMoreOperation, callback);
24094
}
24195
}

src/db.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,11 +534,8 @@ export class Db {
534534
* @param command - The command that will start a cursor on the server.
535535
* @param options - Configurations for running the command, bson options will apply to getMores
536536
*/
537-
runCursorCommand(
538-
command: Document | Map<string, any>,
539-
options?: RunCommandCursorOptions
540-
): RunCommandCursor {
541-
return new RunCommandCursor(this.s.client, this, command, options);
537+
runCursorCommand(command: Document, options?: RunCommandCursorOptions): RunCommandCursor {
538+
return new RunCommandCursor(this, command, options);
542539
}
543540
}
544541

test/spec/run-command/runCursorCommand.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@
8080
"command": {
8181
"checkMetadataConsistency": 1
8282
}
83-
},
84-
"expectResult": []
83+
}
8584
}
8685
],
8786
"expectEvents": [

test/spec/run-command/runCursorCommand.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ tests:
4141
arguments:
4242
commandName: checkMetadataConsistency
4343
command: { checkMetadataConsistency: 1 }
44-
expectResult: []
4544
expectEvents:
4645
- client: *client
4746
eventType: command
@@ -255,6 +254,8 @@ tests:
255254
expectEvents:
256255
- client: *client
257256
eventType: command
257+
# The getMore should receive an error here because we do not have the right kind of cursor
258+
# So drivers should run a killCursors, but neither the error nor the killCursors command is relevant to this test
258259
ignoreExtraEvents: true
259260
events:
260261
- commandStartedEvent:

test/tools/unified-spec-runner/operations.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -666,9 +666,9 @@ operations.set('runCursorCommand', async ({ entities, operation }: OperationFunc
666666
session: opts.session
667667
});
668668

669-
if (!Number.isNaN(+opts.batchSize)) cursor.batchSize = +opts.batchSize;
670-
if (!Number.isNaN(+opts.maxTimeMS)) cursor.maxTimeMS = +opts.maxTimeMS;
671-
if (opts.comment !== undefined) cursor.comment = opts.comment;
669+
if (!Number.isNaN(+opts.batchSize)) cursor.setBatchSize(+opts.batchSize);
670+
if (!Number.isNaN(+opts.maxTimeMS)) cursor.setMaxTimeMS(+opts.maxTimeMS);
671+
if (opts.comment !== undefined) cursor.setComment(opts.comment);
672672

673673
return cursor.toArray();
674674
});
@@ -683,14 +683,16 @@ operations.set(
683683
session: opts.session
684684
});
685685

686-
if (!Number.isNaN(+opts.batchSize)) cursor.batchSize = +opts.batchSize;
687-
if (!Number.isNaN(+opts.maxTimeMS)) cursor.maxTimeMS = +opts.maxTimeMS;
688-
if (opts.comment !== undefined) cursor.comment = opts.comment;
686+
if (!Number.isNaN(+opts.batchSize)) cursor.setBatchSize(+opts.batchSize);
687+
if (!Number.isNaN(+opts.maxTimeMS)) cursor.setMaxTimeMS(+opts.maxTimeMS);
688+
if (opts.comment !== undefined) cursor.setComment(opts.comment);
689689

690690
// The spec dictates that we create the cursor and force the find command
691691
// to execute, but the first document must still be returned for the first iteration.
692-
const result = await cursor[Symbol.asyncIterator]().next();
693-
if (!result.done) cursor.documents.unshift(result.value);
692+
const result = await cursor.tryNext();
693+
const kDocuments = getSymbolFrom(cursor, 'documents');
694+
if (result) cursor[kDocuments].unshift(result);
695+
694696
return cursor;
695697
}
696698
);

0 commit comments

Comments
 (0)