diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs index 0bbd6ffbe..e1259b853 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs @@ -15,10 +15,10 @@ namespace GeneXus.Messaging.GXAzureServiceBus public class AzureServiceBus : MessageBrokerBase, IMessageBroker { private const int MAX_MESSAGES_DEFAULT = 10; - static readonly ILog logger = log4net.LogManager.GetLogger(typeof(AzureServiceBus)); + private const short LOCK_DURATION = 5; public static String Name = "AZURESB"; - private ConcurrentDictionary m_messages = new ConcurrentDictionary(); + private ConcurrentDictionary> m_messages = new ConcurrentDictionary>(); ServiceBusClient _serviceBusClient { get; set; } private string _queueOrTopicName { get; set; } private string _connectionString { get; set; } @@ -29,7 +29,7 @@ public class AzureServiceBus : MessageBrokerBase, IMessageBroker private ServiceBusSessionReceiverOptions _sessionReceiverOptions { get; set; } private bool _sessionEnabled { get; set; } private string _sessionId { get; set; } - + private string receiveMode { get; set; } public AzureServiceBus() : this(null) { } @@ -49,7 +49,7 @@ private void Initialize(GXService providerService) ServiceBusReceiverOptions serviceBusReceiverOptions = new ServiceBusReceiverOptions(); _sessionReceiverOptions = new ServiceBusSessionReceiverOptions(); - string receiveMode = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVE_MODE); + receiveMode = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVE_MODE); string prefetchCount = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.PREFETCH_COUNT); string receiverIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVER_IDENTIFIER); ; _sessionId = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVER_SESSIONID); @@ -89,57 +89,55 @@ private void Initialize(GXService providerService) //TO DO Consider connection options here //https://docs.microsoft.com/en-us/javascript/api/@azure/service-bus/servicebusclientoptions?view=azure-node-latest#@azure-service-bus-servicebusclientoptions-websocketoptions - try + _serviceBusClient = new ServiceBusClient(_connectionString); + if (_serviceBusClient != null) { - _serviceBusClient = new ServiceBusClient(_connectionString); - if (_serviceBusClient != null) - { - _sender = _serviceBusClient.CreateSender(_queueOrTopicName, serviceBusSenderOptions); + _sender = _serviceBusClient.CreateSender(_queueOrTopicName, serviceBusSenderOptions); - if (_sessionEnabled && _sender != null) + if (_sessionEnabled && _sender != null) + { + if (!string.IsNullOrEmpty(_sessionId)) { - if (!string.IsNullOrEmpty(_sessionId)) + if (string.IsNullOrEmpty(_subscriptionName)) { - if (string.IsNullOrEmpty(_subscriptionName)) - { - Task task; - task = Task.Run(async () => await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _sessionId, _sessionReceiverOptions).ConfigureAwait(false)); - _sessionReceiver = task.Result; - } - else - { - Task task; - task = Task.Run(async () => await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _subscriptionName, _sessionId, _sessionReceiverOptions).ConfigureAwait(false)); - _sessionReceiver = task.Result; - } + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _sessionId, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; } else { - if (string.IsNullOrEmpty(_subscriptionName)) - { - Task task; - task = Task.Run(async () => await _serviceBusClient.AcceptNextSessionAsync(_queueOrTopicName, _sessionReceiverOptions).ConfigureAwait(false)); - _sessionReceiver = task.Result; - } - else - { - Task task; - task = Task.Run(async () => await _serviceBusClient.AcceptNextSessionAsync(_queueOrTopicName, _subscriptionName, _sessionReceiverOptions).ConfigureAwait(false)); - _sessionReceiver = task.Result; - } + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _subscriptionName, _sessionId, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; } } else + { + //These methods throw an exception when the service bus is empty: + //ServiceBusException: The operation did not complete within the allocated time (ServiceTimeout) + //so I remove them for now + /* if (string.IsNullOrEmpty(_subscriptionName)) - _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, serviceBusReceiverOptions); - - else - _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _subscriptionName, serviceBusReceiverOptions); + { + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptNextSessionAsync(_queueOrTopicName, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; + } + else + { + Task task; + task = Task.Run(async () => await _serviceBusClient.AcceptNextSessionAsync(_queueOrTopicName, _subscriptionName, _sessionReceiverOptions).ConfigureAwait(false)); + _sessionReceiver = task.Result; + }*/ + throw new Exception("ServiceBus: Specify a session to establish a service bus receiver for the session-enabled queue or topic."); + } } - } - catch (Exception ex) - { - GXLogging.Error(logger, ex.Message); + else + if (string.IsNullOrEmpty(_subscriptionName)) + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, serviceBusReceiverOptions); + + else + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _subscriptionName, serviceBusReceiverOptions); } } public override string GetName() @@ -153,32 +151,46 @@ private async Task ServiceClientDisposeAsync() await _serviceBusClient.DisposeAsync().ConfigureAwait(false); } private async Task sendAsync(ServiceBusMessage serviceBusMessage, string options) + { + try + { + await _sender.SendMessageAsync(serviceBusMessage).ConfigureAwait(false); + return true; + } + catch (Exception ex) + { + throw ex; + } + } + + private async Task CancelScheduleAsync(long sequenceNumber) { - SendMessageOptions sendOptions = JSONHelper.Deserialize(options); - if ((sendOptions != null) && (!string.IsNullOrEmpty(sendOptions.ScheduledEnqueueTime))) + try { - try - { - await _sender.ScheduleMessageAsync(serviceBusMessage, DateTimeOffset.Parse(sendOptions.ScheduledEnqueueTime)).ConfigureAwait(false); - return true; - } - catch (Exception ex) - { - throw ex; - } + await _sender.CancelScheduledMessageAsync(sequenceNumber).ConfigureAwait(false); + return true; } - else + catch (Exception ex) + { + throw ex; + } + } + private async Task ScheduleMessageAsync(ServiceBusMessage serviceBusMessage, string options) + { + ScheduleMessageOptions scheduleOptions = JSONHelper.Deserialize(options); + if ((serviceBusMessage != null) && (scheduleOptions != null) && (!string.IsNullOrEmpty(scheduleOptions.ScheduledEnqueueTime))) { try { - await _sender.SendMessageAsync(serviceBusMessage).ConfigureAwait(false); - return true; + return (await _sender.ScheduleMessageAsync(serviceBusMessage, DateTime.Parse(scheduleOptions.ScheduledEnqueueTime)).ConfigureAwait(false)); + } catch (Exception ex) { throw ex; } } + return 0; } private async Task SendMessagesBatchAsync(IList brokerMessages, string options) { @@ -196,32 +208,15 @@ private async Task SendMessagesBatchAsync(IList brokerMessa serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); serviceBusMessages.Add(serviceBusMessage); } - - SendMessageOptions sendOptions = JSONHelper.Deserialize(options); - if ((sendOptions != null) && (!string.IsNullOrEmpty(sendOptions.ScheduledEnqueueTime))) - { - try - { - await _sender.ScheduleMessagesAsync(serviceBusMessages, DateTimeOffset.Parse(sendOptions.ScheduledEnqueueTime)).ConfigureAwait(false); - success = true; - } - catch (Exception ex) - { - GXLogging.Error(logger, ex.Message.ToString()); - } + try + { + await _sender.SendMessagesAsync(serviceBusMessages).ConfigureAwait(false); + success = true; } - else + catch (Exception ex) { - try - { - await _sender.SendMessagesAsync(serviceBusMessages).ConfigureAwait(false); - success = true; - } - catch (Exception ex) - { - GXLogging.Error(logger, ex.Message.ToString()); - } - } + throw ex; + } } return success; } @@ -284,11 +279,9 @@ private async Task> ReceiveMessagesAsyn } catch (Exception ex) { - GXLogging.Error(logger, ex.Message.ToString()); + throw ex; } - return null; } - private async Task ReceiveMessageAsync(string options) { ReceiveMessageOptions receiveOptions = JSONHelper.Deserialize(options); @@ -343,9 +336,8 @@ private async Task ReceiveMessageAsync(string options } catch (Exception ex) { - GXLogging.Error(logger, ex.Message.ToString()); + throw ex; } - return null; } #endregion @@ -361,6 +353,7 @@ public bool SendMessage(BrokerMessage brokerMessage, string options) { task = Task.Run(async () => await sendAsync(serviceBusMessage, options)); success = task.Result; + ClearServiceBusAuxiliaryStorage(); } else { @@ -380,6 +373,7 @@ bool IMessageBroker.SendMessages(IList brokerMessages, string opt { Task task = Task.Run(async () => await SendMessagesBatchAsync(brokerMessages, options)); success = task.Result; + ClearServiceBusAuxiliaryStorage(); } catch (AggregateException ae) { @@ -396,11 +390,20 @@ IList IMessageBroker.GetMessages(string options, out bool success Task> receivedMessages = Task>.Run(async () => await ReceiveMessagesAsync(options)); if (receivedMessages != null && receivedMessages.Result != null) { + ClearServiceBusAuxiliaryStorage(); foreach (ServiceBusReceivedMessage serviceBusReceivedMessage in receivedMessages.Result) { if (serviceBusReceivedMessage != null) - if (AddOrUpdateStoredServiceReceivedMessage(serviceBusReceivedMessage)) - brokerMessages.Add(SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); + brokerMessages.Add(SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); + + //If receive Mode = Peek Lock, save the messages to be retrieved later + if (!string.IsNullOrEmpty(receiveMode) && (Convert.ToInt16(receiveMode) == 0)) + { + if (!AddOrUpdateStoredServiceReceivedMessage(serviceBusReceivedMessage)) + { + throw new Exception("Invalid operation."); + } + } } success = true; } @@ -421,6 +424,7 @@ public bool ConsumeMessage(BrokerMessage brokerMessage, string options) if ((_sessionReceiver != null) && (_sessionEnabled)) receiver = _sessionReceiver; + ClearServiceBusAuxiliaryStorage(); ServiceBusReceivedMessage serviceBusReceviedMessage = GetStoredServiceBusReceivedMessage(brokerMessage); if (serviceBusReceviedMessage != null) { @@ -438,6 +442,7 @@ public bool ConsumeMessage(BrokerMessage brokerMessage, string options) case ConsumeMessageOptions.ConsumeModeOpts.Abandon: { task = Task.Run(async () => await receiver.AbandonMessageAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); break; } case ConsumeMessageOptions.ConsumeModeOpts.DeadLetter: @@ -449,11 +454,13 @@ public bool ConsumeMessage(BrokerMessage brokerMessage, string options) case ConsumeMessageOptions.ConsumeModeOpts.Defer: { task = Task.Run(async () => await receiver.DeferMessageAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); break; } case ConsumeMessageOptions.ConsumeModeOpts.RenewMessageLock: { task = Task.Run(async () => await receiver.RenewMessageLockAsync(serviceBusReceviedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); break; } } @@ -464,6 +471,10 @@ public bool ConsumeMessage(BrokerMessage brokerMessage, string options) throw ae; } } + else + { + throw new Exception("Invalid operation."); + } } return false; } @@ -474,6 +485,7 @@ public BrokerMessage GetMessage(string options, out bool success) success = false; try { + ClearServiceBusAuxiliaryStorage(); Task receivedMessage = Task.Run(async () => await ReceiveMessageAsync(options)); if (receivedMessage != null && receivedMessage.Result != null) { @@ -494,6 +506,54 @@ public BrokerMessage GetMessage(string options, out bool success) public void Dispose() { Task task = Task.Run(async () => await ServiceClientDisposeAsync().ConfigureAwait(false)); + m_messages.Clear(); + } + public long ScheduleMessage(BrokerMessage brokerMessage, string options) + { + long sequenceNumber = 0; + ServiceBusMessage serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); + try + { + Task task; + if (_sender != null) + { + task = Task.Run(async () => await ScheduleMessageAsync(serviceBusMessage, options)); + sequenceNumber = task.Result; + ClearServiceBusAuxiliaryStorage(); + } + else + { + throw new Exception("There was an error at the Message Broker initialization."); + } + } + catch (AggregateException ae) + { + throw ae; + } + return sequenceNumber; + } + public bool CancelSchedule(long sequenceNumber) + { + bool success = false; + try + { + Task task; + if (_sender != null) + { + task = Task.Run(async () => await CancelScheduleAsync(sequenceNumber)); + success = task.Result; + ClearServiceBusAuxiliaryStorage(); + } + else + { + throw new Exception("There was an error at the Message Broker initialization."); + } + } + catch (AggregateException ae) + { + throw ae; + } + return false; } public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) { @@ -514,14 +574,18 @@ public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) #region Transformation Methods private ServiceBusMessage BrokerMessageToServiceBusMessage(BrokerMessage brokerMessage) { - ServiceBusMessage serviceBusMessage = new ServiceBusMessage(brokerMessage.MessageBody); - serviceBusMessage.MessageId = brokerMessage.MessageId; + if (brokerMessage != null) + { + ServiceBusMessage serviceBusMessage = new ServiceBusMessage(brokerMessage.MessageBody); + serviceBusMessage.MessageId = brokerMessage.MessageId; - GXProperties messageAttributes = brokerMessage.MessageAttributes; - if (messageAttributes != null) - LoadMessageProperties(messageAttributes, ref serviceBusMessage); + GXProperties messageAttributes = brokerMessage.MessageAttributes; + if (messageAttributes != null) + LoadMessageProperties(messageAttributes, ref serviceBusMessage); - return serviceBusMessage; + return serviceBusMessage; + } + return null; } private BrokerMessage SBReceivedMessageToBrokerMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) { @@ -536,11 +600,17 @@ private BrokerMessage SBReceivedMessageToBrokerMessage(ServiceBusReceivedMessage #endregion #region Data - [DataContract] - internal class SendMessageOptions + [DataContract()] + internal class ScheduleMessageOptions { - [DataMember] - internal string ScheduledEnqueueTime { get; set; } + long _cancelSequenceNumber; + string _scheduledEnqueueTime; + + [DataMember()] + internal string ScheduledEnqueueTime { get => _scheduledEnqueueTime; set => _scheduledEnqueueTime = value; } + + [DataMember()] + internal long CancelSequenceNumber { get => _cancelSequenceNumber; set => _cancelSequenceNumber = value; } } [DataContract()] @@ -598,44 +668,50 @@ internal enum ConsumeModeOpts #endregion - #region helper methods + #region Helper methods private ServiceBusReceivedMessage GetStoredServiceBusReceivedMessage(BrokerMessage message) { string messageIdentifier = GetMessageIdentifier(message); - if (m_messages.TryGetValue(messageIdentifier, out ServiceBusReceivedMessage serviceBusReceivedMessage)) - return serviceBusReceivedMessage; + if (m_messages.TryGetValue(messageIdentifier, out Tuple messageStored)) + { + return messageStored.Item2; + } else return null; } private void RemoveStoredServiceBusReceivedMessage(BrokerMessage message) { string messageIdentifier = GetMessageIdentifier(message); - lock (m_messages) - { - if (m_messages.TryGetValue(messageIdentifier, out ServiceBusReceivedMessage serviceBusReceivedMessage)) - { - KeyValuePair keyValuePair = new KeyValuePair(messageIdentifier, serviceBusReceivedMessage); - m_messages.TryRemove(keyValuePair); - } - } + m_messages.TryRemove(messageIdentifier, out _); } private bool AddOrUpdateStoredServiceReceivedMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) { string messageIdentifier = GetMessageIdentifierFromServiceBus(serviceBusReceivedMessage); if (!string.IsNullOrEmpty(messageIdentifier)) - lock (m_messages) - { - if (m_messages.TryGetValue(messageIdentifier, out ServiceBusReceivedMessage originalMessage)) - { - return (m_messages.TryUpdate(messageIdentifier, serviceBusReceivedMessage, originalMessage)); - } - else - return m_messages.TryAdd(messageIdentifier, serviceBusReceivedMessage); - } + { + Tuple messageStored = new Tuple(DateTime.UtcNow, serviceBusReceivedMessage); + m_messages[messageIdentifier] = messageStored; + return true; + } return false; } + private void ClearServiceBusAuxiliaryStorage() + { + //Clear all messages older than 5 minutes + //When a consumer locks a message, the broker temporarily hides it from other consumers (LockDuration). + //However, the lock on the message has a timeout, which is 5 mins maximum + + foreach (KeyValuePair> entry in m_messages) + { + if (entry.Value.Item1.AddMinutes(LOCK_DURATION) < DateTime.UtcNow) + { + m_messages.TryRemove(entry.Key, out _); + } + } + } + private string GetMessageIdentifier(BrokerMessage message) { //The sequence number is a unique 64-bit integer assigned to a message as it is accepted and stored by the broker and functions as its true identifier. @@ -651,17 +727,17 @@ private string GetMessageIdentifier(BrokerMessage message) //Get SessionId of the message string messageSessionId = GetMessageSessionId(message); if (!string.IsNullOrEmpty(messageSessionId)) - messageIdentifier = $"{messageSequenceNumber}{messageSessionId}"; + messageIdentifier = $"{messageSequenceNumber}_{messageSessionId}"; else { //Get PartitionKey of the message string messagePartitionKey = GetMessagePartitionKey(message); if (!string.IsNullOrEmpty(messagePartitionKey)) - messageIdentifier = $"{messageSequenceNumber}{messagePartitionKey}"; + messageIdentifier = $"{messageSequenceNumber}_{messagePartitionKey}"; else - messageIdentifier = $"{messageSequenceNumber}{message.MessageId}"; + messageIdentifier = $"{messageSequenceNumber}_{message.MessageId}"; } - return messageIdentifier.GetHashCode().ToString(); + return messageIdentifier; } private string GetMessageIdentifierFromServiceBus(ServiceBusReceivedMessage message) @@ -671,17 +747,17 @@ private string GetMessageIdentifierFromServiceBus(ServiceBusReceivedMessage mess //Get SessionId of the message string messageSessionId = message.SessionId; if (!string.IsNullOrEmpty(messageSessionId)) - messageIdentifier = $"{messageSequenceNumber}{messageSessionId}"; + messageIdentifier = $"{messageSequenceNumber}_{messageSessionId}"; else { //Get PartitionKey of the message string messagePartitionKey = message.PartitionKey; if (!string.IsNullOrEmpty(messagePartitionKey)) - messageIdentifier = $"{messageSequenceNumber}{messagePartitionKey}"; + messageIdentifier = $"{messageSequenceNumber}_{messagePartitionKey}"; else - messageIdentifier = $"{messageSequenceNumber}{message.MessageId}"; + messageIdentifier = $"{messageSequenceNumber}_{message.MessageId}"; } - return messageIdentifier.GetHashCode().ToString(); + return messageIdentifier; } private string GetMessageSequenceNumber(BrokerMessage message) { diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs index f7015d2aa..036c89652 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs @@ -9,7 +9,6 @@ public class ServiceBusMessageBrokerProvider { public MessageQueue Connect(string queueName, string connectionString, out GXBaseCollection errorMessages, out bool success) { - System.Diagnostics.Debugger.Launch(); MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); GXProperties properties = new GXProperties(); properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName); @@ -23,10 +22,9 @@ public MessageQueue Connect(string queueName, string connectionString, out GXBas public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, out GXBaseCollection errorMessages, out bool success) { - System.Diagnostics.Debugger.Launch(); MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); GXProperties properties = new GXProperties(); - properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME, topicName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, topicName); properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName); properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); @@ -37,10 +35,8 @@ public MessageQueue Connect(string topicName, string subcriptionName, string con } public MessageQueue Connect(string queueName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection errorMessages, out bool success) { - System.Diagnostics.Debugger.Launch(); MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); - ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions); GXProperties properties = new GXProperties(); @@ -61,12 +57,11 @@ public MessageQueue Connect(string queueName, string connectionString, bool sess public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection errorMessages, out bool success) { - System.Diagnostics.Debugger.Launch(); MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); GXProperties properties = new GXProperties(); ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions); - properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME, topicName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, topicName); properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName); properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString()); diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs index 30097724c..c30bebbcf 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs @@ -11,9 +11,11 @@ public interface IMessageBroker bool SendMessages(IList brokerMessages, string options); IList GetMessages(string options, out bool success); BrokerMessage GetMessage(string options, out bool success); + bool ConsumeMessage(BrokerMessage brokerMessage, string options); + long ScheduleMessage(BrokerMessage brokerMessage, string options); + bool CancelSchedule(long handleId); void Dispose(); bool GetMessageFromException(Exception ex, SdtMessages_Message msg); - bool ConsumeMessage(BrokerMessage brokerMessage, string options); } public class BrokerMessage : GxUserType { @@ -50,27 +52,4 @@ public override void ToJSON(bool includeState) #endregion } - public class BrokerMessageOptions : GxUserType - { - public short MaxNumberOfMessages { get; set; } - public bool DeleteConsumedMessages { get; set; } - public int WaitTimeout { get; set; } - public int VisibilityTimeout { get; set; } - public int TimetoLive { get; set; } - public int DelaySeconds { get; set; } - public string ReceiveRequestAttemptId { get; set; } - public bool ReceiveMessageAttributes { get; set; } - public short ReceiveMode { get; set; } - public short PrefetchCount { get; set; } - public string SubscriptionName { get; set; } - - } - - public static class BrokerMessageResultStatus - { - public const string Unknown = "Unknown"; - public const string Sent = "Sent"; - public const string Deleted = "Deleted"; - public const string Failed = "Failed"; - } } diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs index bfc0d1e25..3bc11056f 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs @@ -73,7 +73,6 @@ private static void Preprocess(String name, GXProperties properties) case Providers.AzureServiceBus: className = PropertyConstants.AZURE_SB_CLASSNAME; SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME); - SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME); SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME); SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING); if (string.IsNullOrEmpty(providerService.ClassName) || !providerService.ClassName.Contains(className)) diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs index 580c3a3e1..344b8983c 100644 --- a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs @@ -19,7 +19,6 @@ public class MessageQueue static readonly ILog logger = log4net.LogManager.GetLogger(typeof(MessageQueue)); private const string SDT_MESSAGE_CLASS_NAME = @"SdtMessage"; private const string SDT_MESSAGEPROPERTY_CLASS_NAME = @"SdtMessageProperty"; - //private const string SDT_MESSAGERESULT_CLASS_NAME = @"SdtMessageResult"; private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingmessagebroker"; private const string GENEXUS_COMMON_DLL = @"GeneXus.Programs.Common.dll"; @@ -73,6 +72,48 @@ public void Dispose() } } + public long ScheduleMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + GxUserType result = new GxUserType(); + try + { + BrokerMessage brokerQueueMessage = TransformGXUserTypeToBrokerMessage(messageQueue); + LoadAssemblyIfRequired(); + try + { + ValidQueue(); + return messageBroker.ScheduleMessage(brokerQueueMessage, options); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + throw ex; + } + return 0; + } + public bool CancelSchedule(long handleId, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + try + { + ValidQueue(); + return messageBroker.CancelSchedule(handleId); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + return false; + } + } + public bool ConsumeMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) { bool success = false; @@ -296,39 +337,25 @@ private GxUserType TransformBrokerMessage(BrokerMessage brokerMessage) } return null; } - private BrokerMessageOptions TransformOptions(GxUserType messageQueueOptions) - { - BrokerMessageOptions options = new BrokerMessageOptions(); - - options.MaxNumberOfMessages = messageQueueOptions.GetPropertyValue("Maxnumberofmessages"); - options.DeleteConsumedMessages = messageQueueOptions.GetPropertyValue("Deleteconsumedmessages"); - options.WaitTimeout = messageQueueOptions.GetPropertyValue("Waittimeout"); - options.VisibilityTimeout = messageQueueOptions.GetPropertyValue("Visibilitytimeout"); - options.TimetoLive = messageQueueOptions.GetPropertyValue("Timetolive"); - options.DelaySeconds = messageQueueOptions.GetPropertyValue("Delayseconds"); - options.ReceiveMode = messageQueueOptions.GetPropertyValue("Receivemode"); - options.ReceiveRequestAttemptId = messageQueueOptions.GetPropertyValue("Receiverequestattemptid"); - options.ReceiveMessageAttributes = messageQueueOptions.GetPropertyValue("Receivemessageattributes"); - options.PrefetchCount = messageQueueOptions.GetPropertyValue("Prefetchcount"); - options.SubscriptionName = messageQueueOptions.GetPropertyValue("Subscriptionname"); -; return options; - } - private BrokerMessage TransformGXUserTypeToBrokerMessage(GxUserType queueMessage) { - BrokerMessage brokerQueueMessage = new BrokerMessage(); - brokerQueueMessage.MessageId = queueMessage.GetPropertyValue("Messageid"); - brokerQueueMessage.MessageBody = queueMessage.GetPropertyValue("Messagebody"); - brokerQueueMessage.MessageHandleId = queueMessage.GetPropertyValue("Messagehandleid"); - IList messageAttributes = queueMessage.GetPropertyValue("Messageattributes_GXBaseCollection"); - brokerQueueMessage.MessageAttributes = new GXProperties(); - foreach (GxUserType messageAttribute in messageAttributes) - { - string messagePropKey = messageAttribute.GetPropertyValue("Propertykey"); - string messagePropValue = messageAttribute.GetPropertyValue("Propertyvalue"); - brokerQueueMessage.MessageAttributes.Add(messagePropKey, messagePropValue); + if (queueMessage != null) + { + BrokerMessage brokerQueueMessage = new BrokerMessage(); + brokerQueueMessage.MessageId = queueMessage.GetPropertyValue("Messageid"); + brokerQueueMessage.MessageBody = queueMessage.GetPropertyValue("Messagebody"); + brokerQueueMessage.MessageHandleId = queueMessage.GetPropertyValue("Messagehandleid"); + IList messageAttributes = queueMessage.GetPropertyValue("Messageattributes_GXBaseCollection"); + brokerQueueMessage.MessageAttributes = new GXProperties(); + foreach (GxUserType messageAttribute in messageAttributes) + { + string messagePropKey = messageAttribute.GetPropertyValue("Propertykey"); + string messagePropValue = messageAttribute.GetPropertyValue("Propertyvalue"); + brokerQueueMessage.MessageAttributes.Add(messagePropKey, messagePropValue); + } + return brokerQueueMessage; } - return brokerQueueMessage; + return null; } #endregion diff --git a/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs b/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs index d18cb12be..620ca26f7 100644 --- a/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs +++ b/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs @@ -57,8 +57,6 @@ public void TestSendBatchMessagesMethod() messages.Add(brokerMessage1); messages.Add(brokerMessage2); - BrokerMessageOptions options = new BrokerMessageOptions(); - bool success = messageBroker.SendMessages(messages, String.Empty); Assert.True(success); @@ -66,7 +64,6 @@ public void TestSendBatchMessagesMethod() [SkippableFact] public void TestGetBatchMessagesMethod() { - BrokerMessageOptions options = new BrokerMessageOptions(); IList messages = messageBroker.GetMessages(String.Empty, out bool success); Assert.True(success); }