From e1d88c26bc52e9271604d47ad4f12883267d43f4 Mon Sep 17 00:00:00 2001 From: Lasse Christiansen Date: Thu, 19 Sep 2024 21:04:10 +0200 Subject: [PATCH 1/5] Added new batch processing option: 'ThrowOnFullBatchFailure' to control if a BatchProcessingException should be raised on full batch failure or not. --- .../src/HelloWorld/Function.cs | 32 +-- examples/BatchProcessing/template.yaml | 212 ++++++++++-------- .../BatchProcessor.cs | 22 +- .../BatchProcessorAttribute.cs | 12 +- .../ProcessingOptions.cs | 15 +- .../Core/Constants.cs | 7 +- .../Core/IPowertoolsConfigurations.cs | 10 +- .../Core/PowertoolsConfigurations.cs | 3 + .../Handlers/SQS/Handler/HandlerFunction.cs | 91 ++++++-- .../Handlers/SQS/HandlerTests.cs | 149 +++++++++++- .../Helpers/Helpers.cs | 46 +++- 11 files changed, 449 insertions(+), 150 deletions(-) diff --git a/examples/BatchProcessing/src/HelloWorld/Function.cs b/examples/BatchProcessing/src/HelloWorld/Function.cs index f1ff8d42..f8bd0aad 100644 --- a/examples/BatchProcessing/src/HelloWorld/Function.cs +++ b/examples/BatchProcessing/src/HelloWorld/Function.cs @@ -42,58 +42,58 @@ static Function() Services.Init(); } - [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))] [Logging(LogEvent = true)] + [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))] public BatchItemFailuresResponse DynamoDbStreamHandlerUsingAttribute(DynamoDBEvent _) { return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; } - - [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))] + [Logging(LogEvent = true)] + [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))] public BatchItemFailuresResponse KinesisEventHandlerUsingAttribute(KinesisEvent _) { return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; } - [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] [Logging(LogEvent = true)] + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] public BatchItemFailuresResponse SqsHandlerUsingAttribute(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - - [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] + [Logging(LogEvent = true)] + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] public BatchItemFailuresResponse SqsHandlerUsingAttributeWithErrorPolicy(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } #region More example handlers... - - [BatchProcessor(RecordHandlerProvider = typeof(CustomSqsRecordHandlerProvider), BatchProcessor = typeof(CustomSqsBatchProcessor))] + [Logging(LogEvent = true)] + [BatchProcessor(RecordHandlerProvider = typeof(CustomSqsRecordHandlerProvider), BatchProcessor = typeof(CustomSqsBatchProcessor))] public BatchItemFailuresResponse HandlerUsingAttributeAndCustomRecordHandlerProvider(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; - } - - [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))] + } + [Logging(LogEvent = true)] + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))] public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessor(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - - [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessorProvider = typeof(CustomSqsBatchProcessorProvider))] + [Logging(LogEvent = true)] + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessorProvider = typeof(CustomSqsBatchProcessorProvider))] public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessorProvider(SQSEvent _) { var batchProcessor = Services.Provider.GetRequiredService(); return batchProcessor.ProcessingResult.BatchItemFailuresResponse; } - + [Logging(LogEvent = true)] public async Task HandlerUsingUtility(SQSEvent sqsEvent) { @@ -103,7 +103,7 @@ public async Task HandlerUsingUtility(SQSEvent sqsEve })); return result.BatchItemFailuresResponse; } - + [Logging(LogEvent = true)] public async Task HandlerUsingUtilityFromIoc(SQSEvent sqsEvent) { @@ -112,6 +112,6 @@ public async Task HandlerUsingUtilityFromIoc(SQSEvent var result = await batchProcessor.ProcessAsync(sqsEvent, recordHandler); return result.BatchItemFailuresResponse; } - + #endregion } diff --git a/examples/BatchProcessing/template.yaml b/examples/BatchProcessing/template.yaml index 937f1e75..ad7ed024 100644 --- a/examples/BatchProcessing/template.yaml +++ b/examples/BatchProcessing/template.yaml @@ -2,122 +2,110 @@ # SPDX-License-Identifier: MIT-0 AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 -Description: Example project demoing the Batch Processing utility SQS in Powertools for AWS Lambda (.NET) +Description: Example project demoing the Batch Processing Utility in Powertools for AWS Lambda (.NET) -# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: Timeout: 20 - Runtime: dotnet6 - MemorySize: 256 + Runtime: dotnet8 + MemorySize: 1024 Environment: Variables: - POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing + POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch POWERTOOLS_LOG_LEVEL: Debug - POWERTOOLS_LOGGER_CASE: SnakeCase # Allowed values are: CamelCase, PascalCase and SnakeCase + POWERTOOLS_LOGGER_CASE: SnakeCase POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1 POWERTOOLS_BATCH_PARALLEL_ENABLED : false + POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true Resources: - + # -------------- - # KMS key for encrypted queues + # KMS key for encrypted messages / records CustomerKey: Type: AWS::KMS::Key Properties: Description: KMS key for encrypted queues Enabled: true KeyPolicy: - Version: '2012-10-17' + Version: "2012-10-17" Statement: - Sid: Enable IAM User Permissions Effect: Allow Principal: - AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root' - Action: 'kms:*' - Resource: '*' - - Sid: Allow use of the key + AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root" + Action: "kms:*" + Resource: "*" + - Sid: Allow AWS Lambda to use the key Effect: Allow Principal: Service: lambda.amazonaws.com Action: - kms:Decrypt - kms:GenerateDataKey - Resource: '*' - + Resource: "*" + CustomerKeyAlias: Type: AWS::KMS::Alias Properties: - AliasName: alias/powertools-batch-sqs-demo + AliasName: alias/powertools-dotnet-sample-batch-kms-key TargetKeyId: !Ref CustomerKey # -------------- - # SQS DL Queue - DemoDlqSqsQueue: + # Batch Processing for SQS Queue + SqsDeadLetterQueue: Type: AWS::SQS::Queue Properties: KmsMasterKeyId: !Ref CustomerKey - - # -------------- - # SQS Queue - DemoSqsQueue: + + SqsQueue: Type: AWS::SQS::Queue Properties: RedrivePolicy: - deadLetterTargetArn: - Fn::GetAtt: - - "DemoDlqSqsQueue" - - "Arn" + deadLetterTargetArn: !GetAtt SqsDeadLetterQueue.Arn maxReceiveCount: 2 KmsMasterKeyId: !Ref CustomerKey - - # -------------- - # Batch Processing for SQS - SampleSqsBatchProcessorFunction: - Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction + + SqsBatchProcessorFunction: + Type: AWS::Serverless::Function Properties: - FunctionName: powertools-dotnet-sample-batch-processor-sqs CodeUri: ./src/HelloWorld/ Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute - # ReservedConcurrentExecutions: 1 Policies: - Statement: - - Sid: SQSDeleteGetAttribute + - Sid: DlqPermissions Effect: Allow Action: - - sqs:DeleteMessageBatch - - sqs:GetQueueAttributes - Resource: !GetAtt DemoSqsQueue.Arn - - Sid: SQSSendMessageBatch - Effect: Allow - Action: - - sqs:SendMessageBatch - sqs:SendMessage - Resource: !GetAtt DemoDlqSqsQueue.Arn - - Sid: SQSKMSKey + - sqs:SendMessageBatch + Resource: !GetAtt SqsDeadLetterQueue.Arn + - Sid: KmsKeyPermissions Effect: Allow Action: - - kms:GenerateDataKey - kms:Decrypt + - kms:GenerateDataKey Resource: !GetAtt CustomerKey.Arn Events: SqsBatch: - Type: SQS # More info about SQS Event Source: https://github.com/aws/serverless-application-model/blob/master/versions/2016-10-31.md#sqs + Type: SQS Properties: - Queue: !GetAtt DemoSqsQueue.Arn BatchSize: 5 -# MaximumBatchingWindowInSeconds: 300 - FunctionResponseTypes: + Enabled: true + FunctionResponseTypes: - ReportBatchItemFailures - + Queue: !GetAtt SqsQueue.Arn + # -------------- - # Batch Processing for DynamoDb - - SampleDynamoDBTable: + # Batch Processing for DynamoDb (DDB) Stream + DdbStreamDeadLetterQueue: + Type: AWS::SQS::Queue + Properties: + KmsMasterKeyId: !Ref CustomerKey + + DdbTable: Type: AWS::DynamoDB::Table Properties: - TableName: powertools-dotnet-sample-dynamodb-table BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: id @@ -127,28 +115,51 @@ Resources: KeyType: HASH StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES - - DemoDynamoDBStreamProcessorFunction: + + DdbStreamProcessorFunction: Type: AWS::Serverless::Function Properties: - FunctionName: powertools-dotnet-sample-batch-processor-dynamodb CodeUri: ./src/HelloWorld/ Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute - Policies: AWSLambdaDynamoDBExecutionRole + Policies: + - AWSLambdaDynamoDBExecutionRole + - Statement: + - Sid: DlqPermissions + Effect: Allow + Action: + - sqs:SendMessage + - sqs:SendMessageBatch + Resource: !GetAtt DdbStreamDeadLetterQueue.Arn + - Sid: KmsKeyPermissions + Effect: Allow + Action: + - kms:GenerateDataKey + Resource: !GetAtt CustomerKey.Arn Events: Stream: Type: DynamoDB Properties: - Stream: !GetAtt SampleDynamoDBTable.StreamArn - BatchSize: 100 - StartingPosition: TRIM_HORIZON + BatchSize: 5 + BisectBatchOnFunctionError: true + DestinationConfig: + OnFailure: + Destination: !GetAtt DdbStreamDeadLetterQueue.Arn + Enabled: true FunctionResponseTypes: - ReportBatchItemFailures - + MaximumRetryAttempts: 2 + ParallelizationFactor: 1 + StartingPosition: LATEST + Stream: !GetAtt DdbTable.StreamArn + # -------------- - # Batch Processing for Kinesis Data Streams - - DemoKinesisStream: + # Batch Processing for Kinesis Data Stream + KinesisStreamDeadLetterQueue: + Type: AWS::SQS::Queue + Properties: + KmsMasterKeyId: !Ref CustomerKey + + KinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 @@ -156,42 +167,61 @@ Resources: EncryptionType: KMS KeyId: !Ref CustomerKey - StreamConsumer: - Type: "AWS::Kinesis::StreamConsumer" + KinesisStreamConsumer: + Type: AWS::Kinesis::StreamConsumer Properties: - StreamARN: !GetAtt DemoKinesisStream.Arn - ConsumerName: KinesisBatchHandlerConsumer - - - SampleKinesisEventBatchProcessorFunction: + ConsumerName: powertools-dotnet-sample-batch-kds-consumer + StreamARN: !GetAtt KinesisStream.Arn + + KinesisBatchProcessorFunction: Type: AWS::Serverless::Function Properties: - FunctionName: powertools-dotnet-sample-batch-processor-kinesis-data-stream - Runtime: dotnet6 + Policies: + - Statement: + - Sid: KinesisStreamConsumerPermissions + Effect: Allow + Action: + - kinesis:DescribeStreamConsumer + Resource: + - !GetAtt KinesisStreamConsumer.ConsumerARN + - Sid: DlqPermissions + Effect: Allow + Action: + - sqs:SendMessage + - sqs:SendMessageBatch + Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn + - Sid: KmsKeyPermissions + Effect: Allow + Action: + - kms:Decrypt + - kms:GenerateDataKey + Resource: !GetAtt CustomerKey.Arn CodeUri: ./src/HelloWorld/ Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute - MemorySize: 256 Events: Kinesis: Type: Kinesis Properties: - Stream: !GetAtt StreamConsumer.ConsumerARN + BatchSize: 5 + BisectBatchOnFunctionError: true + DestinationConfig: + OnFailure: + Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn + Enabled: true + FunctionResponseTypes: + - ReportBatchItemFailures + MaximumRetryAttempts: 2 + ParallelizationFactor: 1 StartingPosition: LATEST - BatchSize: 2 + Stream: !GetAtt KinesisStreamConsumer.ConsumerARN Outputs: - DemoSqsQueue: - Description: "ARN for main SQS queue" - Value: !GetAtt DemoSqsQueue.Arn - DemoDlqSqsQueue: - Description: "ARN for DLQ" - Value: !GetAtt DemoDlqSqsQueue.Arn - SampleSqsBatchProcessorFunction: - Description: "SQS Batch Handler - Lambda Function ARN" - Value: !GetAtt SampleSqsBatchProcessorFunction.Arn - DemoKinesisQueue: - Description: "ARN for Kinesis Stream" - Value: !GetAtt DemoKinesisStream.Arn - DemoSQSConsumerFunction: - Description: "SQS Batch Handler - Lambda Function ARN" - Value: !GetAtt SampleKinesisEventBatchProcessorFunction.Arn \ No newline at end of file + SqsQueueUrl: + Description: "SQS Queue URL" + Value: !Ref SqsQueue + DdbTableName: + Description: "DynamoDB Table Name" + Value: !Ref DdbTable + KinesisStreamArn: + Description: "Kinesis Stream ARN" + Value: !GetAtt KinesisStream.Arn \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs index 2b3b84ce..ba3c5f3f 100644 --- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs +++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessor.cs @@ -87,7 +87,7 @@ public virtual async Task> ProcessAsync(TEvent @event, : processingOptions.ErrorHandlingPolicy; // Invoke hook - await BeforeBatchProcessingAsync(@event); + await BeforeBatchProcessingAsync(@event, processingOptions); try { @@ -139,9 +139,13 @@ await ProcessRecord(recordHandler, pair, cancellationToken, failureRecords, succ ProcessingResult.FailureRecords.AddRange(failureRecords.Values); } - if (successRecords != null) ProcessingResult.SuccessRecords.AddRange(successRecords.Values); + if (successRecords != null) + { + ProcessingResult.SuccessRecords.AddRange(successRecords.Values); + } + // Invoke hook - await AfterBatchProcessingAsync(@event, ProcessingResult); + await AfterBatchProcessingAsync(@event, ProcessingResult, processingOptions); // Return result return ProcessingResult; @@ -226,8 +230,9 @@ private async Task ProcessRecord(IRecordHandler recordHandler, KeyValue /// Hook invoked before the batch event is processed. /// /// The event to be processed. + /// The configured batch processing options for this batch processing run. /// An awaitable . - protected virtual async Task BeforeBatchProcessingAsync(TEvent @event) + protected virtual async Task BeforeBatchProcessingAsync(TEvent @event, ProcessingOptions processingOptions) { await Task.CompletedTask; } @@ -272,12 +277,15 @@ protected virtual async Task HandleRecordFailureAsync(TRecord record, Exception /// Hook invoked after the batch event has been processed. /// /// The event that was processed. - /// + /// The result of this batch processing run. + /// The configured batch processing options for this batch processing run. /// An awaitable . /// - protected virtual async Task AfterBatchProcessingAsync(TEvent @event, ProcessingResult processingResult) + protected virtual async Task AfterBatchProcessingAsync(TEvent @event, + ProcessingResult processingResult, + ProcessingOptions processingOptions) { - if (processingResult.BatchRecords.Count == processingResult.FailureRecords.Count) + if (processingOptions.ThrowOnFullBatchFailure && processingResult.BatchRecords.Count == processingResult.FailureRecords.Count) { throw new BatchProcessingException( $"Entire batch of '{processingResult.BatchRecords.Count}' record(s) failed processing. See inner exceptions for details.", diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs index b8c0b281..64d70f9d 100644 --- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs +++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs @@ -23,6 +23,7 @@ using Amazon.Lambda.SQSEvents; using AspectInjector.Broker; using AWS.Lambda.Powertools.BatchProcessing.DynamoDb; +using AWS.Lambda.Powertools.BatchProcessing.Exceptions; using AWS.Lambda.Powertools.BatchProcessing.Internal; using AWS.Lambda.Powertools.BatchProcessing.Kinesis; using AWS.Lambda.Powertools.BatchProcessing.Sqs; @@ -157,7 +158,7 @@ public class BatchProcessorAttribute : UniversalWrapperAttribute /// /// Batch processing enabled (default false) /// - public bool BatchParallelProcessingEnabled = PowertoolsConfigurations.Instance.BatchParallelProcessingEnabled; + public bool BatchParallelProcessingEnabled = PowertoolsConfigurations.Instance.BatchParallelProcessingEnabled; /// /// The maximum degree of parallelism to apply during batch processing. @@ -165,6 +166,12 @@ public class BatchProcessorAttribute : UniversalWrapperAttribute /// public int MaxDegreeOfParallelism = PowertoolsConfigurations.Instance.BatchProcessingMaxDegreeOfParallelism; + /// + /// By default, the Batch processor throws a on full batch failure. + /// This behaviour can be disabled by setting this value to false. + /// + public bool ThrowOnFullBatchFailure = PowertoolsConfigurations.Instance.BatchThrowOnFullBatchFailureEnabled; + private static readonly Dictionary EventTypes = new() { {typeof(DynamoDBEvent), BatchEventType.DynamoDbStream}, @@ -332,7 +339,8 @@ private BatchProcessingAspectHandler CreateBatchProcessingAspec CancellationToken = CancellationToken.None, ErrorHandlingPolicy = errorHandlingPolicy, MaxDegreeOfParallelism = MaxDegreeOfParallelism, - BatchParallelProcessingEnabled = BatchParallelProcessingEnabled + BatchParallelProcessingEnabled = BatchParallelProcessingEnabled, + ThrowOnFullBatchFailure = ThrowOnFullBatchFailure }); } } diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs index 4c0b72da..8fc7701f 100644 --- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs +++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/ProcessingOptions.cs @@ -13,7 +13,9 @@ * permissions and limitations under the License. */ +using System; using System.Threading; +using AWS.Lambda.Powertools.BatchProcessing.Exceptions; namespace AWS.Lambda.Powertools.BatchProcessing; @@ -28,17 +30,22 @@ public class ProcessingOptions public CancellationToken? CancellationToken { get; init; } /// - /// The maximum degree of parallelism to apply during batch processing. + /// The maximum degree of parallelism to apply during batch processing if is enabled (default is -1, which means ). /// public int? MaxDegreeOfParallelism { get; init; } /// - /// The error handling policy to apply during batch processing. + /// The error handling policy to apply during batch processing (default is ). /// public BatchProcessorErrorHandlingPolicy? ErrorHandlingPolicy { get; init; } /// - /// Batch processing enabled (default false) + /// Controls whether parallel batch processing is enabled (default false). /// - public bool BatchParallelProcessingEnabled { get; init; } + public bool BatchParallelProcessingEnabled { get; init; } = false; + + /// + /// Controls whether the Batch processor throws a on full batch failure (default true). + /// + public bool ThrowOnFullBatchFailure { get; init; } = true; } diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs b/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs index 1a15d30b..912196da 100644 --- a/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs +++ b/libraries/src/AWS.Lambda.Powertools.Common/Core/Constants.cs @@ -124,5 +124,10 @@ internal static class Constants /// /// Constant for POWERTOOLS_BATCH_PARALLEL_ENABLED environment variable /// - public const string BatchParallelProcessingEnabled = "POWERTOOLS_BATCH_PARALLEL_ENABLED"; + internal const string BatchParallelProcessingEnabled = "POWERTOOLS_BATCH_PARALLEL_ENABLED"; + + /// + /// Constant for POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE environment variable + /// + internal const string BatchThrowOnFullBatchFailureEnv = "POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE"; } \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs b/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs index ee95b318..35224437 100644 --- a/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs +++ b/libraries/src/AWS.Lambda.Powertools.Common/Core/IPowertoolsConfigurations.cs @@ -155,7 +155,11 @@ public interface IPowertoolsConfigurations /// Gets the maximum degree of parallelism to apply during batch processing. /// /// Defaults to 1 (no parallelism). Specify -1 to automatically use the value of ProcessorCount. - int BatchProcessingMaxDegreeOfParallelism { get; } - - + int BatchProcessingMaxDegreeOfParallelism { get; } + + /// + /// Gets a value indicating whether Batch processing will throw an exception on full batch failure. + /// + /// Defaults to true + bool BatchThrowOnFullBatchFailureEnabled { get; } } \ No newline at end of file diff --git a/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs b/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs index f098d426..a7f1742f 100644 --- a/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs +++ b/libraries/src/AWS.Lambda.Powertools.Common/Core/PowertoolsConfigurations.cs @@ -214,4 +214,7 @@ public void SetExecutionEnvironment(T type) /// public int BatchProcessingMaxDegreeOfParallelism => GetEnvironmentVariableOrDefault(Constants.BatchMaxDegreeOfParallelismEnv, 1); + + /// + public bool BatchThrowOnFullBatchFailureEnabled => GetEnvironmentVariableOrDefault(Constants.BatchThrowOnFullBatchFailureEnv, true); } \ No newline at end of file diff --git a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs index 3c965372..9bfbf90b 100644 --- a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs +++ b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/Handler/HandlerFunction.cs @@ -30,7 +30,6 @@ public class HandlerFunction public HandlerFunction() { - } public HandlerFunction(ISqsBatchProcessor batchProcessor, ISqsRecordHandler recordHandler) @@ -38,20 +37,19 @@ public HandlerFunction(ISqsBatchProcessor batchProcessor, ISqsRecordHandler reco _batchProcessor = batchProcessor; _recordHandler = recordHandler; } - - + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(RecordHandler = typeof(CustomFailSqsRecordHandler))] public BatchItemFailuresResponse HandlerUsingAttributeAllFail(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] public BatchItemFailuresResponse HandlerUsingAttributeErrorPolicy(SQSEvent _) { @@ -63,61 +61,60 @@ public Task HandlerUsingAttributeAsync(SQSEvent _) { return Task.FromResult(SqsBatchProcessor.Result.BatchItemFailuresResponse); } - + [BatchProcessor] public BatchItemFailuresResponse HandlerUsingAttributeWithoutHandler(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor] public BatchItemFailuresResponse HandlerUsingAttributeWithoutEvent(string _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(RecordHandler = typeof(BadCustomSqsRecordHandler))] public BatchItemFailuresResponse HandlerUsingAttributeBadHandler(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(BatchProcessor = typeof(BadCustomSqsRecordProcessor))] public BatchItemFailuresResponse HandlerUsingAttributeBadProcessor(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(BatchProcessorProvider = typeof(BadCustomSqsRecordProcessor))] public BatchItemFailuresResponse HandlerUsingAttributeBadProcessorProvider(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(RecordHandlerProvider = typeof(BadCustomSqsRecordHandler))] public BatchItemFailuresResponse HandlerUsingAttributeBadHandlerProvider(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessor = typeof(CustomSqsBatchProcessor))] public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessor(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler), BatchProcessorProvider = typeof(CustomSqsBatchProcessorProvider))] public BatchItemFailuresResponse HandlerUsingAttributeAndCustomBatchProcessorProvider(SQSEvent _) { return SqsBatchProcessor.Result.BatchItemFailuresResponse; } - + public async Task HandlerUsingUtility(SQSEvent sqsEvent) { var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(sqsMessage => { var product = JsonSerializer.Deserialize(sqsMessage.Body); - if (product.GetProperty("Id").GetInt16() == 4) { throw new ArgumentException("Error on 4"); @@ -125,7 +122,7 @@ public async Task HandlerUsingUtility(SQSEvent sqsEve })); return result.BatchItemFailuresResponse; } - + public async Task HandlerUsingUtilityFromIoc(SQSEvent sqsEvent) { var batchProcessor = Services.Provider.GetRequiredService(); @@ -133,10 +130,70 @@ public async Task HandlerUsingUtilityFromIoc(SQSEvent var result = await batchProcessor.ProcessAsync(sqsEvent, recordHandler); return result.BatchItemFailuresResponse; } - + public async Task HandlerUsingUtilityFromIocConstructor(SQSEvent sqsEvent) { var result = await _batchProcessor.ProcessAsync(sqsEvent, _recordHandler); return result.BatchItemFailuresResponse; } + + [BatchProcessor(RecordHandler = typeof(CustomFailSqsRecordHandler), ThrowOnFullBatchFailure = false)] + public BatchItemFailuresResponse HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseAttribute(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } + + [BatchProcessor(RecordHandler = typeof(CustomFailSqsRecordHandler))] + public BatchItemFailuresResponse HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseEnv(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } + + public async Task HandlerUsingUtilityAllFail_ThrowOnFullBatchFailureFalseOption(SQSEvent sqsEvent) + { + var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, + RecordHandler.From(_ => throw new ArgumentException("Raise exception on all!")), + new ProcessingOptions + { + ThrowOnFullBatchFailure = false + }); + return result.BatchItemFailuresResponse; + } + + [BatchProcessor( + RecordHandler = typeof(CustomFailSqsRecordHandler), + ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure, + ThrowOnFullBatchFailure = false)] + public BatchItemFailuresResponse HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseAttr(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } + + [BatchProcessor( + RecordHandler = typeof(CustomFailSqsRecordHandler), + ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] + public BatchItemFailuresResponse HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseEnv(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } + + public async Task HandlerUsingUtility_StopOnFirstErrorOption_ThrowOnFullBatchFailureFalseOption(SQSEvent sqsEvent) + { + var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, + RecordHandler.From(async record => + { + var product = JsonSerializer.Deserialize(record.Body); + if (product.GetProperty("Id").GetInt16() == 4) + { + throw new ArgumentException("Error on 4"); + } + return await Task.FromResult(RecordHandlerResult.None); + }), + new ProcessingOptions + { + ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure, + ThrowOnFullBatchFailure = false + }); + return result.BatchItemFailuresResponse; + } } \ No newline at end of file diff --git a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs index c81d7f7b..04bd0b21 100644 --- a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs +++ b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Handlers/SQS/HandlerTests.cs @@ -43,8 +43,8 @@ public Task Sqs_Handler_Using_Attribute() Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier); return Task.CompletedTask; - } - + } + [Fact] public Task Sqs_Handler_All_Fail_Using_Attribute_Should_Throw_BatchProcessingException() { @@ -186,9 +186,152 @@ public Task Sqs_Handler_Using_Attribute_Error_Policy_Attribute_StopOnFirstBatchI return Task.CompletedTask; } - + + [Fact] + public Task Sqs_Handler_Using_Attribute_All_Fail_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Attribute() + { + // Arrange + var request = new SQSEvent + { + Records = TestHelper.SqsMessages + }; + var function = new HandlerFunction(); + + // Act + var response = function.HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseAttribute(request); + + // Assert + Assert.Equal(5, response.BatchItemFailures.Count); + Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier); + Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier); + Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier); + + return Task.CompletedTask; + } + + [Fact] + public Task Sqs_Handler_Using_Attribute_All_Fail_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Env() + { + // Arrange + Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE", "false"); + var request = new SQSEvent + { + Records = TestHelper.SqsMessages + }; + var function = new HandlerFunction(); + + // Act + var response = function.HandlerUsingAttributeAllFail_ThrowOnFullBatchFailureFalseEnv(request); + + // Assert + Assert.Equal(5, response.BatchItemFailures.Count); + Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier); + Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier); + Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier); + + return Task.CompletedTask; + } + + [Fact] + public async Task Sqs_Handler_Using_Utility_All_Fail_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Option() + { + // Arrange + var request = new SQSEvent + { + Records = TestHelper.SqsMessages + }; + var function = new HandlerFunction(); + + // Act + var response = await function.HandlerUsingUtilityAllFail_ThrowOnFullBatchFailureFalseOption(request); + + // Assert + Assert.Equal(5, response.BatchItemFailures.Count); + Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier); + Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier); + Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier); + } + + [Fact] + public Task Sqs_Fifo_Handler_Using_Attribute_All_Fail_With_Stop_On_First_Error_Attr_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Attribute() + { + // Arrange + var request = new SQSEvent + { + Records = TestHelper.SqsFifoMessagesWithFirstMessagePoisened + }; + var function = new HandlerFunction(); + + // Act + var response = function.HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseAttr(request); + + // Assert + Assert.Equal(5, response.BatchItemFailures.Count); + Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier); + Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier); + Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier); + + return Task.CompletedTask; + } + + [Fact] + public Task Sqs_Fifo_Handler_Using_Attribute_All_Fail_With_Stop_On_First_Error_Attr_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Env() + { + // Arrange + Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE", "false"); + var request = new SQSEvent + { + Records = TestHelper.SqsFifoMessagesWithFirstMessagePoisened + }; + var function = new HandlerFunction(); + + // Act + var response = function.HandlerUsingAttributeFailAll_StopOnFirstErrorAttr_ThrowOnFullBatchFailureFalseEnv(request); + + // Assert + Assert.Equal(5, response.BatchItemFailures.Count); + Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier); + Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier); + Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier); + + return Task.CompletedTask; + } + + [Fact] + public async Task Sqs_Fifo_Handler_Using_Utility_All_Fail_With_Stop_On_First_Error_Attr_Should_Not_Throw_BatchProcessingException_With_Throw_On_Full_Batch_Failure_False_Option() + { + // Arrange + var request = new SQSEvent + { + Records = TestHelper.SqsFifoMessagesWithFirstMessagePoisened + }; + var function = new HandlerFunction(); + + // Act + var response = await function.HandlerUsingUtility_StopOnFirstErrorOption_ThrowOnFullBatchFailureFalseOption(request); + + // Assert + Assert.Equal(5, response.BatchItemFailures.Count); + Assert.Equal("1", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("2", response.BatchItemFailures[1].ItemIdentifier); + Assert.Equal("3", response.BatchItemFailures[2].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[3].ItemIdentifier); + Assert.Equal("5", response.BatchItemFailures[4].ItemIdentifier); + } + public void Dispose() { + Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE", "true"); Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_PARALLEL_ENABLED", "false"); Environment.SetEnvironmentVariable("POWERTOOLS_BATCH_ERROR_HANDLING_POLICY", null); } diff --git a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs index 5b3cc8df..da090c51 100644 --- a/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs +++ b/libraries/tests/AWS.Lambda.Powertools.BatchProcessing.Tests/Helpers/Helpers.cs @@ -32,7 +32,7 @@ internal static class Helpers new SQSEvent.SQSMessage { MessageId = "1", - Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}", + Body = "{\"Id\":1,\"Name\":\"product-1\",\"Price\":14}", EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" }, new SQSEvent.SQSMessage @@ -44,7 +44,7 @@ internal static class Helpers new SQSEvent.SQSMessage { MessageId = "3", - Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}", + Body = "{\"Id\":3,\"Name\":\"product-3\",\"Price\":14}", EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" }, new SQSEvent.SQSMessage @@ -56,7 +56,7 @@ internal static class Helpers new SQSEvent.SQSMessage { MessageId = "5", - Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}", + Body = "{\"Id\":5,\"Name\":\"product-5\",\"Price\":14}", EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" }, }; @@ -66,7 +66,7 @@ internal static class Helpers new SQSEvent.SQSMessage { MessageId = "1", - Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}", + Body = "{\"Id\":1,\"Name\":\"product-1\",\"Price\":14}", EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" }, new SQSEvent.SQSMessage @@ -78,7 +78,7 @@ internal static class Helpers new SQSEvent.SQSMessage { MessageId = "3", - Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}", + Body = "{\"Id\":3,\"Name\":\"product-3\",\"Price\":14}", EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" }, new SQSEvent.SQSMessage @@ -90,11 +90,45 @@ internal static class Helpers new SQSEvent.SQSMessage { MessageId = "5", - Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}", + Body = "{\"Id\":5,\"Name\":\"product-5\",\"Price\":14}", EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" }, }; + internal static List SqsFifoMessagesWithFirstMessagePoisened => + [ + new SQSEvent.SQSMessage + { + MessageId = "1", + Body = "fail", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" + }, + new SQSEvent.SQSMessage + { + MessageId = "2", + Body = "{\"Id\":2,\"Name\":\"product-2\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" + }, + new SQSEvent.SQSMessage + { + MessageId = "3", + Body = "{\"Id\":3,\"Name\":\"product-3\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" + }, + new SQSEvent.SQSMessage + { + MessageId = "4", + Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" + }, + new SQSEvent.SQSMessage + { + MessageId = "5", + Body = "{\"Id\":5,\"Name\":\"product-5\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue.fifo" + } + ]; + internal static List DynamoDbMessages => new() { new DynamoDBEvent.DynamodbStreamRecord From 09fc7f934f0c5b6dca272ac95d5709e00778ff1e Mon Sep 17 00:00:00 2001 From: Lasse Christiansen Date: Fri, 20 Sep 2024 15:54:13 +0200 Subject: [PATCH 2/5] Sonar fixes. --- examples/BatchProcessing/template.yaml | 20 ++++++++++++++++++- .../BatchProcessorAttribute.cs | 20 +++++++++---------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/examples/BatchProcessing/template.yaml b/examples/BatchProcessing/template.yaml index ad7ed024..8e302fcc 100644 --- a/examples/BatchProcessing/template.yaml +++ b/examples/BatchProcessing/template.yaml @@ -96,6 +96,12 @@ Resources: - ReportBatchItemFailures Queue: !GetAtt SqsQueue.Arn + SqsBatchProcessorFunctionLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${SqsBatchProcessorFunction}" + RetentionInDays: 7 + # -------------- # Batch Processing for DynamoDb (DDB) Stream DdbStreamDeadLetterQueue: @@ -116,7 +122,7 @@ Resources: StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES - DdbStreamProcessorFunction: + DdbStreamBatchProcessorFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./src/HelloWorld/ @@ -152,6 +158,12 @@ Resources: StartingPosition: LATEST Stream: !GetAtt DdbTable.StreamArn + DdbStreamBatchProcessorFunctionLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}" + RetentionInDays: 7 + # -------------- # Batch Processing for Kinesis Data Stream KinesisStreamDeadLetterQueue: @@ -215,6 +227,12 @@ Resources: StartingPosition: LATEST Stream: !GetAtt KinesisStreamConsumer.ConsumerARN + KinesisBatchProcessorFunctionLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}" + RetentionInDays: 7 + Outputs: SqsQueueUrl: Description: "SQS Queue URL" diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs index 64d70f9d..08c12180 100644 --- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs +++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs @@ -133,44 +133,44 @@ public class BatchProcessorAttribute : UniversalWrapperAttribute /// /// Type of batch processor. /// - public Type BatchProcessor; + public Type BatchProcessor { get; set; } /// /// Type of batch processor provider. /// - public Type BatchProcessorProvider; + public Type BatchProcessorProvider { get; set; } /// /// Type of record handler. /// - public Type RecordHandler; + public Type RecordHandler { get; set; } /// /// Type of record handler provider. /// - public Type RecordHandlerProvider; + public Type RecordHandlerProvider { get; set; } /// /// Error handling policy. /// - public BatchProcessorErrorHandlingPolicy ErrorHandlingPolicy; + public BatchProcessorErrorHandlingPolicy ErrorHandlingPolicy { get; set; } /// /// Batch processing enabled (default false) /// - public bool BatchParallelProcessingEnabled = PowertoolsConfigurations.Instance.BatchParallelProcessingEnabled; + public bool BatchParallelProcessingEnabled { get; set; } = PowertoolsConfigurations.Instance.BatchParallelProcessingEnabled; /// /// The maximum degree of parallelism to apply during batch processing. /// Must enable BatchParallelProcessingEnabled /// - public int MaxDegreeOfParallelism = PowertoolsConfigurations.Instance.BatchProcessingMaxDegreeOfParallelism; + public int MaxDegreeOfParallelism { get; set; } = PowertoolsConfigurations.Instance.BatchProcessingMaxDegreeOfParallelism; - /// + /// /// By default, the Batch processor throws a on full batch failure. - /// This behaviour can be disabled by setting this value to false. + /// This behaviour can be disabled by setting this value to false. /// - public bool ThrowOnFullBatchFailure = PowertoolsConfigurations.Instance.BatchThrowOnFullBatchFailureEnabled; + public bool ThrowOnFullBatchFailure { get; set; } = PowertoolsConfigurations.Instance.BatchThrowOnFullBatchFailureEnabled; private static readonly Dictionary EventTypes = new() { From a93b3be24b9bd85024d3e29319eb762ba4ce122f Mon Sep 17 00:00:00 2001 From: Lasse Christiansen Date: Fri, 20 Sep 2024 15:55:44 +0200 Subject: [PATCH 3/5] Added information about the new 'ThrowOnFullBatchFailure' setting to the of the BatchProcessorAttribute. --- .../BatchProcessorAttribute.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs index 08c12180..d693d4ec 100644 --- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs +++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs @@ -91,6 +91,10 @@ namespace AWS.Lambda.Powertools.BatchProcessing; /// POWERTOOLS_BATCH_PROCESSING_MAX_DEGREE_OF_PARALLELISM /// int, defaults to 1 (no parallelism). Specify -1 to automatically use the value of ProcessorCount. /// +/// +/// POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE +/// bool, defaults to true. Controls if an exception is thrown on full batch failure. +/// /// ///
/// Parameters
@@ -124,6 +128,10 @@ namespace AWS.Lambda.Powertools.BatchProcessing; /// MaxDegreeOfParallelism /// int, defaults to 1 (no parallelism). Specify -1 to automatically use the value of ProcessorCount. /// +/// +/// ThrowOnFullBatchFailure +/// bool, defaults to true. Controls if an exception is thrown on full batch failure. +/// /// ///
[AttributeUsage(AttributeTargets.Method)] From 922d92fa72601915d8436c21a06eed60010234c1 Mon Sep 17 00:00:00 2001 From: Lasse Christiansen Date: Thu, 26 Sep 2024 08:05:04 +0200 Subject: [PATCH 4/5] Updated docs to include information about the new 'ThrowOnFullBatchFailure' option. --- docs/snippets/batch/templates/dynamodb.yaml | 159 +++++++++++--------- docs/snippets/batch/templates/kinesis.yaml | 154 +++++++++++-------- docs/snippets/batch/templates/sqs.yaml | 123 +++++++++------ docs/utilities/batch-processing.md | 63 ++++++-- examples/BatchProcessing/template.yaml | 4 +- 5 files changed, 303 insertions(+), 200 deletions(-) diff --git a/docs/snippets/batch/templates/dynamodb.yaml b/docs/snippets/batch/templates/dynamodb.yaml index 0508117a..13c83673 100644 --- a/docs/snippets/batch/templates/dynamodb.yaml +++ b/docs/snippets/batch/templates/dynamodb.yaml @@ -1,105 +1,118 @@ -AWSTemplateFormatVersion: '2010-09-09' +AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 -Description: partial batch response sample +Description: Example project demoing DynamoDB Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET) Globals: Function: - Timeout: 5 - MemorySize: 256 - Runtime: nodejs18.x - Tracing: Active + Timeout: 20 + Runtime: dotnet8 + MemorySize: 1024 Environment: Variables: - POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing + POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-ddb POWERTOOLS_LOG_LEVEL: Debug - POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase + POWERTOOLS_LOGGER_CASE: PascalCase POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1 - POWERTOOLS_BATCH_PARALLEL_ENABLED: false - -Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - CodeUri: ./src/HelloWorld/ - Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute - Policies: - # Lambda Destinations require additional permissions - # to send failure records from Kinesis/DynamoDB - - Version: '2012-10-17' - Statement: - Effect: 'Allow' - Action: - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:SendMessage - Resource: !GetAtt SampleDLQ.Arn - - KMSDecryptPolicy: - KeyId: !Ref CustomerKey - Events: - DynamoDBStream: - Type: DynamoDB - Properties: - Stream: !GetAtt SampleTable.StreamArn - StartingPosition: LATEST - MaximumRetryAttempts: 2 - DestinationConfig: - OnFailure: - Destination: !GetAtt SampleDLQ.Arn - FunctionResponseTypes: - - ReportBatchItemFailures + POWERTOOLS_BATCH_PARALLEL_ENABLED : false + POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true - SampleDLQ: - Type: AWS::SQS::Queue - Properties: - KmsMasterKeyId: !Ref CustomerKey +Resources: - SampleTable: - Type: AWS::DynamoDB::Table - Properties: - BillingMode: PAY_PER_REQUEST - AttributeDefinitions: - - AttributeName: pk - AttributeType: S - - AttributeName: sk - AttributeType: S - KeySchema: - - AttributeName: pk - KeyType: HASH - - AttributeName: sk - KeyType: RANGE - SSESpecification: - SSEEnabled: true - StreamSpecification: - StreamViewType: NEW_AND_OLD_IMAGES - # -------------- - # KMS key for encrypted queues + # KMS key for encrypted messages / records CustomerKey: Type: AWS::KMS::Key Properties: Description: KMS key for encrypted queues Enabled: true KeyPolicy: - Version: '2012-10-17' + Version: "2012-10-17" Statement: - Sid: Enable IAM User Permissions Effect: Allow Principal: - AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root' - Action: 'kms:*' - Resource: '*' - - Sid: Allow use of the key + AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root" + Action: "kms:*" + Resource: "*" + - Sid: Allow AWS Lambda to use the key Effect: Allow Principal: Service: lambda.amazonaws.com Action: - kms:Decrypt - kms:GenerateDataKey - Resource: '*' + Resource: "*" CustomerKeyAlias: Type: AWS::KMS::Alias Properties: - AliasName: alias/powertools-batch-sqs-demo - TargetKeyId: !Ref CustomerKey \ No newline at end of file + AliasName: !Sub alias/${AWS::StackName}-kms-key + TargetKeyId: !Ref CustomerKey + + # -------------- + # Batch Processing for DynamoDb (DDB) Stream + DdbStreamDeadLetterQueue: + Type: AWS::SQS::Queue + Properties: + KmsMasterKeyId: !Ref CustomerKey + + DdbTable: + Type: AWS::DynamoDB::Table + Properties: + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: id + AttributeType: S + KeySchema: + - AttributeName: id + KeyType: HASH + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES + + DdbStreamBatchProcessorFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: ./src/HelloWorld/ + Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute + Policies: + - AWSLambdaDynamoDBExecutionRole + - Statement: + - Sid: DlqPermissions + Effect: Allow + Action: + - sqs:SendMessage + - sqs:SendMessageBatch + Resource: !GetAtt DdbStreamDeadLetterQueue.Arn + - Sid: KmsKeyPermissions + Effect: Allow + Action: + - kms:GenerateDataKey + Resource: !GetAtt CustomerKey.Arn + Events: + Stream: + Type: DynamoDB + Properties: + BatchSize: 5 + BisectBatchOnFunctionError: true + DestinationConfig: + OnFailure: + Destination: !GetAtt DdbStreamDeadLetterQueue.Arn + Enabled: true + FunctionResponseTypes: + - ReportBatchItemFailures + MaximumRetryAttempts: 2 + ParallelizationFactor: 1 + StartingPosition: LATEST + Stream: !GetAtt DdbTable.StreamArn + + DdbStreamBatchProcessorFunctionLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}" + RetentionInDays: 7 + +Outputs: + DdbTableName: + Description: "DynamoDB Table Name" + Value: !Ref DdbTable \ No newline at end of file diff --git a/docs/snippets/batch/templates/kinesis.yaml b/docs/snippets/batch/templates/kinesis.yaml index 5088fbf4..911bd151 100644 --- a/docs/snippets/batch/templates/kinesis.yaml +++ b/docs/snippets/batch/templates/kinesis.yaml @@ -1,95 +1,125 @@ -AWSTemplateFormatVersion: '2010-09-09' +AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 -Description: partial batch response sample +Description: Example project demoing Kinesis Data Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET) Globals: Function: - Timeout: 5 - MemorySize: 256 - Runtime: nodejs18.x - Tracing: Active + Timeout: 20 + Runtime: dotnet8 + MemorySize: 1024 Environment: Variables: - POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing + POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-kinesis POWERTOOLS_LOG_LEVEL: Debug - POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase + POWERTOOLS_LOGGER_CASE: PascalCase POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1 - POWERTOOLS_BATCH_PARALLEL_ENABLED: false + POWERTOOLS_BATCH_PARALLEL_ENABLED : false + POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - CodeUri: ./src/HelloWorld/ - Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute - Policies: - # Lambda Destinations require additional permissions - # to send failure records to DLQ from Kinesis/DynamoDB - - Version: '2012-10-17' - Statement: - Effect: 'Allow' - Action: - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:SendMessage - Resource: !GetAtt SampleDLQ.Arn - - KMSDecryptPolicy: - KeyId: !Ref CustomerKey - Events: - KinesisStream: - Type: Kinesis - Properties: - Stream: !GetAtt SampleStream.Arn - BatchSize: 100 - StartingPosition: LATEST - MaximumRetryAttempts: 2 - DestinationConfig: - OnFailure: - Destination: !GetAtt SampleDLQ.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - SampleDLQ: - Type: AWS::SQS::Queue - Properties: - KmsMasterKeyId: !Ref CustomerKey - - SampleStream: - Type: AWS::Kinesis::Stream - Properties: - ShardCount: 1 - StreamEncryption: - EncryptionType: KMS - KeyId: alias/aws/kinesis - # -------------- - # KMS key for encrypted queues + # KMS key for encrypted messages / records CustomerKey: Type: AWS::KMS::Key Properties: Description: KMS key for encrypted queues Enabled: true KeyPolicy: - Version: '2012-10-17' + Version: "2012-10-17" Statement: - Sid: Enable IAM User Permissions Effect: Allow Principal: - AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root' - Action: 'kms:*' - Resource: '*' - - Sid: Allow use of the key + AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root" + Action: "kms:*" + Resource: "*" + - Sid: Allow AWS Lambda to use the key Effect: Allow Principal: Service: lambda.amazonaws.com Action: - kms:Decrypt - kms:GenerateDataKey - Resource: '*' + Resource: "*" CustomerKeyAlias: Type: AWS::KMS::Alias Properties: - AliasName: alias/powertools-batch-sqs-demo - TargetKeyId: !Ref CustomerKey \ No newline at end of file + AliasName: !Sub alias/${AWS::StackName}-kms-key + TargetKeyId: !Ref CustomerKey + + # -------------- + # Batch Processing for Kinesis Data Stream + KinesisStreamDeadLetterQueue: + Type: AWS::SQS::Queue + Properties: + KmsMasterKeyId: !Ref CustomerKey + + KinesisStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 1 + StreamEncryption: + EncryptionType: KMS + KeyId: !Ref CustomerKey + + KinesisStreamConsumer: + Type: AWS::Kinesis::StreamConsumer + Properties: + ConsumerName: powertools-dotnet-sample-batch-kds-consumer + StreamARN: !GetAtt KinesisStream.Arn + + KinesisBatchProcessorFunction: + Type: AWS::Serverless::Function + Properties: + Policies: + - Statement: + - Sid: KinesisStreamConsumerPermissions + Effect: Allow + Action: + - kinesis:DescribeStreamConsumer + Resource: + - !GetAtt KinesisStreamConsumer.ConsumerARN + - Sid: DlqPermissions + Effect: Allow + Action: + - sqs:SendMessage + - sqs:SendMessageBatch + Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn + - Sid: KmsKeyPermissions + Effect: Allow + Action: + - kms:Decrypt + - kms:GenerateDataKey + Resource: !GetAtt CustomerKey.Arn + CodeUri: ./src/HelloWorld/ + Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute + Events: + Kinesis: + Type: Kinesis + Properties: + BatchSize: 5 + BisectBatchOnFunctionError: true + DestinationConfig: + OnFailure: + Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn + Enabled: true + FunctionResponseTypes: + - ReportBatchItemFailures + MaximumRetryAttempts: 2 + ParallelizationFactor: 1 + StartingPosition: LATEST + Stream: !GetAtt KinesisStreamConsumer.ConsumerARN + + KinesisBatchProcessorFunctionLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}" + RetentionInDays: 7 + +Outputs: + KinesisStreamArn: + Description: "Kinesis Stream ARN" + Value: !GetAtt KinesisStream.Arn \ No newline at end of file diff --git a/docs/snippets/batch/templates/sqs.yaml b/docs/snippets/batch/templates/sqs.yaml index dfda6bed..fbd21305 100644 --- a/docs/snippets/batch/templates/sqs.yaml +++ b/docs/snippets/batch/templates/sqs.yaml @@ -1,83 +1,106 @@ -AWSTemplateFormatVersion: '2010-09-09' +AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 -Description: partial batch response sample +Description: Example project demoing SQS Queue processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET) Globals: Function: - Timeout: 5 - MemorySize: 256 - Runtime: nodejs18.x - Tracing: Active + Timeout: 20 + Runtime: dotnet8 + MemorySize: 1024 Environment: Variables: - POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-processing + POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-sqs POWERTOOLS_LOG_LEVEL: Debug - POWERTOOLS_LOGGER_CASE: PascalCase # Allowed values are: CamelCase, PascalCase and SnakeCase + POWERTOOLS_LOGGER_CASE: PascalCase POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1 - POWERTOOLS_BATCH_PARALLEL_ENABLED: false + POWERTOOLS_BATCH_PARALLEL_ENABLED : false + POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true Resources: - HelloWorldFunction: - Type: AWS::Serverless::Function - Properties: - CodeUri: ./src/HelloWorld/ - Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute - Policies: - - SQSPollerPolicy: - QueueName: !GetAtt SampleQueue.QueueName - - KMSDecryptPolicy: - KeyId: !Ref CustomerKey - Events: - Batch: - Type: SQS - Properties: - Queue: !GetAtt SampleQueue.Arn - FunctionResponseTypes: - - ReportBatchItemFailures - - SampleDLQ: - Type: AWS::SQS::Queue - Properties: - KmsMasterKeyId: !Ref CustomerKey - SampleQueue: - Type: AWS::SQS::Queue - Properties: - VisibilityTimeout: 30 # Fn timeout * 6 - SqsManagedSseEnabled: true - RedrivePolicy: - maxReceiveCount: 2 - deadLetterTargetArn: !GetAtt SampleDLQ.Arn - KmsMasterKeyId: !Ref CustomerKey - # -------------- - # KMS key for encrypted queues + # KMS key for encrypted messages / records CustomerKey: Type: AWS::KMS::Key Properties: Description: KMS key for encrypted queues Enabled: true KeyPolicy: - Version: '2012-10-17' + Version: "2012-10-17" Statement: - Sid: Enable IAM User Permissions Effect: Allow Principal: - AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root' - Action: 'kms:*' - Resource: '*' - - Sid: Allow use of the key + AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root" + Action: "kms:*" + Resource: "*" + - Sid: Allow AWS Lambda to use the key Effect: Allow Principal: Service: lambda.amazonaws.com Action: - kms:Decrypt - kms:GenerateDataKey - Resource: '*' + Resource: "*" CustomerKeyAlias: Type: AWS::KMS::Alias Properties: - AliasName: alias/powertools-batch-sqs-demo - TargetKeyId: !Ref CustomerKey \ No newline at end of file + AliasName: !Sub alias/${AWS::StackName}-kms-key + TargetKeyId: !Ref CustomerKey + + # -------------- + # Batch Processing for SQS Queue + SqsDeadLetterQueue: + Type: AWS::SQS::Queue + Properties: + KmsMasterKeyId: !Ref CustomerKey + + SqsQueue: + Type: AWS::SQS::Queue + Properties: + RedrivePolicy: + deadLetterTargetArn: !GetAtt SqsDeadLetterQueue.Arn + maxReceiveCount: 2 + KmsMasterKeyId: !Ref CustomerKey + + SqsBatchProcessorFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: ./src/HelloWorld/ + Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute + Policies: + - Statement: + - Sid: DlqPermissions + Effect: Allow + Action: + - sqs:SendMessage + - sqs:SendMessageBatch + Resource: !GetAtt SqsDeadLetterQueue.Arn + - Sid: KmsKeyPermissions + Effect: Allow + Action: + - kms:Decrypt + - kms:GenerateDataKey + Resource: !GetAtt CustomerKey.Arn + Events: + SqsBatch: + Type: SQS + Properties: + BatchSize: 5 + Enabled: true + FunctionResponseTypes: + - ReportBatchItemFailures + Queue: !GetAtt SqsQueue.Arn + + SqsBatchProcessorFunctionLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${SqsBatchProcessorFunction}" + RetentionInDays: 7 + +Outputs: + SqsQueueUrl: + Description: "SQS Queue URL" + Value: !Ref SqsQueue \ No newline at end of file diff --git a/docs/utilities/batch-processing.md b/docs/utilities/batch-processing.md index eb18eb7c..021f8ef8 100644 --- a/docs/utilities/batch-processing.md +++ b/docs/utilities/batch-processing.md @@ -84,11 +84,12 @@ You use your preferred deployment framework to set the correct configuration whi Batch processing can be configured with the settings bellow: -Setting | Description | Environment variable | Default -------------------------------------------------- |-------------------------------------------------------------------------| ------------------------------------------------- | ------------------------------------------------- -**Error Handling Policy** | Sets the error handling policy applied during batch processing. | `POWERTOOLS_BATCH_ERROR_HANDLING_POLICY` | `DeriveFromEvent` -**Parallel Enabled** | Sets if parallelism is enabled | `POWERTOOLS_BATCH_PARALLEL_ENABLED` | `false` -**Max Degree of Parallelism** | Sets the maximum degree of parallelism to apply during batch processing | `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` | `1` +Setting | Description | Environment variable | Default +------- | ----------- | -------------------- | ------- +**Error Handling Policy** | The error handling policy to apply during batch processing. | `POWERTOOLS_BATCH_ERROR_HANDLING_POLICY` | `DeriveFromEvent` +**Parallel Enabled** | Controls if parallel processing of batch items is enabled. | `POWERTOOLS_BATCH_PARALLEL_ENABLED` | `false` +**Max Degree of Parallelism** | The maximum degree of parallelism to apply if parallel processing is enabled. | `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` | `1` +**Throw on Full Batch Failure** | Controls if a `BatchProcessingException` is thrown on full batch failure. | `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE` | `true` ### Required resources @@ -98,19 +99,19 @@ The remaining sections of the documentation will rely on these samples. For comp === "SQS" - ```yaml title="template.yaml" hl_lines="36-37" + ```yaml title="template.yaml" hl_lines="93-94" --8<-- "docs/snippets/batch/templates/sqs.yaml" ``` === "Kinesis Data Streams" - ```yaml title="template.yaml" hl_lines="50-51" + ```yaml title="template.yaml" hl_lines="109-110" --8<-- "docs/snippets/batch/templates/kinesis.yaml" ``` === "DynamoDB Streams" - ```yaml title="template.yaml" hl_lines="49-50" + ```yaml title="template.yaml" hl_lines="102-103" --8<-- "docs/snippets/batch/templates/dynamodb.yaml" ``` @@ -613,14 +614,13 @@ Another approach is to decorate the handler and use one of the policies in the * ``` - ### Partial failure mechanics All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: -* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}` -* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing -* **All records failed to be processed**. We will raise `AWS.Lambda.Powertools.BatchProcessing.Exceptions.BatchProcessingException` exception with a list of all exceptions raised when processing +* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`. +* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing. +* **All records failed to be processed**. By defaullt, we will throw a `BatchProcessingException` with a list of all exceptions raised during processing to reflect the failure in your operational metrics. However, in some scenarios, this might not be desired. See [Working with full batch failures](#working-with-full-batch-failures) for more information. The following sequence diagrams explain how each Batch processor behaves under different scenarios. @@ -735,7 +735,6 @@ sequenceDiagram Kinesis and DynamoDB streams mechanism with multiple batch item failures - ### Advanced #### Using utility outside handler and IoC @@ -848,6 +847,44 @@ You can also set `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` Environment Variab } ``` +#### Working with full batch failures + +By default, the `BatchProcessor` will throw a `BatchProcessingException` if all records in the batch fail to process. We do this to reflect the failure in your operational metrics. + +When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance. + +For these scenarios, you can set `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE = false`, or the equivalent on either the `BatchProcessor` decorator or on the `ProcessingOptions` object. See examples below. + +=== "Setting ThrowOnFullBatchFailure on Decorator" + + ```csharp hl_lines="3" + [BatchProcessor( + RecordHandler = typeof(CustomSqsRecordHandler), + ThrowOnFullBatchFailure = false)] + public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } + + ``` + +=== "Setting ThrowOnFullBatchFailure outside Decorator" + + ```csharp hl_lines="8" + public async Task HandlerUsingUtility(SQSEvent sqsEvent) + { + var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(x => + { + // Inline handling of SQS message... + }), new ProcessingOptions + { + ThrowOnFullBatchFailure = false + }); + return result.BatchItemFailuresResponse; + } + + ``` + #### Extending BatchProcessor You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures. diff --git a/examples/BatchProcessing/template.yaml b/examples/BatchProcessing/template.yaml index 8e302fcc..17dfd20c 100644 --- a/examples/BatchProcessing/template.yaml +++ b/examples/BatchProcessing/template.yaml @@ -13,7 +13,7 @@ Globals: Variables: POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch POWERTOOLS_LOG_LEVEL: Debug - POWERTOOLS_LOGGER_CASE: SnakeCase + POWERTOOLS_LOGGER_CASE: PascalCase POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1 POWERTOOLS_BATCH_PARALLEL_ENABLED : false @@ -49,7 +49,7 @@ Resources: CustomerKeyAlias: Type: AWS::KMS::Alias Properties: - AliasName: alias/powertools-dotnet-sample-batch-kms-key + AliasName: !Sub alias/${AWS::StackName}-kms-key TargetKeyId: !Ref CustomerKey # -------------- From 20dee71e82767da7e714f8ac33a5905a55d43e7c Mon Sep 17 00:00:00 2001 From: Lasse Christiansen Date: Fri, 27 Sep 2024 11:39:52 +0200 Subject: [PATCH 5/5] Streamlined the .NET 8 targeting in the Batch Processing HelloWorld example project. --- examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj | 2 +- .../test/HelloWorld.Test/HelloWorld.Tests.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj b/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj index 90cb91fd..7342934b 100644 --- a/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj +++ b/examples/BatchProcessing/src/HelloWorld/HelloWorld.csproj @@ -1,6 +1,6 @@  - net6.0;net8.0 + net8.0 true enable diff --git a/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj b/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj index fd097307..b0fe1327 100644 --- a/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj +++ b/examples/BatchProcessing/test/HelloWorld.Test/HelloWorld.Tests.csproj @@ -1,6 +1,6 @@ - net6.0;net8.0 + net8.0