diff --git a/.doc_gen/metadata/serverless_metadata.yaml b/.doc_gen/metadata/serverless_metadata.yaml index 5c2bbb9..477fabe 100644 --- a/.doc_gen/metadata/serverless_metadata.yaml +++ b/.doc_gen/metadata/serverless_metadata.yaml @@ -852,3 +852,47 @@ serverless_MSK_Lambda: services: lambda: kafka: +serverless_DynamoDB_Lambda_batch_processor: +  title: Process &DDB; Stream records with &LAM; Powertools Batch Processor +  title_abbrev: Process &DDB; Stream records with &LAM; Powertools Batch Processor +  synopsis: implement a Lambda function that processes DynamoDB Stream records using the AWS Lambda Powertools Batch Processor utility to handle partial failures gracefully and prevent Lambda from retrying the entire batch when only some records fail. +  category: Serverless examples +  languages: +    JavaScript: +      versions: +        - sdk_version: 3 +          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb +          excerpts: +            - description: Processing &DDB; Stream records with Powertools Batch Processor using JavaScript. +              snippet_files: +                - tools-powertools-batch-processor-ddb/example.js +            - description: Processing &DDB; Stream records with Powertools Batch Processor using TypeScript. +              snippet_files: +                - tools-powertools-batch-processor-ddb/example.ts +    Python: +      versions: +        - sdk_version: 3 +          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb +          excerpts: +            - description: Processing &DDB; Stream records with Powertools Batch Processor using Python. +              snippet_files: +                - tools-powertools-batch-processor-ddb/example.py +    Java: +      versions: +        - sdk_version: 2 +          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb +          excerpts: +            - description: Processing &DDB; Stream records with Powertools Batch Processor using Java.
+              snippet_files: +                - tools-powertools-batch-processor-ddb/example.java +    .NET: +      versions: +        - sdk_version: 3 +          github: https://github.com/aws-samples/serverless-snippets/tree/main/tools-powertools-batch-processor-ddb +          excerpts: +            - description: Processing &DDB; Stream records with Powertools Batch Processor using .NET. +              snippet_files: +                - tools-powertools-batch-processor-ddb/Function.cs +  services: +    lambda: +    dynamodb: diff --git a/tools-powertools-batch-processor-ddb/Function.cs b/tools-powertools-batch-processor-ddb/Function.cs new file mode 100644 index 0000000..9ceb896 --- /dev/null +++ b/tools-powertools-batch-processor-ddb/Function.cs @@ -0,0 +1,28 @@ +public class Customer +{ +    public string? CustomerId { get; set; } +    public string? Name { get; set; } +    public string? Email { get; set; } +    public DateTime CreatedAt { get; set; } +} + +internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> +{ +    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken) +    { +        Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}"); + +        if (string.IsNullOrEmpty(customer.Email)) +        { +            throw new ArgumentException("Customer email is required"); +        } + +        return await Task.FromResult(RecordHandlerResult.None); +    } +} + +[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))] +public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _) +{ +    return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; +} \ No newline at end of file diff --git a/tools-powertools-batch-processor-ddb/example.java b/tools-powertools-batch-processor-ddb/example.java new file mode 100644 index 0000000..99d903e --- /dev/null +++ b/tools-powertools-batch-processor-ddb/example.java @@ -0,0 +1,26 @@ +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import 
com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder; +import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler; + +public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> { + +    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler; + +    public DynamoDBStreamBatchHandler() { +        handler = new BatchMessageHandlerBuilder() +                .withDynamoDbBatchHandler() +                .buildWithRawMessageHandler(this::processMessage); +    } + +    @Override +    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) { +        return handler.processBatch(ddbEvent, context); +    } + +    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) { +        // Process the change record +    } +} \ No newline at end of file diff --git a/tools-powertools-batch-processor-ddb/example.js b/tools-powertools-batch-processor-ddb/example.js new file mode 100644 index 0000000..439f0ac --- /dev/null +++ b/tools-powertools-batch-processor-ddb/example.js @@ -0,0 +1,25 @@ +import { +  BatchProcessor, +  EventType, +  processPartialResponse, +} from "@aws-lambda-powertools/batch"; +import { Logger } from "@aws-lambda-powertools/logger"; + +const processor = new BatchProcessor(EventType.DynamoDBStreams); +const logger = new Logger(); + +const recordHandler = async (record) => { +  if (record.dynamodb?.NewImage) { +    logger.info("Processing record", { record: record.dynamodb.NewImage }); +    const message = record.dynamodb.NewImage.Message.S; +    if (message) { +      const payload = JSON.parse(message); +      logger.info("Processed item", { item: payload }); +    } +  } +}; + +export const handler = async (event, context) => +  processPartialResponse(event, recordHandler, processor, { +    context, +  }); diff --git a/tools-powertools-batch-processor-ddb/example.py b/tools-powertools-batch-processor-ddb/example.py new file mode 100644 index 0000000..70040b9 --- /dev/null +++ 
b/tools-powertools-batch-processor-ddb/example.py @@ -0,0 +1,34 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( +    BatchProcessor, +    EventType, +    process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( +    DynamoDBRecord, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.DynamoDBStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: DynamoDBRecord): +    if record.dynamodb and record.dynamodb.new_image: +        logger.info(record.dynamodb.new_image) +        message = record.dynamodb.new_image.get("Message") +        if message: +            payload: dict = json.loads(message) +            logger.info(payload) + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): +    return process_partial_response( +        event=event, record_handler=record_handler, processor=processor, context=context +    ) diff --git a/tools-powertools-batch-processor-ddb/example.ts b/tools-powertools-batch-processor-ddb/example.ts new file mode 100644 index 0000000..ec918a5 --- /dev/null +++ b/tools-powertools-batch-processor-ddb/example.ts @@ -0,0 +1,26 @@ +import { +  BatchProcessor, +  EventType, +  processPartialResponse, +} from "@aws-lambda-powertools/batch"; +import { Logger } from "@aws-lambda-powertools/logger"; +import type { DynamoDBRecord, DynamoDBStreamHandler } from "aws-lambda"; + +const processor = new BatchProcessor(EventType.DynamoDBStreams); +const logger = new Logger(); + +const recordHandler = async (record: DynamoDBRecord): Promise<void> => { +  if (record.dynamodb?.NewImage) { +    logger.info("Processing record", { record: record.dynamodb.NewImage }); +    const message = record.dynamodb.NewImage.Message.S; +    if (message) { +      const payload = JSON.parse(message); +      logger.info("Processed item", { item: payload }); +    } +  } +}; + +export 
const handler: DynamoDBStreamHandler = async (event, context) => + processPartialResponse(event, recordHandler, processor, { + context, + });