From d8d7fdbea2d059d1baffd546a85d1e6d1a13fbc5 Mon Sep 17 00:00:00 2001 From: sjuarezgx Date: Sun, 25 Sep 2022 19:08:06 -0300 Subject: [PATCH] support all options of Azure SB API --- .../GXAzureServiceBus/AzureServiceBus.cs | 746 ++++++++++++++---- .../GXAzureServiceBus.csproj | 2 +- .../ServiceBusMessageBrokerProvider.cs | 120 ++- .../GXMessageBroker/MessageBroker.cs | 51 +- .../GXMessageBroker/MessageBrokerProvider.cs | 4 +- .../Messaging/GXMessageBroker/MessageQueue.cs | 88 ++- .../GXMessageBroker/PropertyConstants.cs | 14 +- .../GXMessageBroker/ServiceSettings.cs | 5 + 8 files changed, 795 insertions(+), 235 deletions(-) diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs index 863df75d1..0bbd6ffbe 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Reflection; +using System.Runtime.Serialization; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using GeneXus.Messaging.Common; @@ -12,125 +14,358 @@ namespace GeneXus.Messaging.GXAzureServiceBus { public class AzureServiceBus : MessageBrokerBase, IMessageBroker { - + private const int MAX_MESSAGES_DEFAULT = 10; + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(AzureServiceBus)); public static String Name = "AZURESB"; + + private ConcurrentDictionary m_messages = new ConcurrentDictionary(); ServiceBusClient _serviceBusClient { get; set; } private string _queueOrTopicName { get; set; } private string _connectionString { get; set; } + private string _subscriptionName { get; set; } private ServiceBusSender _sender { get; set; } - - static readonly ILog logger = log4net.LogManager.GetLogger(typeof(AzureServiceBus)); + private ServiceBusReceiver _receiver { get; set; } + private ServiceBusSessionReceiver _sessionReceiver { get; set; } + private ServiceBusSessionReceiverOptions _sessionReceiverOptions { get; set; } + private bool _sessionEnabled { get; set; } + private string _sessionId { get; set; } public AzureServiceBus() : this(null) { } - public AzureServiceBus(GXService providerService) : base(providerService) { Initialize(providerService); } - - public void Dispose() - { - Task task = Task.Run(async () => await ServiceClientDisposeAsync()); - } - private async Task ServiceClientDisposeAsync() - { - await _serviceBusClient.DisposeAsync().ConfigureAwait(false); - } - - private async Task RemoveMessageAsync(ServiceBusReceivedMessage serviceBusReceivedMessage) - { - ServiceBusReceiver receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName); - await receiver.CompleteMessageAsync(serviceBusReceivedMessage).ConfigureAwait(false); - await receiver.DisposeAsync().ConfigureAwait(false); - } - private void Initialize(GXService providerService) { ServiceSettings serviceSettings = new(PropertyConstants.MESSAGE_BROKER, Name, providerService); _queueOrTopicName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUE_NAME); _connectionString = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUE_CONNECTION_STRING); + _subscriptionName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.TOPIC_SUBSCRIPTION); + + _sessionEnabled = Convert.ToBoolean(serviceSettings.GetEncryptedPropertyValue(PropertyConstants.SESSION_ENABLED)); + + ServiceBusReceiverOptions serviceBusReceiverOptions = new ServiceBusReceiverOptions(); + _sessionReceiverOptions = new ServiceBusSessionReceiverOptions(); + + string receiveMode = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVE_MODE); + string prefetchCount = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.PREFETCH_COUNT); + string receiverIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVER_IDENTIFIER); ; + _sessionId = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVER_SESSIONID); + + if (!string.IsNullOrEmpty(receiveMode)) + { + if (_sessionEnabled) + _sessionReceiverOptions.ReceiveMode = (ServiceBusReceiveMode)Convert.ToInt16(receiveMode); + else + serviceBusReceiverOptions.ReceiveMode = (ServiceBusReceiveMode)Convert.ToInt16(receiveMode); + } + if (!string.IsNullOrEmpty(prefetchCount)) + { + int prefetchcnt = Convert.ToInt32(prefetchCount); + if (prefetchcnt != 0) + { + if (_sessionEnabled) + _sessionReceiverOptions.PrefetchCount = prefetchcnt; + else + serviceBusReceiverOptions.PrefetchCount = prefetchcnt; + } + } + if (!string.IsNullOrEmpty(receiverIdentifier)) + { + if (_sessionEnabled) + _sessionReceiverOptions.Identifier = receiverIdentifier; + else + serviceBusReceiverOptions.Identifier = receiverIdentifier; + } + + string senderIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.SENDER_IDENTIFIER); + + ServiceBusSenderOptions serviceBusSenderOptions = new ServiceBusSenderOptions(); + if (!string.IsNullOrEmpty(senderIdentifier)) + serviceBusSenderOptions.Identifier = senderIdentifier; //TO DO Consider connection options here + //https://docs.microsoft.com/en-us/javascript/api/@azure/service-bus/servicebusclientoptions?view=azure-node-latest#@azure-service-bus-servicebusclientoptions-websocketoptions + try { _serviceBusClient = new ServiceBusClient(_connectionString); if (_serviceBusClient != null) - { - _sender = _serviceBusClient.CreateSender(_queueOrTopicName); + { + _sender = _serviceBusClient.CreateSender(_queueOrTopicName, serviceBusSenderOptions); + + if (_sessionEnabled && _sender != null) + { + if (!string.IsNullOrEmpty(_sessionId)) + { + if (string.IsNullOrEmpty(_subscriptionName)) + { + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _sessionId, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; + } + else + { + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _subscriptionName, _sessionId, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; + } + } + else + { + if (string.IsNullOrEmpty(_subscriptionName)) + { + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptNextSessionAsync(_queueOrTopicName, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; + } + else + { + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptNextSessionAsync(_queueOrTopicName, _subscriptionName, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; + } + } + } + else + if (string.IsNullOrEmpty(_subscriptionName)) + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, serviceBusReceiverOptions); + + else + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _subscriptionName, serviceBusReceiverOptions); } } catch (Exception ex) { GXLogging.Error(logger, ex.Message); - } + } + } + public override string GetName() + { + return Name; } - public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) + #region Async methods + private async Task ServiceClientDisposeAsync() { - try + await _serviceBusClient.DisposeAsync().ConfigureAwait(false); + } + private async Task sendAsync(ServiceBusMessage serviceBusMessage, string options) + { + SendMessageOptions sendOptions = JSONHelper.Deserialize(options); + if ((sendOptions != null) && (!string.IsNullOrEmpty(sendOptions.ScheduledEnqueueTime))) { - Azure.RequestFailedException az_ex = (Azure.RequestFailedException)ex; - msg.gxTpr_Id = az_ex.ErrorCode; - msg.gxTpr_Description = az_ex.Message; - return true; + try + { + await _sender.ScheduleMessageAsync(serviceBusMessage, DateTimeOffset.Parse(sendOptions.ScheduledEnqueueTime)).ConfigureAwait(false); + return true; + } + catch (Exception ex) + { + throw ex; + } } - catch (Exception) + else { - return false; + try + { + await _sender.SendMessageAsync(serviceBusMessage).ConfigureAwait(false); + return true; + } + catch (Exception ex) + { + throw ex; + } } } - - public override string GetName() + private async Task SendMessagesBatchAsync(IList brokerMessages, string options) { - return Name; - } + bool success = false; + if (_sender == null) + { + throw new Exception("There was an error at the Message Broker initialization."); + } + else + { + ServiceBusMessage serviceBusMessage; + IList serviceBusMessages = new List(); + foreach (BrokerMessage brokerMessage in brokerMessages) + { + serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); + serviceBusMessages.Add(serviceBusMessage); + } - public bool SendMessage(BrokerMessage brokerMessage) + SendMessageOptions sendOptions = JSONHelper.Deserialize(options); + if ((sendOptions != null) && (!string.IsNullOrEmpty(sendOptions.ScheduledEnqueueTime))) + { + try + { + await _sender.ScheduleMessagesAsync(serviceBusMessages, DateTimeOffset.Parse(sendOptions.ScheduledEnqueueTime)).ConfigureAwait(false); + success = true; + } + catch (Exception ex) + { + GXLogging.Error(logger, ex.Message.ToString()); + } + } + else + { + try + { + await _sender.SendMessagesAsync(serviceBusMessages).ConfigureAwait(false); + success = true; + } + catch (Exception ex) + { + GXLogging.Error(logger, ex.Message.ToString()); + } + } + } + return success; + } + private async Task> ReceiveMessagesAsync(string options) { - bool success = false; - ServiceBusMessage serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); + ReceiveMessageOptions receiveOptions = JSONHelper.Deserialize(options); + + IReadOnlyList receivedMessages; try { - Task task; - if (_sender != null) + ServiceBusReceiverOptions serviceBusReceiverOptions = new ServiceBusReceiverOptions(); + + if ((receiveOptions != null) && (!string.IsNullOrEmpty(receiveOptions.SessionId)) && _sessionEnabled && (receiveOptions.SessionId != _sessionId)) { - task = Task.Run(async () => await sendAsync(serviceBusMessage)); - success = task.Result; + //Create new session receiver + + if (string.IsNullOrEmpty(_subscriptionName)) + { + _sessionReceiver = await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, receiveOptions.SessionId, _sessionReceiverOptions).ConfigureAwait(false); + _sessionId = receiveOptions.SessionId; + } + else + { + _sessionReceiver = await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _subscriptionName, receiveOptions.SessionId, _sessionReceiverOptions).ConfigureAwait(false); + _sessionId = receiveOptions.SessionId; + } + } + + int maxMessagesReceive = MAX_MESSAGES_DEFAULT; + if ((receiveOptions != null) && (receiveOptions.MaxMessages != 0)) + maxMessagesReceive = receiveOptions.MaxMessages; + + TimeSpan maxWait = TimeSpan.Zero; + if ((receiveOptions != null) && receiveOptions.MaxWaitTime != 0) + maxWait = TimeSpan.FromSeconds(receiveOptions.MaxWaitTime); + + ServiceBusReceiver receiver = _receiver; + if ((_sessionReceiver != null) && (_sessionEnabled)) + receiver = _sessionReceiver; + + if ((receiveOptions != null) && (receiveOptions.ReceiveDeferredSequenceNumbers != null) && (receiveOptions.ReceiveDeferredSequenceNumbers.Count > 0)) + { + receivedMessages = await receiver.ReceiveDeferredMessagesAsync(receiveOptions.ReceiveDeferredSequenceNumbers).ConfigureAwait(false); } else { - throw new Exception("There was an error at the Message Broker initialization."); + + if (maxWait == TimeSpan.Zero) + if ((receiveOptions != null) && (receiveOptions.PeekReceive != null) && (receiveOptions.PeekReceive.Peek)) + if (receiveOptions.PeekReceive.PeekFromSequenceNumber != 0) + receivedMessages = await receiver.PeekMessagesAsync(maxMessages: maxMessagesReceive, receiveOptions.PeekReceive.PeekFromSequenceNumber).ConfigureAwait(false); + else + receivedMessages = await receiver.PeekMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); + else + receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); + else + receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive, maxWaitTime: maxWait).ConfigureAwait(false); } + return receivedMessages; } - catch (AggregateException ae) + catch (Exception ex) { - throw ae; + GXLogging.Error(logger, ex.Message.ToString()); } - return success; + return null; } - private async Task sendAsync(ServiceBusMessage serviceBusMessage) + private async Task ReceiveMessageAsync(string options) { + ReceiveMessageOptions receiveOptions = JSONHelper.Deserialize(options); + ServiceBusReceivedMessage receivedMessage; + try - { - await _sender.SendMessageAsync(serviceBusMessage).ConfigureAwait(false); - return true; + { + ServiceBusReceiverOptions serviceBusReceiverOptions = new ServiceBusReceiverOptions(); + + if ((receiveOptions != null) && (!string.IsNullOrEmpty(receiveOptions.SessionId)) && _sessionEnabled && (receiveOptions.SessionId != _sessionId)) + { + //Create new session receiver + + if (string.IsNullOrEmpty(_subscriptionName)) + { + _sessionReceiver = await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, receiveOptions.SessionId, _sessionReceiverOptions).ConfigureAwait(false); + _sessionId = receiveOptions.SessionId; + } + else + { + _sessionReceiver = await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _subscriptionName, receiveOptions.SessionId, _sessionReceiverOptions).ConfigureAwait(false); + _sessionId = receiveOptions.SessionId; + } + } + + TimeSpan maxWait = TimeSpan.Zero; + if ((receiveOptions != null) && receiveOptions.MaxWaitTime != 0) + maxWait = TimeSpan.FromSeconds(receiveOptions.MaxWaitTime); + + ServiceBusReceiver receiver = _receiver; + if ((_sessionReceiver != null) && (_sessionEnabled)) + receiver = _sessionReceiver; + + if ((receiveOptions != null) && (receiveOptions.ReceiveDeferredSequenceNumbers != null) && (receiveOptions.ReceiveDeferredSequenceNumbers.Count > 0)) + { + receivedMessage = await receiver.ReceiveDeferredMessageAsync(receiveOptions.ReceiveDeferredSequenceNumbers[0]).ConfigureAwait(false); + } + else + { + if (maxWait != TimeSpan.Zero) + receivedMessage = await receiver.ReceiveMessageAsync(maxWaitTime: maxWait).ConfigureAwait(false); + else + if ((receiveOptions != null) && (receiveOptions.PeekReceive != null) && (receiveOptions.PeekReceive.Peek)) + if (receiveOptions.PeekReceive.PeekFromSequenceNumber != 0) + receivedMessage = await receiver.PeekMessageAsync(receiveOptions.PeekReceive.PeekFromSequenceNumber).ConfigureAwait(false); + else + receivedMessage = await receiver.PeekMessageAsync().ConfigureAwait(false); + else + receivedMessage = await receiver.ReceiveMessageAsync().ConfigureAwait(false); + } + return receivedMessage; } catch (Exception ex) { - return false; - throw ex; + GXLogging.Error(logger, ex.Message.ToString()); } + return null; } - bool IMessageBroker.SendMessages(IList brokerMessages, BrokerMessageOptions messageQueueOptions) + #endregion + + #region API Methods + public bool SendMessage(BrokerMessage brokerMessage, string options) { bool success = false; + ServiceBusMessage serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); try { - Task task = Task.Run(async () => await SendMessagesBatchAsync(brokerMessages)); - success = task.Result; + Task task; + if (_sender != null) + { + task = Task.Run(async () => await sendAsync(serviceBusMessage, options)); + success = task.Result; + } + else + { + throw new Exception("There was an error at the Message Broker initialization."); + } } catch (AggregateException ae) { @@ -138,47 +373,34 @@ bool IMessageBroker.SendMessages(IList brokerMessages, BrokerMess } return success; } - - private async Task SendMessagesBatchAsync(IList brokerMessages) + bool IMessageBroker.SendMessages(IList brokerMessages, string options) { bool success = false; - if (_sender == null) + try { - throw new Exception("There was an error at the Message Broker initialization."); + Task task = Task.Run(async () => await SendMessagesBatchAsync(brokerMessages, options)); + success = task.Result; } - else + catch (AggregateException ae) { - ServiceBusMessage serviceBusMessage; - IList serviceBusMessages = new List(); - foreach (BrokerMessage brokerMessage in brokerMessages) - { - serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); - serviceBusMessages.Add(serviceBusMessage); - } - try - { - await _sender.SendMessagesAsync(serviceBusMessages).ConfigureAwait(false); - success = true; - } - catch (Exception ex) - { - GXLogging.Error(logger, ex.Message.ToString()); - } + throw ae; } return success; } - IList IMessageBroker.GetMessages(BrokerMessageOptions messageQueueOptions, out bool success) + IList IMessageBroker.GetMessages(string options, out bool success) { IList brokerMessages = new List(); success = false; try { - Task> receivedMessages = Task>.Run(async () => await ReceiveMessagesAsync(messageQueueOptions)); + Task> receivedMessages = Task>.Run(async () => await ReceiveMessagesAsync(options)); if (receivedMessages != null && receivedMessages.Result != null) { foreach (ServiceBusReceivedMessage serviceBusReceivedMessage in receivedMessages.Result) { - brokerMessages.Add(SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); + if (serviceBusReceivedMessage != null) + if (AddOrUpdateStoredServiceReceivedMessage(serviceBusReceivedMessage)) + brokerMessages.Add(SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); } success = true; } @@ -190,55 +412,106 @@ IList IMessageBroker.GetMessages(BrokerMessageOptions messageQueu return brokerMessages; } - private async Task> ReceiveMessagesAsync(BrokerMessageOptions messageQueueOptions) + public bool ConsumeMessage(BrokerMessage brokerMessage, string options) { - IReadOnlyList receivedMessages; - try + ConsumeMessageOptions consumeOptions = JSONHelper.Deserialize(options); + if (consumeOptions != null) { - ServiceBusReceiverOptions serviceBusReceiverOptions = new ServiceBusReceiverOptions(); - - if (messageQueueOptions.ReceiveMode == 1) // Valid values : PeekLock (0), ReceiveAndDelete (1) - serviceBusReceiverOptions.ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete; + ServiceBusReceiver receiver = _receiver; + if ((_sessionReceiver != null) && (_sessionEnabled)) + receiver = _sessionReceiver; - if (messageQueueOptions.PrefetchCount != 0) - serviceBusReceiverOptions.PrefetchCount = messageQueueOptions.PrefetchCount; - - ServiceBusReceiver receiver; - - if (!string.IsNullOrEmpty(messageQueueOptions.SubscriptionName)) + ServiceBusReceivedMessage serviceBusReceviedMessage = GetStoredServiceBusReceivedMessage(brokerMessage); + if (serviceBusReceviedMessage != null) { - receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, messageQueueOptions.SubscriptionName, serviceBusReceiverOptions); - } - else - { - receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, serviceBusReceiverOptions); + try + { + Task task; + switch (consumeOptions.ConsumeMode) + { + case ConsumeMessageOptions.ConsumeModeOpts.Complete: + { + task = Task.Run(async () => await receiver.CompleteMessageAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + break; + } + case ConsumeMessageOptions.ConsumeModeOpts.Abandon: + { + task = Task.Run(async () => await receiver.AbandonMessageAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + break; + } + case ConsumeMessageOptions.ConsumeModeOpts.DeadLetter: + { + task = Task.Run(async () => await receiver.DeadLetterMessageAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + break; + } + case ConsumeMessageOptions.ConsumeModeOpts.Defer: + { + task = Task.Run(async () => await receiver.DeferMessageAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + break; + } + case ConsumeMessageOptions.ConsumeModeOpts.RenewMessageLock: + { + task = Task.Run(async () => await receiver.RenewMessageLockAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + break; + } + } + return true; + } + catch (AggregateException ae) + { + throw ae; + } } + } + return false; + } - int maxMessagesReceive = 1; - TimeSpan maxWaitTimeout = TimeSpan.Zero; - if (messageQueueOptions != null && messageQueueOptions.WaitTimeout != 0) - maxWaitTimeout = TimeSpan.FromSeconds(messageQueueOptions.WaitTimeout); - - if (messageQueueOptions != null && messageQueueOptions.MaxNumberOfMessages != 0) - maxMessagesReceive = messageQueueOptions.MaxNumberOfMessages; - - if (maxWaitTimeout == TimeSpan.Zero) - receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); - else - receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive, maxWaitTime: maxWaitTimeout).ConfigureAwait(false); - - await receiver.DisposeAsync().ConfigureAwait(false); - - return receivedMessages; + public BrokerMessage GetMessage(string options, out bool success) + { + BrokerMessage brokerMessage = new BrokerMessage(); + success = false; + try + { + Task receivedMessage = Task.Run(async () => await ReceiveMessageAsync(options)); + if (receivedMessage != null && receivedMessage.Result != null) + { + ServiceBusReceivedMessage serviceBusReceivedMessage = receivedMessage.Result; + if (AddOrUpdateStoredServiceReceivedMessage(serviceBusReceivedMessage)) + { + success = true; + return (SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); + } + } } - catch (Exception ex) + catch (AggregateException ae) { - GXLogging.Error(logger, ex.Message.ToString()); + throw ae; + } + return brokerMessage; + } + public void Dispose() + { + Task task = Task.Run(async () => await ServiceClientDisposeAsync().ConfigureAwait(false)); + } + public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) + { + try + { + Azure.RequestFailedException az_ex = (Azure.RequestFailedException)ex; + msg.gxTpr_Id = az_ex.ErrorCode; + msg.gxTpr_Description = az_ex.Message; + return true; + } + catch (Exception) + { + return false; } - return null; } + #endregion - #region Transform Methods + #region Transformation Methods private ServiceBusMessage BrokerMessageToServiceBusMessage(BrokerMessage brokerMessage) { ServiceBusMessage serviceBusMessage = new ServiceBusMessage(brokerMessage.MessageBody); @@ -255,92 +528,221 @@ private BrokerMessage SBReceivedMessageToBrokerMessage(ServiceBusReceivedMessage BrokerMessage brokerMessage = new BrokerMessage(); brokerMessage.MessageId = serviceBusReceivedMessage.MessageId; brokerMessage.MessageBody = serviceBusReceivedMessage.Body.ToString(); - + LoadReceivedMessageProperties(serviceBusReceivedMessage, ref brokerMessage); return brokerMessage; } - private void LoadReceivedMessageProperties(ServiceBusReceivedMessage serviceBusReceivedMessage, ref BrokerMessage brokerMessage) - { - GXProperties properties = new GXProperties(); - - if (serviceBusReceivedMessage != null) - { - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.Subject)) - properties.Add("Subject", serviceBusReceivedMessage.Subject); - - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.ReplyToSessionId)) - properties.Add("ReplyToSessionId", serviceBusReceivedMessage.ReplyToSessionId); + #endregion - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.DeadLetterSource)) - properties.Add("DeadLetterSource", serviceBusReceivedMessage.DeadLetterSource); + #region Data + [DataContract] + internal class SendMessageOptions + { + [DataMember] + internal string ScheduledEnqueueTime { get; set; } + } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.ContentType)) - properties.Add("ContentType", serviceBusReceivedMessage.ContentType); + [DataContract()] + internal class ReceiveMessageOptions + { + int _maxmessages; + int _maxwaittime; + string _sessionid; + PeekReceiveOpts _peekreceiveopts; + IList _receivedeferredsequencenumbers; - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.CorrelationId)) - properties.Add("CorrelationId", serviceBusReceivedMessage.CorrelationId); + [DataMember()] + internal int MaxMessages { get => _maxmessages; set => _maxmessages = value; } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.DeadLetterErrorDescription)) - properties.Add("DeadLetterErrorDescription", serviceBusReceivedMessage.DeadLetterErrorDescription); + [DataMember()] + internal int MaxWaitTime { get => _maxwaittime; set => _maxwaittime = value; } - if (string.IsNullOrEmpty(serviceBusReceivedMessage.DeadLetterReason)) - properties.Add("DeadLetterReason", serviceBusReceivedMessage.DeadLetterReason); + [DataMember()] + internal string SessionId { get => _sessionid; set => _sessionid = value ; } - if (serviceBusReceivedMessage.DeliveryCount != 0) - properties.Add("DeliveryCount", serviceBusReceivedMessage.DeliveryCount.ToString()); + [DataMember()] + internal PeekReceiveOpts PeekReceive { get => _peekreceiveopts; set => _peekreceiveopts = value; } - if (serviceBusReceivedMessage.EnqueuedSequenceNumber != 0) - properties.Add("EnqueuedSequenceNumber", serviceBusReceivedMessage.EnqueuedSequenceNumber.ToString()); + [DataMember()] + internal IList ReceiveDeferredSequenceNumbers { get => _receivedeferredsequencenumbers; set => _receivedeferredsequencenumbers = value; } + } - properties.Add("EnqueuedTime", serviceBusReceivedMessage.EnqueuedTime.UtcDateTime.ToString()); + [DataContract()] + public class PeekReceiveOpts + { + bool _peek; + long _peekfromsequencenumber; - properties.Add("ExpiresAt", serviceBusReceivedMessage.ExpiresAt.UtcDateTime.ToString()); - properties.Add("LockedUntil", serviceBusReceivedMessage.LockedUntil.UtcDateTime.ToString()); + [DataMember()] + internal bool Peek { get => _peek; set => _peek = value; } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.LockToken)) - properties.Add("LockToken", serviceBusReceivedMessage.LockToken); + [DataMember()] + internal long PeekFromSequenceNumber { get => _peekfromsequencenumber ; set => _peekfromsequencenumber = value; } + } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.PartitionKey)) - properties.Add("PartitionKey", serviceBusReceivedMessage.PartitionKey); + [DataContract] + internal class ConsumeMessageOptions + { + [DataMember] + internal ConsumeModeOpts ConsumeMode { get; set; } + internal enum ConsumeModeOpts + { + Complete, + Abandon, + DeadLetter, + Defer, + RenewMessageLock + } + } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.ReplyTo)) - properties.Add("ReplyTo", serviceBusReceivedMessage.ReplyTo); + #endregion - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.SessionId)) - properties.Add("SessionId", serviceBusReceivedMessage.SessionId); + #region helper methods - properties.Add("TimeToLive", serviceBusReceivedMessage.TimeToLive.ToString()); + private ServiceBusReceivedMessage GetStoredServiceBusReceivedMessage(BrokerMessage message) + { + string messageIdentifier = GetMessageIdentifier(message); + if (m_messages.TryGetValue(messageIdentifier, out ServiceBusReceivedMessage serviceBusReceivedMessage)) + return serviceBusReceivedMessage; + else + return null; + } + private void RemoveStoredServiceBusReceivedMessage(BrokerMessage message) + { + string messageIdentifier = GetMessageIdentifier(message); + lock (m_messages) + { + if (m_messages.TryGetValue(messageIdentifier, out ServiceBusReceivedMessage serviceBusReceivedMessage)) + { + KeyValuePair keyValuePair = new KeyValuePair(messageIdentifier, serviceBusReceivedMessage); + m_messages.TryRemove(keyValuePair); + } + } + } + private bool AddOrUpdateStoredServiceReceivedMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) + { + string messageIdentifier = GetMessageIdentifierFromServiceBus(serviceBusReceivedMessage); + if (!string.IsNullOrEmpty(messageIdentifier)) + lock (m_messages) + { + if (m_messages.TryGetValue(messageIdentifier, out ServiceBusReceivedMessage originalMessage)) + { + return (m_messages.TryUpdate(messageIdentifier, serviceBusReceivedMessage, originalMessage)); + } + else + return m_messages.TryAdd(messageIdentifier, serviceBusReceivedMessage); + } + return false; + } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.ReplyToSessionId)) - properties.Add("ReplyToSessionId", serviceBusReceivedMessage.ReplyToSessionId); + private string GetMessageIdentifier(BrokerMessage message) + { + //The sequence number is a unique 64-bit integer assigned to a message as it is accepted and stored by the broker and functions as its true identifier. + //For partitioned entities, the sequence number is issued relative to the partition. + //https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sequencing + //Follow this to identify the message + //https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#using-a-partition-key - properties.Add("ScheduledEnqueueTime", serviceBusReceivedMessage.ScheduledEnqueueTime.UtcDateTime.ToString()); - if (serviceBusReceivedMessage.SequenceNumber != 0) - properties.Add("SequenceNumber", serviceBusReceivedMessage.SequenceNumber.ToString()); + string messageSequenceNumber = GetMessageSequenceNumber(message); + string messageIdentifier = string.Empty; - properties.Add("State", serviceBusReceivedMessage.State.ToString()); + //Get SessionId of the message + string messageSessionId = GetMessageSessionId(message); + if (!string.IsNullOrEmpty(messageSessionId)) + messageIdentifier = $"{messageSequenceNumber}{messageSessionId}"; + else + { + //Get PartitionKey of the message + string messagePartitionKey = GetMessagePartitionKey(message); + if (!string.IsNullOrEmpty(messagePartitionKey)) + messageIdentifier = $"{messageSequenceNumber}{messagePartitionKey}"; + else + messageIdentifier = $"{messageSequenceNumber}{message.MessageId}"; + } + return messageIdentifier.GetHashCode().ToString(); + } - if (!string.IsNullOrEmpty(serviceBusReceivedMessage.TransactionPartitionKey)) - properties.Add("TransactionPartitionKey", serviceBusReceivedMessage.TransactionPartitionKey); + private string GetMessageIdentifierFromServiceBus(ServiceBusReceivedMessage message) + { + string messageSequenceNumber = message.SequenceNumber.ToString(); + string messageIdentifier = string.Empty; + //Get SessionId of the message + string messageSessionId = message.SessionId; + if (!string.IsNullOrEmpty(messageSessionId)) + messageIdentifier = $"{messageSequenceNumber}{messageSessionId}"; + else + { + //Get PartitionKey of the message + string messagePartitionKey = message.PartitionKey; + if (!string.IsNullOrEmpty(messagePartitionKey)) + messageIdentifier = $"{messageSequenceNumber}{messagePartitionKey}"; + else + messageIdentifier = $"{messageSequenceNumber}{message.MessageId}"; + } + return messageIdentifier.GetHashCode().ToString(); + } + private string GetMessageSequenceNumber(BrokerMessage message) + { + string sequenceNumberValue = string.Empty; + if (message != null) + { + if (message.MessageAttributes.ContainsKey("SequenceNumber")) + sequenceNumberValue = message.MessageAttributes.Get("SequenceNumber"); + } + return sequenceNumberValue; + } + private string GetMessageSessionId(BrokerMessage message) + { + string messageSessionId = string.Empty; + if (message != null) + { + if (message.MessageAttributes.ContainsKey("SessionId")) + messageSessionId = message.MessageAttributes.Get("SessionId"); + } + return messageSessionId; + } + private string GetMessagePartitionKey(BrokerMessage message) + { + string messagePartitionKey = string.Empty; + if (message != null) + { + if (message.MessageAttributes.ContainsKey("PartitionKey")) + messagePartitionKey = message.MessageAttributes.Get("PartitionKey"); + } + return messagePartitionKey; + } + private void LoadReceivedMessageProperties(ServiceBusReceivedMessage serviceBusReceivedMessage, ref BrokerMessage brokerMessage) + { + GXProperties properties = new GXProperties(); - //Application Properties + if (serviceBusReceivedMessage != null) + { brokerMessage.MessageAttributes = new GXProperties(); Type t = serviceBusReceivedMessage.GetType(); PropertyInfo[] props = t.GetProperties(); foreach (PropertyInfo prop in props) { object value; - if (prop.GetIndexParameters().Length == 0 && serviceBusReceivedMessage != null) + if (prop.Name != "ApplicationProperties") { - value = prop.GetValue(serviceBusReceivedMessage); - if (value != null) - brokerMessage.MessageAttributes.Add(prop.Name, value.ToString()); + if (prop.GetIndexParameters().Length == 0 && serviceBusReceivedMessage != null) + { + value = prop.GetValue(serviceBusReceivedMessage); + + if (value != null) + brokerMessage.MessageAttributes.Add(prop.Name, value.ToString()); + } } } + //Application Properties + foreach (KeyValuePair o in serviceBusReceivedMessage.ApplicationProperties) + { + brokerMessage.MessageAttributes.Add(o.Key, o.Value.ToString()); + } } - brokerMessage.MessageAttributes = properties; + } private void LoadMessageProperties(GXProperties properties, ref ServiceBusMessage serviceBusMessage) { @@ -393,7 +795,9 @@ private void LoadMessageProperties(GXProperties properties, ref ServiceBusMessag } } } - #endregion } } + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj index e7274a6f5..fded4a3b6 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj @@ -7,7 +7,7 @@ - + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs index 6fadcf7dc..401009749 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs @@ -1,3 +1,5 @@ +using System; +using System.Collections; using GeneXus.Messaging.Common; using GeneXus.Utils; @@ -5,16 +7,128 @@ namespace GeneXus.Messaging.GXAzureServiceBus { public class ServiceBusMessageBrokerProvider { - public MessageQueue Connect(string queueName, string queueConnection, out GXBaseCollection errorMessages, out bool success) + public MessageQueue Connect(string queueName, string connectionString, out GXBaseCollection errorMessages, out bool success) { + System.Diagnostics.Debugger.Launch(); MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); GXProperties properties = new GXProperties(); - properties.Add(PropertyConstants. MESSAGEBROKER_AZURESB_QUEUENAME, queueName); - properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUECONNECTION, queueConnection); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + + public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, out GXBaseCollection errorMessages, out bool success) + { + System.Diagnostics.Debugger.Launch(); + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + GXProperties properties = new GXProperties(); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME, topicName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); errorMessages = errorMessagesConnect; success = successConnect; return messageQueue; } + public MessageQueue Connect(string queueName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection errorMessages, out bool success) + { + System.Diagnostics.Debugger.Launch(); + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + + ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions); + + GXProperties properties = new GXProperties(); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString()); + properties.Add(PropertyConstants.RECEIVE_MODE, options.ReceiveMode.ToString()); + properties.Add(PropertyConstants.PREFETCH_COUNT, options.PrefetchCount.ToString()); + properties.Add(PropertyConstants.RECEIVER_IDENTIFIER, options.Identifier); + properties.Add(PropertyConstants.RECEIVER_SESSIONID, options.SessionId); + properties.Add(PropertyConstants.SENDER_IDENTIFIER, senderIdentifier); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + + public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection errorMessages, out bool success) + { + System.Diagnostics.Debugger.Launch(); + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + GXProperties properties = new GXProperties(); + ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions); + + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME, topicName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString()); + properties.Add(PropertyConstants.RECEIVE_MODE, options.ReceiveMode.ToString()); + properties.Add(PropertyConstants.PREFETCH_COUNT, options.PrefetchCount.ToString()); + properties.Add(PropertyConstants.RECEIVER_IDENTIFIER, options.Identifier); + properties.Add(PropertyConstants.RECEIVER_SESSIONID, options.SessionId); + properties.Add(PropertyConstants.SENDER_IDENTIFIER, senderIdentifier); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + + #region Transformation methods + private ReceiverOptions TransformGXUserTypeToReceiverOptions(GxUserType options) + { + ReceiverOptions receiverOptions = new ReceiverOptions(); + if (options != null) + { + receiverOptions.ReceiveMode = options.GetPropertyValue("Receivemode"); + receiverOptions.Identifier = options.GetPropertyValue("Identifier"); + receiverOptions.PrefetchCount = options.GetPropertyValue("Prefetchcount"); + receiverOptions.SessionId = options.GetPropertyValue("Sessionid"); + } + return receiverOptions; + } + #endregion + public class ReceiverOptions : GxUserType + { + public short ReceiveMode { get; set; } + public short PrefetchCount { get; set; } + public string Identifier { get; set; } + public string SessionId { get; set; } + + #region Json + private static Hashtable mapper; + public override String JsonMap(String value) + { + if (mapper == null) + { + mapper = new Hashtable(); + } + return (String)mapper[value]; ; + } + + public override void ToJSON() + { + ToJSON(true); + return; + } + + public override void ToJSON(bool includeState) + { + AddObjectProperty("ReceiveMode", ReceiveMode, false); + AddObjectProperty("PrefetchCount", PrefetchCount, false); + AddObjectProperty("Identifier", Identifier, false); + AddObjectProperty("SessionId", Identifier, false); + return; + } + + #endregion + } } } diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs index f86bb79ca..30097724c 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs @@ -7,11 +7,13 @@ namespace GeneXus.Messaging.Common { public interface IMessageBroker { - bool SendMessage(BrokerMessage brokerMessage); - bool SendMessages(IList brokerMessages, BrokerMessageOptions messageQueueOptions); - IList GetMessages(BrokerMessageOptions messageQueueOptions, out bool success); + bool SendMessage(BrokerMessage brokerMessage, string options); + bool SendMessages(IList brokerMessages, string options); + IList GetMessages(string options, out bool success); + BrokerMessage GetMessage(string options, out bool success); void Dispose(); bool GetMessageFromException(Exception ex, SdtMessages_Message msg); + bool ConsumeMessage(BrokerMessage brokerMessage, string options); } public class BrokerMessage : GxUserType { @@ -48,45 +50,6 @@ public override void ToJSON(bool includeState) #endregion } - - public class BrokerMessageResult : GxUserType - { - public string MessageId { get; set; } - public string ServerMessageId { get; set; } - public GXProperties MessageAttributes { get; set; } - public string MessageHandleId { get; set; } - public string MessageStatus { get; set; } = "Unknown"; - - #region Json - private static Hashtable mapper; - public override String JsonMap(String value) - { - if (mapper == null) - { - mapper = new Hashtable(); - } - return (String)mapper[value]; ; - } - - public override void ToJSON() - { - ToJSON(true); - return; - } - - public override void ToJSON(bool includeState) - { - AddObjectProperty("MessageId", MessageId, false); - AddObjectProperty("ServerMessageId", ServerMessageId, false); - AddObjectProperty("MessageHandleId", MessageHandleId, false); - AddObjectProperty("MessageStatus", MessageStatus, false); - - return; - } - - #endregion - } - public class BrokerMessageOptions : GxUserType { public short MaxNumberOfMessages { get; set; } @@ -97,8 +60,8 @@ public class BrokerMessageOptions : GxUserType public int DelaySeconds { get; set; } public string ReceiveRequestAttemptId { get; set; } public bool ReceiveMessageAttributes { get; set; } - public int ReceiveMode { get; set; } - public int PrefetchCount { get; set; } + public short ReceiveMode { get; set; } + public short PrefetchCount { get; set; } public string SubscriptionName { get; set; } } diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs index e890964d0..bfc0d1e25 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs @@ -73,7 +73,9 @@ private static void Preprocess(String name, GXProperties properties) case Providers.AzureServiceBus: className = PropertyConstants.AZURE_SB_CLASSNAME; SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME); - SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_QUEUECONNECTION); + SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME); + SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME); + SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING); if (string.IsNullOrEmpty(providerService.ClassName) || !providerService.ClassName.Contains(className)) { providerService.ClassName = PropertyConstants.AZURE_SB_PROVIDER_CLASSNAME; diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs index 8345212b8..580c3a3e1 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs @@ -20,7 +20,7 @@ public class MessageQueue private const string SDT_MESSAGE_CLASS_NAME = @"SdtMessage"; private const string SDT_MESSAGEPROPERTY_CLASS_NAME = @"SdtMessageProperty"; //private const string SDT_MESSAGERESULT_CLASS_NAME = @"SdtMessageResult"; - private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingqueue.MessageBroker"; + private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingmessagebroker"; private const string GENEXUS_COMMON_DLL = @"GeneXus.Programs.Common.dll"; public MessageQueue() @@ -73,15 +73,76 @@ public void Dispose() } } - public IList GetMessages(GxUserType messageQueueOptions, out GXBaseCollection errorMessages, out bool success) + public bool ConsumeMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) + { + bool success = false; + errorMessages = new GXBaseCollection(); + GxUserType result = new GxUserType(); + try + { + BrokerMessage brokerQueueMessage = TransformGXUserTypeToBrokerMessage(messageQueue); + LoadAssemblyIfRequired(); + try + { + ValidQueue(); + return (messageBroker.ConsumeMessage(brokerQueueMessage, options)); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + success = false; + GXLogging.Error(logger, ex); + throw ex; + } + return success; + } + public GxUserType GetMessage(string options, out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + GxUserType receivedMessage = new GxUserType(); + success = false; + try + { + try + { + ValidQueue(); + BrokerMessage brokerMessage = messageBroker.GetMessage(options, out success); + LoadAssemblyIfRequired(); + + if (TransformBrokerMessage(brokerMessage) is GxUserType result) + { + success = true; + return result; + } + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + success = false; + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + success = false; + throw ex; + } + return receivedMessage; + } + public IList GetMessages(string options, out GXBaseCollection errorMessages, out bool success) { errorMessages = new GXBaseCollection(); IList resultMessages = new List(); success = false; try { - BrokerMessageOptions options = TransformOptions(messageQueueOptions); - try { ValidQueue(); @@ -111,7 +172,7 @@ public IList GetMessages(GxUserType messageQueueOptions, out GXBaseC return resultMessages; } - public bool SendMessage(GxUserType messageQueue, out GXBaseCollection errorMessages) + public bool SendMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) { bool success = false; errorMessages = new GXBaseCollection(); @@ -123,7 +184,7 @@ public bool SendMessage(GxUserType messageQueue, out GXBaseCollection errorMessages) + public bool SendMessages(IList queueMessages, string options, out GXBaseCollection errorMessages) { errorMessages = new GXBaseCollection(); bool success = false; try { - BrokerMessageOptions options = TransformOptions(messageQueueOptions); IList brokerMessagesList = new List(); foreach (GxUserType queueMessage in queueMessages) { @@ -239,15 +299,19 @@ private GxUserType TransformBrokerMessage(BrokerMessage brokerMessage) private BrokerMessageOptions TransformOptions(GxUserType messageQueueOptions) { BrokerMessageOptions options = new BrokerMessageOptions(); - //TO DO Check the valid options, ex - //https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.createmessagebatchoptions.maxsizeinbytes?view=azure-dotnet#azure-messaging-servicebus-createmessagebatchoptions-maxsizeinbytes - + options.MaxNumberOfMessages = messageQueueOptions.GetPropertyValue("Maxnumberofmessages"); options.DeleteConsumedMessages = messageQueueOptions.GetPropertyValue("Deleteconsumedmessages"); options.WaitTimeout = messageQueueOptions.GetPropertyValue("Waittimeout"); options.VisibilityTimeout = messageQueueOptions.GetPropertyValue("Visibilitytimeout"); options.TimetoLive = messageQueueOptions.GetPropertyValue("Timetolive"); - return options; + options.DelaySeconds = messageQueueOptions.GetPropertyValue("Delayseconds"); + options.ReceiveMode = messageQueueOptions.GetPropertyValue("Receivemode"); + options.ReceiveRequestAttemptId = messageQueueOptions.GetPropertyValue("Receiverequestattemptid"); + options.ReceiveMessageAttributes = messageQueueOptions.GetPropertyValue("Receivemessageattributes"); + options.PrefetchCount = messageQueueOptions.GetPropertyValue("Prefetchcount"); + options.SubscriptionName = messageQueueOptions.GetPropertyValue("Subscriptionname"); +; return options; } private BrokerMessage TransformGXUserTypeToBrokerMessage(GxUserType queueMessage) diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs index 0234167d2..9c5ed2930 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs @@ -2,16 +2,24 @@ namespace GeneXus.Messaging.Common { public static class PropertyConstants { - // Azure Service Bus // + //Azure Service Bus internal const string AZURE_SB_CLASSNAME = "GeneXus.Messaging.GXAzureServiceBus.AzureServiceBus"; internal const string AZURE_SB_PROVIDER_CLASSNAME = "GeneXus.Messaging.GXAzureServiceBus.AzureServiceBus, GXAzureServiceBus, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"; public const string AZURESERVICEBUS = "AZURESERVICEBUS"; - + public const string RECEIVE_MODE = "RECEIVE_MODE"; + public const string PREFETCH_COUNT = "PREFETCH_COUNT"; + public const string RECEIVER_IDENTIFIER = "RECEIVER_IDENTIFIER"; + public const string RECEIVER_SESSIONID = "RECEIVER_SESSIONID"; + public const string SENDER_IDENTIFIER = "SENDER_IDENTIFIER"; public const string MESSAGEBROKER_AZURESB_QUEUENAME = "MESSAGEBROKER_AZURESB_QUEUENAME"; - public const string MESSAGEBROKER_AZURESB_QUEUECONNECTION = "MESSAGEBROKER_AZURESB_QUEUECONNECTION"; + public const string MESSAGEBROKER_AZURESB_TOPICNAME = "MESSAGEBROKER_AZURESB_TOPICNAME"; + public const string MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME = "MESSAGEBROKER_AZURESB_SUBSCRIPTION"; + public const string MESSAGEBROKER_AZURESB_CONNECTIONSTRING = "MESSAGEBROKER_AZURESB_QUEUECONNECTION"; public const string QUEUE_NAME = "QUEUENAME"; public const string QUEUE_CONNECTION_STRING = "QUEUECONNECTION"; + public const string TOPIC_SUBSCRIPTION = "SUBSCRIPTION"; public const string MESSAGE_BROKER = "MESSAGEBROKER"; + public const string SESSION_ENABLED = "SESSION_ENABLED"; } } diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs index 8ed48053d..2d920710c 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs @@ -25,6 +25,11 @@ public ServiceSettings(string serviceNameResolver, string name, GXService gXServ this.service = gXService; } + public string GetEncryptedOptPropertyValue(string propertyName, string alternativePropertyName = null) + { + String value = GetEncryptedPropertyValue(propertyName, alternativePropertyName, null); + return value; + } public string GetEncryptedPropertyValue(string propertyName, string alternativePropertyName = null) { String value = GetEncryptedPropertyValue(propertyName, alternativePropertyName, null);