Skip to content

Commit

Permalink
Added fatal checker configuration option in SQSMessagePollerOptions (#62
Browse files Browse the repository at this point in the history
)
  • Loading branch information
madhub authored Oct 26, 2023
1 parent d74bafe commit 770cf84
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/AWS.Messaging/Configuration/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public IMessageBusBuilder AddSQSPoller(string queueUrl, Action<SQSMessagePollerO
VisibilityTimeout = sqsMessagePollerOptions.VisibilityTimeout,
VisibilityTimeoutExtensionThreshold = sqsMessagePollerOptions.VisibilityTimeoutExtensionThreshold,
VisibilityTimeoutExtensionHeartbeatInterval = sqsMessagePollerOptions.VisibilityTimeoutExtensionHeartbeatInterval,
WaitTimeSeconds = sqsMessagePollerOptions.WaitTimeSeconds
WaitTimeSeconds = sqsMessagePollerOptions.WaitTimeSeconds,
IsSQSExceptionFatal = sqsMessagePollerOptions.IsSQSExceptionFatal

};

_messageConfiguration.MessagePollerConfigurations.Add(sqsMessagePollerConfiguration);
Expand Down
24 changes: 24 additions & 0 deletions src/AWS.Messaging/Configuration/SQSMessagePollerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using Amazon.SQS;
using Amazon.SQS.Model;

namespace AWS.Messaging.Configuration;
Expand Down Expand Up @@ -87,6 +88,11 @@ internal class SQSMessagePollerConfiguration : IMessagePollerConfiguration
/// </remarks>
public int WaitTimeSeconds { get; init; } = DEFAULT_WAIT_TIME_SECONDS;

/// <summary>
/// Function that indicates whether an <see cref="AmazonSQSException"/> should stop the poller or continue.
/// </summary>
public Func<AmazonSQSException, bool> IsSQSExceptionFatal { get; set; } = IsFatalException;

/// <summary>
/// Construct an instance of <see cref="SQSMessagePollerConfiguration" />
/// </summary>
Expand All @@ -113,4 +119,22 @@ internal MessageManagerConfiguration ToMessageManagerConfiguration()
VisibilityTimeoutExtensionHeartbeatInterval = VisibilityTimeoutExtensionHeartbeatInterval
};
}

/// <summary>
/// <see cref="AmazonSQSException"/> error codes that should be treated as fatal and stop the poller
/// </summary>
private static readonly HashSet<string> _fatalSQSErrorCodes = new HashSet<string>
{
"InvalidAddress", // Returned for an invalid queue URL
"AccessDenied" // Returned with insufficient IAM permissions to read from the configured queue
};

/// <summary>
/// Determines if a given SQS exception should be treated as fatal and rethrown to stop the poller
/// </summary>
/// <param name="sqsException">SQS Exception</param>
internal static bool IsFatalException(AmazonSQSException sqsException)
{
return sqsException is QueueDoesNotExistException ? true : _fatalSQSErrorCodes.Contains(sqsException.ErrorCode);
}
}
7 changes: 7 additions & 0 deletions src/AWS.Messaging/Configuration/SQSMessagePollerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using Amazon.SQS;

namespace AWS.Messaging.Configuration;

/// <summary>
Expand All @@ -23,6 +25,11 @@ public class SQSMessagePollerOptions
/// <inheritdoc cref="SQSMessagePollerConfiguration.VisibilityTimeoutExtensionHeartbeatInterval"/>
public int VisibilityTimeoutExtensionHeartbeatInterval { get; set; } = SQSMessagePollerConfiguration.DEFAULT_VISIBILITY_TIMEOUT_EXTENSION_HEARTBEAT_INTERVAL;

/// <summary>
/// <inheritdoc cref="SQSMessagePollerConfiguration.IsFatalException(AmazonSQSException)" />
/// </summary>
public Func<AmazonSQSException, bool> IsSQSExceptionFatal { get; set; } = SQSMessagePollerConfiguration.IsFatalException;

/// <summary>
/// Validates that the options are valid against the message framework's and/or SQS limits
/// </summary>
Expand Down
25 changes: 4 additions & 21 deletions src/AWS.Messaging/SQS/SQSMessagePoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private async Task PollQueue(CancellationToken token)

// Rethrow the exception to fail fast for invalid configuration, permissioning, etc.
// TODO: explore a "cool down mode" for repeated exceptions
if (IsSQSExceptionFatal(ex))
if (_configuration.IsSQSExceptionFatal(ex))
{
throw;
}
Expand Down Expand Up @@ -213,7 +213,7 @@ public async Task DeleteMessagesAsync(IEnumerable<MessageEnvelope> messages, Can
string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint);

// Rethrow the exception to fail fast for invalid configuration, permissioning, etc.
if (IsSQSExceptionFatal(ex))
if (_configuration.IsSQSExceptionFatal(ex))
{
throw;
}
Expand Down Expand Up @@ -319,7 +319,7 @@ public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable<MessageEnvelop
string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint);

// Rethrow the exception to fail fast for invalid configuration, permissioning, etc.
if (IsSQSExceptionFatal(amazonEx))
if (_configuration.IsSQSExceptionFatal(amazonEx))
{
throw amazonEx;
}
Expand All @@ -338,22 +338,5 @@ public ValueTask ReportMessageFailureAsync(MessageEnvelope message, Cancellation
{
return ValueTask.CompletedTask;
}

/// <summary>
/// <see cref="AmazonSQSException"/> error codes that should be treated as fatal and stop the poller
/// </summary>
private static readonly HashSet<string> _fatalSQSErrorCodes = new HashSet<string>
{
"InvalidAddress", // Returned for an invalid queue URL
"AccessDenied" // Returned with insufficient IAM permissions to read from the configured queue
};

/// <summary>
/// Determines if a given SQS exception should be treated as fatal and rethrown to stop the poller
/// </summary>
/// <param name="sqsException">SQS Exception</param>
private bool IsSQSExceptionFatal(AmazonSQSException sqsException)
{
return _fatalSQSErrorCodes.Contains(sqsException.ErrorCode);
}

}

0 comments on commit 770cf84

Please sign in to comment.