Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch-write): batch write command (WIP) #286

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion src/base-model.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { ObjectSchema } from 'joi';
import { DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb';
import {DynamoDBClient, DynamoDBClientConfig } from '@aws-sdk/client-dynamodb';
import {
BatchGetCommand,
BatchGetCommandInput,
BatchWriteCommand,
BatchWriteCommandInput,
BatchWriteCommandOutput,
DeleteCommand,
DeleteCommandInput,
DeleteCommandOutput,
Expand All @@ -28,6 +31,8 @@ type SimpleKey = KeyValue;
type CompositeKey = { pk: KeyValue; sk: KeyValue };
type Keys = SimpleKey[] | CompositeKey[];

export const DYNAMODB_BATCH_WRITE_MAX_CHUNK_SIZE = 25;

const isComposite = (hashKeys_compositeKeys: Keys): hashKeys_compositeKeys is CompositeKey[] =>
(hashKeys_compositeKeys[0] as { pk: string } & unknown).pk !== undefined;

Expand Down Expand Up @@ -167,6 +172,66 @@ export default abstract class Model<T> {
return this.save(toCreate, putOptions);
}

private splitIntoChunks(items: Array<any>, chunkSize = DYNAMODB_BATCH_WRITE_MAX_CHUNK_SIZE): Array<Array<T>> {
const chunks: T[][] = [];
for (let i = 0; i < items.length; i += chunkSize) {
chunks.push(items.slice(i, i + chunkSize));
}
return chunks;
}

private async batchWriteItemInternal(batchWriteCommandInput: BatchWriteCommandInput): Promise<BatchWriteCommandOutput> {
const batchWriteCommand = new BatchWriteCommand(batchWriteCommandInput);
return this.documentClient.send(batchWriteCommand);
}

private async formatWriteItemPut(item: T) {
return ({
PutRequest: {
Item: item,
}
});
}

private async formatWriteItemDelete(item: T) {
return ({
DeleteRequest: {
Key: {
[this.pk as string]: this.pkValue(item),
[this.sk as string]: this.skValue(item)
},
}
});
}

async batchWrite(itemsToCreate: Array<T>, itemsToDelete: Array<T>): Promise<{failed: Array<T>}> {
const items = [];
const results = {
failed: {itemsToCreate: [] as Array<T>, itemsToDelete: [] as Array<Partial<T>>}
};
itemsToCreate.forEach((item) => {
this.testKeys(this.pkValue(item), this.skValue(item));
items.push(this.formatWriteItemPut(item));
}));
itemsToDelete.forEach((item) => {
this.testKeys(this.pkValue(item), this.skValue(item));
items.push((this.formatWriteItemDelete(item))
}));
const batchs = this.splitIntoChunks(items);
for (const batch of batchs) {
const result = await this.batchWriteItemInternal({
RequestItems: {
[this.tableName as string]: batch
}
});
if (result.UnprocessedItems && Object.keys(result.UnprocessedItems || {})?.length) {
results.failed.itemsToCreate.push(...(result.UnprocessedItems[this.tableName as string] ?? []).filter((request) => !!request.PutRequest).map((request) => request.PutRequest?.Item as T));
results.failed.itemsToDelete.push(...(result.UnprocessedItems[this.tableName as string] ?? []).filter((request) => !!request.DeleteRequest).map((request) => request.DeleteRequest?.Key as Partial<T>));
}
}
// TODO
}

/**
* Save the item hold by the class. Overwrite of an existing item with the same key(s)
* if it exists.
Expand Down
Loading