From 6e37543a797dea3ed6840d5d929b556c126ca149 Mon Sep 17 00:00:00 2001 From: Michel Yoeung Date: Fri, 10 Nov 2023 17:43:50 +0100 Subject: [PATCH] feat(batch-write): batch write command (WIP) --- src/base-model.ts | 67 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/src/base-model.ts b/src/base-model.ts index e4ee75e..5db7ca7 100644 --- a/src/base-model.ts +++ b/src/base-model.ts @@ -1,8 +1,11 @@ import { ObjectSchema } from 'joi'; -import { DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb'; +import {DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb'; import { BatchGetCommand, BatchGetCommandInput, + BatchWriteCommand, + BatchWriteCommandInput, + BatchWriteCommandOutput, DeleteCommand, DeleteCommandInput, DeleteCommandOutput, @@ -28,6 +31,8 @@ type SimpleKey = KeyValue; type CompositeKey = { pk: KeyValue; sk: KeyValue }; type Keys = SimpleKey[] | CompositeKey[]; +export const DYNAMODB_BATCH_WRITE_MAX_CHUNK_SIZE = 25; + const isComposite = (hashKeys_compositeKeys: Keys): hashKeys_compositeKeys is CompositeKey[] => (hashKeys_compositeKeys[0] as { pk: string } & unknown).pk !== undefined; @@ -167,6 +172,66 @@ export default abstract class Model { return this.save(toCreate, putOptions); } + private splitIntoChunks(items: Array, chunkSize = DYNAMODB_BATCH_WRITE_MAX_CHUNK_SIZE): Array> { + const chunks: T[][] = []; + for (let i = 0; i < items.length; i += chunkSize) { + chunks.push(items.slice(i, i + chunkSize)); + } + return chunks; + } + + private async batchWriteItemInternal(batchWriteCommandInput: BatchWriteCommandInput): Promise { + const batchWriteCommand = new BatchWriteCommand(batchWriteCommandInput); + return this.documentClient.send(batchWriteCommand); + } + + private async formatWriteItemPut(item: T) { + return ({ + PutRequest: { + Item: item, + } + }); + } + + private async formatWriteItemDelete(item: T) { + return ({ + DeleteRequest: { + Key: { + [this.pk as string]: this.pkValue(item), + [this.sk as string]: this.skValue(item) + }, + } + }); + } + + async batchWrite(itemsToCreate: Array, itemsToDelete: Array): Promise<{failed: Array}> { + const items = []; + const results = { + failed: {itemsToCreate: [] as Array, itemsToDelete: [] as Array>} + }; + itemsToCreate.forEach((item) => { + this.testKeys(this.pkValue(item), this.skValue(item)); + items.push(this.formatWriteItemPut(item)); + })); + itemsToDelete.forEach((item) => { + this.testKeys(this.pkValue(item), this.skValue(item)); + items.push((this.formatWriteItemDelete(item)) + })); + const batchs = this.splitIntoChunks(items); + for (const batch of batchs) { + const result = await this.batchWriteItemInternal({ + RequestItems: { + [this.tableName as string]: batch + } + }); + if (result.UnprocessedItems && Object.keys(result.UnprocessedItems || {})?.length) { + results.failed.itemsToCreate.push(...(result.UnprocessedItems[this.tableName as string] ?? []).filter((request) => !!request.PutRequest).map((request) => request.PutRequest?.Item as T)); + results.failed.itemsToDelete.push(...(result.UnprocessedItems[this.tableName as string] ?? []).filter((request) => !!request.DeleteRequest).map((request) => request.DeleteRequest?.Key as Partial)); + } + } + // TODO + } + /** * Save the item hold by the class. Overwrite of an existing item with the same key(s) * if it exists.