From 770cf84f59054c25c3fdf49f50f1dc5e91a46ee5 Mon Sep 17 00:00:00 2001 From: madhub Date: Thu, 26 Oct 2023 20:49:18 +0530 Subject: [PATCH] Added fatal checker configuration option in SQSMessagePollerOptions (#62) --- .../Configuration/MessageBusBuilder.cs | 4 ++- .../SQSMessagePollerConfiguration.cs | 24 ++++++++++++++++++ .../Configuration/SQSMessagePollerOptions.cs | 7 ++++++ src/AWS.Messaging/SQS/SQSMessagePoller.cs | 25 +++---------------- 4 files changed, 38 insertions(+), 22 deletions(-) diff --git a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs index 340b29e..ce821ba 100644 --- a/src/AWS.Messaging/Configuration/MessageBusBuilder.cs +++ b/src/AWS.Messaging/Configuration/MessageBusBuilder.cs @@ -123,7 +123,9 @@ public IMessageBusBuilder AddSQSPoller(string queueUrl, Action public int WaitTimeSeconds { get; init; } = DEFAULT_WAIT_TIME_SECONDS; + /// + /// Function that indicates whether an should stop the poller or continue. + /// + public Func IsSQSExceptionFatal { get; set; } = IsFatalException; + /// /// Construct an instance of /// @@ -113,4 +119,22 @@ internal MessageManagerConfiguration ToMessageManagerConfiguration() VisibilityTimeoutExtensionHeartbeatInterval = VisibilityTimeoutExtensionHeartbeatInterval }; } + + /// + /// error codes that should be treated as fatal and stop the poller + /// + private static readonly HashSet _fatalSQSErrorCodes = new HashSet + { + "InvalidAddress", // Returned for an invalid queue URL + "AccessDenied" // Returned with insufficient IAM permissions to read from the configured queue + }; + + /// + /// Determines if a given SQS exception should be treated as fatal and rethrown to stop the poller + /// + /// SQS Exception + internal static bool IsFatalException(AmazonSQSException sqsException) + { + return sqsException is QueueDoesNotExistException ? true : _fatalSQSErrorCodes.Contains(sqsException.ErrorCode); + } } diff --git a/src/AWS.Messaging/Configuration/SQSMessagePollerOptions.cs b/src/AWS.Messaging/Configuration/SQSMessagePollerOptions.cs index 69000a2..9d4922a 100644 --- a/src/AWS.Messaging/Configuration/SQSMessagePollerOptions.cs +++ b/src/AWS.Messaging/Configuration/SQSMessagePollerOptions.cs @@ -1,6 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using Amazon.SQS; + namespace AWS.Messaging.Configuration; /// @@ -23,6 +25,11 @@ public class SQSMessagePollerOptions /// public int VisibilityTimeoutExtensionHeartbeatInterval { get; set; } = SQSMessagePollerConfiguration.DEFAULT_VISIBILITY_TIMEOUT_EXTENSION_HEARTBEAT_INTERVAL; + /// + /// + /// + public Func IsSQSExceptionFatal { get; set; } = SQSMessagePollerConfiguration.IsFatalException; + /// /// Validates that the options are valid against the message framework's and/or SQS limits /// diff --git a/src/AWS.Messaging/SQS/SQSMessagePoller.cs b/src/AWS.Messaging/SQS/SQSMessagePoller.cs index 775ea2c..585af58 100644 --- a/src/AWS.Messaging/SQS/SQSMessagePoller.cs +++ b/src/AWS.Messaging/SQS/SQSMessagePoller.cs @@ -124,7 +124,7 @@ private async Task PollQueue(CancellationToken token) // Rethrow the exception to fail fast for invalid configuration, permissioning, etc. // TODO: explore a "cool down mode" for repeated exceptions - if (IsSQSExceptionFatal(ex)) + if (_configuration.IsSQSExceptionFatal(ex)) { throw; } @@ -213,7 +213,7 @@ public async Task DeleteMessagesAsync(IEnumerable messages, Can string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint); // Rethrow the exception to fail fast for invalid configuration, permissioning, etc. - if (IsSQSExceptionFatal(ex)) + if (_configuration.IsSQSExceptionFatal(ex)) { throw; } @@ -319,7 +319,7 @@ public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable x.Id)), _configuration.SubscriberEndpoint); // Rethrow the exception to fail fast for invalid configuration, permissioning, etc. - if (IsSQSExceptionFatal(amazonEx)) + if (_configuration.IsSQSExceptionFatal(amazonEx)) { throw amazonEx; } @@ -338,22 +338,5 @@ public ValueTask ReportMessageFailureAsync(MessageEnvelope message, Cancellation { return ValueTask.CompletedTask; } - - /// - /// error codes that should be treated as fatal and stop the poller - /// - private static readonly HashSet _fatalSQSErrorCodes = new HashSet - { - "InvalidAddress", // Returned for an invalid queue URL - "AccessDenied" // Returned with insufficient IAM permissions to read from the configured queue - }; - - /// - /// Determines if a given SQS exception should be treated as fatal and rethrown to stop the poller - /// - /// SQS Exception - private bool IsSQSExceptionFatal(AmazonSQSException sqsException) - { - return _fatalSQSErrorCodes.Contains(sqsException.ErrorCode); - } + }