Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueueAPI: DeleteMessage Signature modification #630

Merged
merged 2 commits into from
Jul 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 121 additions & 123 deletions dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ private void Initialize(GXService providerService)
_isFIFO = _queueURL.EndsWith(".fifo");

}

public override string GetName()
{
return Name;
}
public bool GetMessageFromException(Exception ex, SdtMessages_Message msg)
{
try
{
AmazonSQSException sqs_ex = (AmazonSQSException)ex;
msg.gxTpr_Id = sqs_ex.ErrorCode;
msg.gxTpr_Description = sqs_ex.Message;
return true;
}
catch (Exception)
{
return false;
}
}

#region API methods
public void Clear(out bool success)
{
try
Expand All @@ -81,134 +102,32 @@ public void Clear(out bool success)
}
}

public MessageQueueResult DeleteMessage(string messageHandleId, out bool success)
public MessageQueueResult DeleteMessage(SimpleQueueMessage simpleQueueMessage, out bool success)
{
success = false;
MessageQueueResult messageQueueResult = new MessageQueueResult();

List<string> messageHandleIdToDelete = new List<string> { messageHandleId };
IList<MessageQueueResult> messageQueueResults = RemoveMessages(messageHandleIdToDelete, out bool operationOK);
if ((operationOK) && (messageQueueResults != null))
{
messageQueueResult = messageQueueResults[0];
Task<DeleteMessageResponse> task = Task.Run<DeleteMessageResponse>(async () => await DeleteQueueMessageAsync(simpleQueueMessage.MessageHandleId));
DeleteMessageResponse deleteMessageResponse = task.Result;
if ((deleteMessageResponse != null) && ((deleteMessageResponse.HttpStatusCode == System.Net.HttpStatusCode.Accepted) || (deleteMessageResponse.HttpStatusCode == System.Net.HttpStatusCode.Created) || (deleteMessageResponse.HttpStatusCode == System.Net.HttpStatusCode.OK)))
{
success = true;
}
return messageQueueResult;
}
SetupMessageQueueResult(simpleQueueMessage, MessageQueueResultStatus.Deleted);

public IList<MessageQueueResult> DeleteMessages(List<string> messageHandleId, out bool success)
{
return RemoveMessages(messageHandleId, out success);
}
private IList<MessageQueueResult> RemoveMessages(List<string> messageHandleId, out bool success)
{
IList<MessageQueueResult> messageQueueResults = new List<MessageQueueResult>();
success = false;
try
{
Task<DeleteMessageBatchResponse> task = Task.Run<DeleteMessageBatchResponse>(async () => await DeleteQueueMessageBatchAsync(messageHandleId));

DeleteMessageBatchResponse deleteMessageBatchResponse = task.Result;
if (deleteMessageBatchResponse != null)
success = (deleteMessageBatchResponse.Failed.Count == 0);

foreach (BatchResultErrorEntry entry in deleteMessageBatchResponse.Failed)
{
MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry);
messageQueueResults.Add(messageQueueResult);
}

foreach (DeleteMessageBatchResultEntry entry in deleteMessageBatchResponse.Successful)
{
MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry);
messageQueueResults.Add(messageQueueResult);
}

}
catch (AggregateException ae)
{
throw ae;
}
return messageQueueResults;
return messageQueueResult;
}
public IList<MessageQueueResult> DeleteMessages(IList<SimpleQueueMessage> simpleQueueMessages, out bool success)
{
return RemoveMessages(simpleQueueMessages, out success);
}
private IList<MessageQueueResult> RemoveMessages(IList<SimpleQueueMessage> simpleQueueMessages, out bool success)
{
IList<MessageQueueResult> messageQueueResults = new List<MessageQueueResult>();
List<string> messageHandleIds = new List<string>();
success = false;
try
{
foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages)
{
messageHandleIds.Add(simpleQueueMessage.MessageHandleId);
}
Task<DeleteMessageBatchResponse> task = Task.Run<DeleteMessageBatchResponse>(async () => await DeleteQueueMessageBatchAsync(messageHandleIds));

DeleteMessageBatchResponse deleteMessageBatchResponse = task.Result;
if (deleteMessageBatchResponse != null)
success = (deleteMessageBatchResponse.Failed.Count == 0);

foreach (BatchResultErrorEntry entry in deleteMessageBatchResponse.Failed)
{
MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry);
messageQueueResults.Add(messageQueueResult);
}

foreach (DeleteMessageBatchResultEntry entry in deleteMessageBatchResponse.Successful)
{
MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry);
messageQueueResults.Add(messageQueueResult);
}

}
catch (AggregateException ae)
{
throw ae;
}
return messageQueueResults;
}

public IList<SimpleQueueMessage> GetMessages(out bool success)
{
return RetrieveMessages(success : out success);
}

public IList<SimpleQueueMessage> GetMessages(MessageQueueOptions messageQueueOptions, out bool success)
{
return RetrieveMessages(out success, messageQueueOptions);
}
private IList<SimpleQueueMessage> RetrieveMessages(out bool success, MessageQueueOptions messageQueueOptions = null)
{
success = false;
IList<SimpleQueueMessage> simpleQueueMessages = new List<SimpleQueueMessage>();
try
{
Task<ReceiveMessageResponse> task = Task.Run<ReceiveMessageResponse>(async () => await GetMessageAsync(messageQueueOptions));

ReceiveMessageResponse response = task.Result;
success = response != null;
if (success)
{
List<Message> messagesList = response.Messages;

foreach (Message message in messagesList)
{
SimpleQueueMessage simpleQueueMessage = SetupSimpleQueueMessage(message);
simpleQueueMessages.Add(simpleQueueMessage);
}
}
}
catch (AggregateException ae)
{
throw ae;
}
return simpleQueueMessages;
}

public int GetQueueLength(out bool success)
{
int approxNumberMessages = 0;
Expand All @@ -231,7 +150,6 @@ public int GetQueueLength(out bool success)
}
return approxNumberMessages;
}

public MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, out bool success)
{
success = false;
Expand Down Expand Up @@ -261,7 +179,6 @@ public MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, out
}
return messageQueueResult;
}

protected MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, MessageQueueOptions messageQueueOptions, out bool success)
{
success = false;
Expand Down Expand Up @@ -291,7 +208,6 @@ protected MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage,
}
return messageQueueResult;
}

public IList<MessageQueueResult> SendMessages(IList<SimpleQueueMessage> simpleQueueMessages, MessageQueueOptions messageQueueOptions, out bool success)
{
success = false;
Expand Down Expand Up @@ -322,26 +238,74 @@ public IList<MessageQueueResult> SendMessages(IList<SimpleQueueMessage> simpleQu
}
return messageQueueResults;
}
public override string GetName()

private IList<MessageQueueResult> RemoveMessages(IList<SimpleQueueMessage> simpleQueueMessages, out bool success)
{
return Name;
IList<MessageQueueResult> messageQueueResults = new List<MessageQueueResult>();
List<string> messageHandleIds = new List<string>();
success = false;
try
{
foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages)
{
messageHandleIds.Add(simpleQueueMessage.MessageHandleId);
}
Task<DeleteMessageBatchResponse> task = Task.Run<DeleteMessageBatchResponse>(async () => await DeleteQueueMessageBatchAsync(messageHandleIds));

DeleteMessageBatchResponse deleteMessageBatchResponse = task.Result;
if (deleteMessageBatchResponse != null)
success = (deleteMessageBatchResponse.Failed.Count == 0);

foreach (BatchResultErrorEntry entry in deleteMessageBatchResponse.Failed)
{
MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry);
messageQueueResults.Add(messageQueueResult);
}

foreach (DeleteMessageBatchResultEntry entry in deleteMessageBatchResponse.Successful)
{
MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry);
messageQueueResults.Add(messageQueueResult);
}

}
catch (AggregateException ae)
{
throw ae;
}
return messageQueueResults;
}

public bool GetMessageFromException(Exception ex, SdtMessages_Message msg)
private IList<SimpleQueueMessage> RetrieveMessages(out bool success, MessageQueueOptions messageQueueOptions = null)
{
success = false;
IList<SimpleQueueMessage> simpleQueueMessages = new List<SimpleQueueMessage>();
try
{
AmazonSQSException sqs_ex = (AmazonSQSException)ex;
msg.gxTpr_Id = sqs_ex.ErrorCode;
msg.gxTpr_Description = sqs_ex.Message;
return true;
Task<ReceiveMessageResponse> task = Task.Run<ReceiveMessageResponse>(async () => await GetMessageAsync(messageQueueOptions));

ReceiveMessageResponse response = task.Result;
success = response != null;
if (success)
{
List<Message> messagesList = response.Messages;

foreach (Message message in messagesList)
{
SimpleQueueMessage simpleQueueMessage = SetupSimpleQueueMessage(message);
simpleQueueMessages.Add(simpleQueueMessage);
}
}
}
catch (Exception)
catch (AggregateException ae)
{
return false;
throw ae;
}
return simpleQueueMessages;
}
#endregion

#region Transformation methods

private MessageQueueResult SetupMessageQueueResult(SendMessageResponse response)
{
Expand Down Expand Up @@ -433,6 +397,19 @@ private SimpleQueueMessage SetupSimpleQueueMessage(Message response)
return simpleQueueMessage;
}

private MessageQueueResult SetupMessageQueueResult(SimpleQueueMessage simpleQueueMessage, string messageStatus)
{
MessageQueueResult messageQueueResult = new MessageQueueResult();
messageQueueResult.MessageId = simpleQueueMessage.MessageId;
messageQueueResult.MessageStatus = messageStatus;
messageQueueResult.ServerMessageId = simpleQueueMessage.MessageId;
messageQueueResult.MessageHandleId = simpleQueueMessage.MessageHandleId;
return messageQueueResult;
}

#endregion

#region Async methods
private async Task<SendMessageResponse> SendMessageAsync(SimpleQueueMessage simpleQueueMessage, MessageQueueOptions messageQueueOptions = null)
{
SendMessageResponse sendMessageResponse = new SendMessageResponse();
Expand Down Expand Up @@ -499,7 +476,11 @@ private async Task<SendMessageBatchResponse> SendMessageBatchAsync(IList<SimpleQ
}

requestEntry.MessageBody = simpleQueueMessage.MessageBody;
requestEntry.Id = simpleQueueMessage.MessageId;
if (!string.IsNullOrEmpty(simpleQueueMessage.MessageId))
requestEntry.Id = simpleQueueMessage.MessageId;
else
requestEntry.Id = new Guid().ToString();

requestEntry.MessageAttributes = properties;
if ((messageQueueOptions != null) && (messageQueueOptions.DelaySeconds != 0))
requestEntry.DelaySeconds = messageQueueOptions.DelaySeconds;
Expand Down Expand Up @@ -590,6 +571,21 @@ private async Task<DeleteMessageBatchResponse> DeleteQueueMessageBatchAsync(List
return deleteMessageBatchResponse;
}

private async Task<DeleteMessageResponse> DeleteQueueMessageAsync(string messageHandleId)
{
DeleteMessageResponse deleteMessageResponse = new DeleteMessageResponse();
try
{

deleteMessageResponse = await _sqsClient.DeleteMessageAsync(_queueURL, messageHandleId).ConfigureAwait(false);
}
catch (Exception ex)
{
throw (ex);
}
return deleteMessageResponse;
}

private async Task<PurgeQueueResponse> PurgeQueueAsync()
{
PurgeQueueResponse purgeQueueResponse = new PurgeQueueResponse();
Expand Down Expand Up @@ -627,6 +623,8 @@ private async Task<GetQueueAttributesResponse> GetQueueAttributeAsync(List<strin
return getQueueAttributesResponse;

}

#endregion
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ public class AzureMessageQueueProvider
{
private const string AZUREQUEUE = "AZUREQUEUE";
public SimpleMessageQueue Connect(string queueName, string queueURL, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)

{
{
MessageQueueProvider messageQueueProvider = new MessageQueueProvider();
GXProperties properties = new GXProperties();
properties.Add("QUEUE_AZUREQUEUE_QUEUENAME", queueName);
Expand Down
Loading