Skip to content

Commit

Permalink
feat(batch-write): batch write command (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Yoeung committed Nov 10, 2023
1 parent 27f8c47 commit 6e37543
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion src/base-model.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { ObjectSchema } from 'joi';
import { DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb';
import {DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb';
import {
BatchGetCommand,
BatchGetCommandInput,
BatchWriteCommand,
BatchWriteCommandInput,
BatchWriteCommandOutput,
DeleteCommand,
DeleteCommandInput,
DeleteCommandOutput,
Expand All @@ -28,6 +31,8 @@ type SimpleKey = KeyValue;
type CompositeKey = { pk: KeyValue; sk: KeyValue };
type Keys = SimpleKey[] | CompositeKey[];

export const DYNAMODB_BATCH_WRITE_MAX_CHUNK_SIZE = 25;

const isComposite = (hashKeys_compositeKeys: Keys): hashKeys_compositeKeys is CompositeKey[] =>
(hashKeys_compositeKeys[0] as { pk: string } & unknown).pk !== undefined;

Expand Down Expand Up @@ -167,6 +172,66 @@ export default abstract class Model<T> {
return this.save(toCreate, putOptions);
}

private splitIntoChunks(items: Array<any>, chunkSize = DYNAMODB_BATCH_WRITE_MAX_CHUNK_SIZE): Array<Array<T>> {
const chunks: T[][] = [];
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize));
}
return chunks;
}

private async batchWriteItemInternal(batchWriteCommandInput: BatchWriteCommandInput): Promise<BatchWriteCommandOutput> {
const batchWriteCommand = new BatchWriteCommand(batchWriteCommandInput);
return this.documentClient.send(batchWriteCommand);
}

private async formatWriteItemPut(item: T) {
return ({
PutRequest: {
Item: item,
}
});
}

private async formatWriteItemDelete(item: T) {
return ({
DeleteRequest: {
Key: {
[this.pk as string]: this.pkValue(item),
[this.sk as string]: this.skValue(item)
},
}
});
}

async batchWrite(itemsToCreate: Array<T>, itemsToDelete: Array<T>): Promise<{failed: Array<T>}> {
const items = [];
const results = {
failed: {itemsToCreate: [] as Array<T>, itemsToDelete: [] as Array<Partial<T>>}
};
itemsToCreate.forEach((item) => {
this.testKeys(this.pkValue(item), this.skValue(item));
items.push(this.formatWriteItemPut(item));
}));
itemsToDelete.forEach((item) => {
this.testKeys(this.pkValue(item), this.skValue(item));
items.push((this.formatWriteItemDelete(item))
}));
const batchs = this.splitIntoChunks(items);
for (const batch of batchs) {
const result = await this.batchWriteItemInternal({
RequestItems: {
[this.tableName as string]: batch
}
});
if (result.UnprocessedItems && Object.keys(result.UnprocessedItems || {})?.length) {
results.failed.itemsToCreate.push(...(result.UnprocessedItems[this.tableName as string] ?? []).filter((request) => !!request.PutRequest).map((request) => request.PutRequest?.Item as T));
results.failed.itemsToDelete.push(...(result.UnprocessedItems[this.tableName as string] ?? []).filter((request) => !!request.DeleteRequest).map((request) => request.DeleteRequest?.Key as Partial<T>));
}
}
// TODO
}

/**
* Save the item hold by the class. Overwrite of an existing item with the same key(s)
* if it exists.
Expand Down

0 comments on commit 6e37543

Please sign in to comment.