@@ -38,6 +38,7 @@ export class ClientBulkWriteCommandBuilder {
3838 models : AnyClientBulkWriteModel [ ] ;
3939 options : ClientBulkWriteOptions ;
4040 pkFactory : PkFactory ;
41+ currentModelIndex : number ;
4142
4243 /**
4344 * Create the command builder.
@@ -51,6 +52,7 @@ export class ClientBulkWriteCommandBuilder {
5152 this . models = models ;
5253 this . options = options ;
5354 this . pkFactory = pkFactory ?? DEFAULT_PK_FACTORY ;
55+ this . currentModelIndex = 0 ;
5456 }
5557
5658 /**
@@ -65,68 +67,54 @@ export class ClientBulkWriteCommandBuilder {
6567 }
6668
6769 /**
68- * Build the bulk write commands from the models.
70+ * Determines if there is another batch to process.
71+ * @returns True if not all batches have been built.
6972 */
70- buildCommands ( maxMessageSizeBytes : number , maxWriteBatchSize : number ) : ClientBulkWriteCommand [ ] {
71- // Iterate the models to build the ops and nsInfo fields.
72- // We need to do this in a loop which creates one command each up
73- // to the max bson size or max message size.
74- const commands : ClientBulkWriteCommand [ ] = [ ] ;
75- let currentCommandLength = 0 ;
73+ hasNextBatch ( ) : boolean {
74+ return this . currentModelIndex < this . models . length ;
75+ }
76+
77+ /**
78+ * Build a single batch of a client bulk write command.
79+ * @param maxMessageSizeBytes - The max message size in bytes.
80+ * @param maxWriteBatchSize - The max write batch size.
81+ * @returns The client bulk write command.
82+ */
83+ buildBatch ( maxMessageSizeBytes : number , maxWriteBatchSize : number ) : ClientBulkWriteCommand {
84+ let commandLength = 0 ;
7685 let currentNamespaceIndex = 0 ;
77- let currentCommand : ClientBulkWriteCommand = this . baseCommand ( ) ;
86+ const command : ClientBulkWriteCommand = this . baseCommand ( ) ;
7887 const namespaces = new Map < string , number > ( ) ;
7988
80- for ( const model of this . models ) {
89+ while ( this . currentModelIndex < this . models . length ) {
90+ const model = this . models [ this . currentModelIndex ] ;
8191 const ns = model . namespace ;
82- const index = namespaces . get ( ns ) ;
83-
84- /**
85- * Convenience function for resetting everything when a new batch
86- * is started.
87- */
88- const reset = ( ) => {
89- commands . push ( currentCommand ) ;
90- namespaces . clear ( ) ;
91- currentNamespaceIndex = 0 ;
92- currentCommand = this . baseCommand ( ) ;
93- namespaces . set ( ns , currentNamespaceIndex ) ;
94- } ;
92+ const nsIndex = namespaces . get ( ns ) ;
9593
96- if ( index != null ) {
97- // Pushing to the ops document sequence returns the bytes length added .
98- const operation = buildOperation ( model , index , this . pkFactory ) ;
94+ if ( nsIndex != null ) {
95+ // Build the operation and serialize it to get the bytes buffer .
96+ const operation = buildOperation ( model , nsIndex , this . pkFactory ) ;
9997 const operationBuffer = BSON . serialize ( operation ) ;
10098
101- // Check if the operation buffer can fit in the current command. If it can,
99+ // Check if the operation buffer can fit in the command. If it can,
102100 // then add the operation to the document sequence and increment the
103101 // current length as long as the ops don't exceed the maxWriteBatchSize.
104102 if (
105- currentCommandLength + operationBuffer . length < maxMessageSizeBytes &&
106- currentCommand . ops . documents . length < maxWriteBatchSize
103+ commandLength + operationBuffer . length < maxMessageSizeBytes &&
104+ command . ops . documents . length < maxWriteBatchSize
107105 ) {
108106 // Pushing to the ops document sequence returns the total byte length of the document sequence.
109- currentCommandLength =
110- MESSAGE_OVERHEAD_BYTES + this . addOperation ( currentCommand , operation , operationBuffer ) ;
107+ commandLength = MESSAGE_OVERHEAD_BYTES + command . ops . push ( operation , operationBuffer ) ;
108+ // Increment the builder's current model index.
109+ this . currentModelIndex ++ ;
111110 } else {
112- // We need to batch. Push the current command to the commands
113- // array and create a new current command. We aslo need to clear the namespaces
114- // map for the new command.
115- reset ( ) ;
116-
117- const nsInfo = { ns : ns } ;
118- const nsInfoBuffer = BSON . serialize ( nsInfo ) ;
119- currentCommandLength =
120- MESSAGE_OVERHEAD_BYTES +
121- this . addOperationAndNsInfo (
122- currentCommand ,
123- operation ,
124- operationBuffer ,
125- nsInfo ,
126- nsInfoBuffer
127- ) ;
111+ // The operation cannot fit in the current command and will need to
112+ // go in the next batch. Exit the loop.
113+ break ;
128114 }
129115 } else {
116+ // The namespace is not already in the nsInfo so we will set it in the map, and
117+ // construct our nsInfo and ops documents and buffers.
130118 namespaces . set ( ns , currentNamespaceIndex ) ;
131119 const nsInfo = { ns : ns } ;
132120 const nsInfoBuffer = BSON . serialize ( nsInfo ) ;
@@ -138,68 +126,26 @@ export class ClientBulkWriteCommandBuilder {
138126 // sequences and increment the current length as long as the ops don't exceed
139127 // the maxWriteBatchSize.
140128 if (
141- currentCommandLength + nsInfoBuffer . length + operationBuffer . length <
142- maxMessageSizeBytes &&
143- currentCommand . ops . documents . length < maxWriteBatchSize
129+ commandLength + nsInfoBuffer . length + operationBuffer . length < maxMessageSizeBytes &&
130+ command . ops . documents . length < maxWriteBatchSize
144131 ) {
145- currentCommandLength =
132+ // Pushing to the ops document sequence returns the total byte length of the document sequence.
133+ commandLength =
146134 MESSAGE_OVERHEAD_BYTES +
147- this . addOperationAndNsInfo (
148- currentCommand ,
149- operation ,
150- operationBuffer ,
151- nsInfo ,
152- nsInfoBuffer
153- ) ;
135+ command . nsInfo . push ( nsInfo , nsInfoBuffer ) +
136+ command . ops . push ( operation , operationBuffer ) ;
137+ // We've added a new namespace, increment the namespace index.
138+ currentNamespaceIndex ++ ;
139+ // Increment the builder's current model index.
140+ this . currentModelIndex ++ ;
154141 } else {
155- // We need to batch. Push the current command to the commands
156- // array and create a new current command. Aslo clear the namespaces map.
157- reset ( ) ;
158-
159- currentCommandLength =
160- MESSAGE_OVERHEAD_BYTES +
161- this . addOperationAndNsInfo (
162- currentCommand ,
163- operation ,
164- operationBuffer ,
165- nsInfo ,
166- nsInfoBuffer
167- ) ;
142+ // The operation cannot fit in the current command and will need to
143+ // go in the next batch. Exit the loop.
144+ break ;
168145 }
169- // We've added a new namespace, increment the namespace index.
170- currentNamespaceIndex ++ ;
171146 }
172147 }
173-
174- // After we've finisihed iterating all the models put the last current command
175- // only if there are operations in it.
176- if ( currentCommand . ops . documents . length > 0 ) {
177- commands . push ( currentCommand ) ;
178- }
179-
180- return commands ;
181- }
182-
183- private addOperation (
184- command : ClientBulkWriteCommand ,
185- operation : Document ,
186- operationBuffer : Uint8Array
187- ) : number {
188- // Pushing to the ops document sequence returns the total byte length of the document sequence.
189- return command . ops . push ( operation , operationBuffer ) ;
190- }
191-
192- private addOperationAndNsInfo (
193- command : ClientBulkWriteCommand ,
194- operation : Document ,
195- operationBuffer : Uint8Array ,
196- nsInfo : Document ,
197- nsInfoBuffer : Uint8Array
198- ) : number {
199- // Pushing to the nsInfo document sequence returns the total byte length of the document sequence.
200- const nsInfoLength = command . nsInfo . push ( nsInfo , nsInfoBuffer ) ;
201- const opsLength = this . addOperation ( command , operation , operationBuffer ) ;
202- return nsInfoLength + opsLength ;
148+ return command ;
203149 }
204150
205151 private baseCommand ( ) : ClientBulkWriteCommand {
0 commit comments