Skip to content

Commit f524b27

Browse files
committed
feat(NODE-6337): implement client bulk write batching
1 parent ddcae44 commit f524b27

File tree

6 files changed

+330
-174
lines changed

6 files changed

+330
-174
lines changed

src/cmap/commands.ts

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,68 @@ export interface OpMsgOptions {
429429

430430
/**
 * A document sequence section of an OP_MSG request for a single command field.
 *
 * The serialized section is laid out as:
 *  - byte 0: the section type, always 1,
 *  - bytes 1-4: little-endian int32 length covering the length field itself,
 *    the null-terminated field name and every serialized document,
 *  - bytes 5..: the UTF-8 field name followed by a null byte,
 *  - then the raw BSON bytes of each document, in push order.
 *
 * Documents are serialized as they are pushed so the running byte size is
 * always available to callers that need to split work into batches.
 * @internal
 */
export class DocumentSequence {
  field: string;
  documents: Document[];
  serializedDocumentsLength: number;
  private readonly chunks: Uint8Array[];
  private readonly header: Buffer;
  /** Length of the field name in UTF-8 bytes, which can exceed `field.length`. */
  private readonly fieldByteLength: number;

  /**
   * Create a new document sequence for the provided field.
   * @param field - The field it will replace.
   * @param documents - Optional initial documents; each one is pushed (and serialized) in order.
   */
  constructor(field: string, documents?: Document[]) {
    this.field = field;
    this.documents = [];
    this.chunks = [];
    this.serializedDocumentsLength = 0;
    // The name is written as UTF-8, so size everything by bytes rather than UTF-16 code units.
    this.fieldByteLength = Buffer.byteLength(field, 'utf8');
    this.header = this.init();
    if (documents) {
      for (const doc of documents) {
        this.push(doc);
      }
    }
  }

  /**
   * Build the section header, register it as the first chunk and return it.
   * @returns The header buffer, with the length field already valid for zero documents.
   */
  private init(): Buffer {
    // Document sequences start with type 1 at the first byte.
    const buffer = Buffer.allocUnsafe(1 + 4 + this.fieldByteLength + 1);
    buffer[0] = 1;
    // The allocation is uninitialized, so write the length now; otherwise an empty
    // sequence would serialize arbitrary memory as its section length.
    buffer.writeInt32LE(4 + this.fieldByteLength + 1, 1);
    // Third part is the field name at offset 5 with trailing null byte.
    encodeUTF8Into(buffer, `${this.field}\0`, 5);
    this.chunks.push(buffer);
    return buffer;
  }

  /**
   * Push a document to the document sequence. Will serialize the document
   * as well and return the current serialized length of all documents.
   * @param document - The document to add.
   * @returns The serialized documents length (header bytes are not included).
   */
  push(document: Document): number {
    // First serialize the document and recalculate the documents length.
    const docBuffer = BSON.serialize(document);
    this.serializedDocumentsLength += docBuffer.length;
    // Keep both the original document and its raw BSON bytes.
    this.documents.push(document);
    this.chunks.push(docBuffer);
    // Rewrite the section length at offset 1 to account for the new document.
    this.header.writeInt32LE(4 + this.fieldByteLength + 1 + this.serializedDocumentsLength, 1);
    return this.serializedDocumentsLength;
  }

  /**
   * Get the fully serialized bytes for the document sequence section.
   * @returns The section bytes: the header followed by every pushed document. With no
   * documents this is just the header, whose length field covers only itself and the name.
   */
  toBin(): Uint8Array {
    return Buffer.concat(this.chunks);
  }
}
438496

@@ -543,21 +601,7 @@ export class OpMsgRequest {
543601
const chunks = [];
544602
for (const [key, value] of Object.entries(document)) {
545603
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
604+
chunks.push(value.toBin());
561605
// Why are we removing the field from the command? This is because it needs to be
562606
// removed in the OP_MSG request first section, and DocumentSequence is not a
563607
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/operations/client_bulk_write/command_builder.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ export class ClientBulkWriteCommandBuilder {
5656
*/
5757
buildCommands(): ClientBulkWriteCommand[] {
5858
// Iterate the models to build the ops and nsInfo fields.
59+
// We need to do this in a loop which creates one command each up
60+
// to the max bson size or max message size.
5961
const operations = [];
6062
let currentNamespaceIndex = 0;
6163
const namespaces = new Map<string, number>();
@@ -78,8 +80,8 @@ export class ClientBulkWriteCommandBuilder {
7880
bulkWrite: 1,
7981
errorsOnly: this.errorsOnly,
8082
ordered: this.options.ordered ?? true,
81-
ops: new DocumentSequence(operations),
82-
nsInfo: new DocumentSequence(nsInfo)
83+
ops: new DocumentSequence('ops', operations),
84+
nsInfo: new DocumentSequence('nsInfo', nsInfo)
8385
};
8486
// Add bypassDocumentValidation if it was present in the options.
8587
if (this.options.bypassDocumentValidation != null) {

src/operations/client_bulk_write/executor.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,14 @@ async function executeAcknowledged(
7070
): Promise<ClientBulkWriteResult> {
7171
const resultsMerger = new ClientBulkWriteResultsMerger(options);
7272
// For each command we will create and exhaust a cursor for the results.
73+
let currentBatchOffset = 0;
7374
for (const command of commands) {
7475
const cursor = new ClientBulkWriteCursor(client, command, options);
7576
const docs = await cursor.toArray();
76-
resultsMerger.merge(command.ops.documents, cursor.response, docs);
77+
const operations = command.ops.documents;
78+
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
79+
// Set the new batch offset so we can map back to the index in the original models.
80+
currentBatchOffset += operations.length;
7781
}
7882
return resultsMerger.result;
7983
}

src/operations/client_bulk_write/results_merger.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@ export class ClientBulkWriteResultsMerger {
4444

4545
/**
4646
* Merge the results in the cursor to the existing result.
47+
* @param currentBatchOffset - The offset index to the original models.
4748
* @param response - The cursor response.
4849
* @param documents - The documents in the cursor.
4950
* @returns The current result.
5051
*/
5152
merge(
53+
currentBatchOffset: number,
5254
operations: Document[],
5355
response: ClientBulkWriteCursorResponse,
5456
documents: Document[]
@@ -69,7 +71,9 @@ export class ClientBulkWriteResultsMerger {
6971
const operation = operations[document.idx];
7072
// Handle insert results.
7173
if ('insert' in operation) {
72-
this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
74+
this.result.insertResults?.set(document.idx + currentBatchOffset, {
75+
insertedId: operation.document._id
76+
});
7377
}
7478
// Handle update results.
7579
if ('update' in operation) {
@@ -80,11 +84,13 @@ export class ClientBulkWriteResultsMerger {
8084
if (document.upserted) {
8185
result.upsertedId = document.upserted._id;
8286
}
83-
this.result.updateResults?.set(document.idx, result);
87+
this.result.updateResults?.set(document.idx + currentBatchOffset, result);
8488
}
8589
// Handle delete results.
8690
if ('delete' in operation) {
87-
this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
91+
this.result.deleteResults?.set(document.idx + currentBatchOffset, {
92+
deletedCount: document.n
93+
});
8894
}
8995
}
9096
}

test/unit/cmap/commands.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ describe('commands', function () {
1515
context('when there is one document sequence', function () {
1616
const command = {
1717
test: 1,
18-
field: new DocumentSequence([{ test: 1 }])
18+
field: new DocumentSequence('field', [{ test: 1 }])
1919
};
2020
const msg = new OpMsgRequest('admin', command, {});
2121
const buffers = msg.toBin();
@@ -53,8 +53,8 @@ describe('commands', function () {
5353
context('when there are multiple document sequences', function () {
5454
const command = {
5555
test: 1,
56-
fieldOne: new DocumentSequence([{ test: 1 }]),
57-
fieldTwo: new DocumentSequence([{ test: 1 }])
56+
fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]),
57+
fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }])
5858
};
5959
const msg = new OpMsgRequest('admin', command, {});
6060
const buffers = msg.toBin();

0 commit comments

Comments
 (0)