Skip to content

Commit

Permalink
support all options of Azure SB API
Browse files Browse the repository at this point in the history
  • Loading branch information
sjuarezgx committed Sep 25, 2022
1 parent 6acc8eb commit d8d7fdb
Show file tree
Hide file tree
Showing 8 changed files with 795 additions and 235 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.8.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.10.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,134 @@
using System;
using System.Collections;
using GeneXus.Messaging.Common;
using GeneXus.Utils;

namespace GeneXus.Messaging.GXAzureServiceBus
{
public class ServiceBusMessageBrokerProvider
{
public MessageQueue Connect(string queueName, string queueConnection, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
public MessageQueue Connect(string queueName, string connectionString, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
System.Diagnostics.Debugger.Launch();
MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider();
GXProperties properties = new GXProperties();
properties.Add(PropertyConstants. MESSAGEBROKER_AZURESB_QUEUENAME, queueName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUECONNECTION, queueConnection);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString);

MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
return messageQueue;
}

public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
System.Diagnostics.Debugger.Launch();
MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider();
GXProperties properties = new GXProperties();
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME, topicName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString);

MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
return messageQueue;
}
public MessageQueue Connect(string queueName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
System.Diagnostics.Debugger.Launch();
MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider();

ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions);

GXProperties properties = new GXProperties();
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString);
properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString());
properties.Add(PropertyConstants.RECEIVE_MODE, options.ReceiveMode.ToString());
properties.Add(PropertyConstants.PREFETCH_COUNT, options.PrefetchCount.ToString());
properties.Add(PropertyConstants.RECEIVER_IDENTIFIER, options.Identifier);
properties.Add(PropertyConstants.RECEIVER_SESSIONID, options.SessionId);
properties.Add(PropertyConstants.SENDER_IDENTIFIER, senderIdentifier);

MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
return messageQueue;
}

public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
System.Diagnostics.Debugger.Launch();
MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider();
GXProperties properties = new GXProperties();
ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions);

properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME, topicName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName);
properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString);
properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString());
properties.Add(PropertyConstants.RECEIVE_MODE, options.ReceiveMode.ToString());
properties.Add(PropertyConstants.PREFETCH_COUNT, options.PrefetchCount.ToString());
properties.Add(PropertyConstants.RECEIVER_IDENTIFIER, options.Identifier);
properties.Add(PropertyConstants.RECEIVER_SESSIONID, options.SessionId);
properties.Add(PropertyConstants.SENDER_IDENTIFIER, senderIdentifier);

MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection<SdtMessages_Message> errorMessagesConnect, out bool successConnect);
errorMessages = errorMessagesConnect;
success = successConnect;
return messageQueue;
}

#region Transformation methods
private ReceiverOptions TransformGXUserTypeToReceiverOptions(GxUserType options)
{
ReceiverOptions receiverOptions = new ReceiverOptions();
if (options != null)
{
receiverOptions.ReceiveMode = options.GetPropertyValue<short>("Receivemode");
receiverOptions.Identifier = options.GetPropertyValue<string>("Identifier");
receiverOptions.PrefetchCount = options.GetPropertyValue<short>("Prefetchcount");
receiverOptions.SessionId = options.GetPropertyValue<string>("Sessionid");
}
return receiverOptions;
}
#endregion
public class ReceiverOptions : GxUserType
{
public short ReceiveMode { get; set; }
public short PrefetchCount { get; set; }
public string Identifier { get; set; }
public string SessionId { get; set; }

#region Json
private static Hashtable mapper;
public override String JsonMap(String value)
{
if (mapper == null)
{
mapper = new Hashtable();
}
return (String)mapper[value]; ;
}

public override void ToJSON()
{
ToJSON(true);
return;
}

public override void ToJSON(bool includeState)
{
AddObjectProperty("ReceiveMode", ReceiveMode, false);
AddObjectProperty("PrefetchCount", PrefetchCount, false);
AddObjectProperty("Identifier", Identifier, false);
AddObjectProperty("SessionId", Identifier, false);
return;
}

#endregion
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ namespace GeneXus.Messaging.Common
{
public interface IMessageBroker
{
bool SendMessage(BrokerMessage brokerMessage);
bool SendMessages(IList<BrokerMessage> brokerMessages, BrokerMessageOptions messageQueueOptions);
IList<BrokerMessage> GetMessages(BrokerMessageOptions messageQueueOptions, out bool success);
bool SendMessage(BrokerMessage brokerMessage, string options);
bool SendMessages(IList<BrokerMessage> brokerMessages, string options);
IList<BrokerMessage> GetMessages(string options, out bool success);
BrokerMessage GetMessage(string options, out bool success);
void Dispose();
bool GetMessageFromException(Exception ex, SdtMessages_Message msg);
bool ConsumeMessage(BrokerMessage brokerMessage, string options);
}
public class BrokerMessage : GxUserType
{
Expand Down Expand Up @@ -48,45 +50,6 @@ public override void ToJSON(bool includeState)
#endregion

}

public class BrokerMessageResult : GxUserType
{
public string MessageId { get; set; }
public string ServerMessageId { get; set; }
public GXProperties MessageAttributes { get; set; }
public string MessageHandleId { get; set; }
public string MessageStatus { get; set; } = "Unknown";

#region Json
private static Hashtable mapper;
public override String JsonMap(String value)
{
if (mapper == null)
{
mapper = new Hashtable();
}
return (String)mapper[value]; ;
}

public override void ToJSON()
{
ToJSON(true);
return;
}

public override void ToJSON(bool includeState)
{
AddObjectProperty("MessageId", MessageId, false);
AddObjectProperty("ServerMessageId", ServerMessageId, false);
AddObjectProperty("MessageHandleId", MessageHandleId, false);
AddObjectProperty("MessageStatus", MessageStatus, false);

return;
}

#endregion
}

public class BrokerMessageOptions : GxUserType
{
public short MaxNumberOfMessages { get; set; }
Expand All @@ -97,8 +60,8 @@ public class BrokerMessageOptions : GxUserType
public int DelaySeconds { get; set; }
public string ReceiveRequestAttemptId { get; set; }
public bool ReceiveMessageAttributes { get; set; }
public int ReceiveMode { get; set; }
public int PrefetchCount { get; set; }
public short ReceiveMode { get; set; }
public short PrefetchCount { get; set; }
public string SubscriptionName { get; set; }

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ private static void Preprocess(String name, GXProperties properties)
case Providers.AzureServiceBus:
className = PropertyConstants.AZURE_SB_CLASSNAME;
SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME);
SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_QUEUECONNECTION);
SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_TOPICNAME);
SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME);
SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING);
if (string.IsNullOrEmpty(providerService.ClassName) || !providerService.ClassName.Contains(className))
{
providerService.ClassName = PropertyConstants.AZURE_SB_PROVIDER_CLASSNAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class MessageQueue
private const string SDT_MESSAGE_CLASS_NAME = @"SdtMessage";
private const string SDT_MESSAGEPROPERTY_CLASS_NAME = @"SdtMessageProperty";
//private const string SDT_MESSAGERESULT_CLASS_NAME = @"SdtMessageResult";
private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingqueue.MessageBroker";
private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingmessagebroker";
private const string GENEXUS_COMMON_DLL = @"GeneXus.Programs.Common.dll";

public MessageQueue()
Expand Down Expand Up @@ -73,15 +73,76 @@ public void Dispose()
}
}

public IList<GxUserType> GetMessages(GxUserType messageQueueOptions, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
public bool ConsumeMessage(GxUserType messageQueue, string options, out GXBaseCollection<SdtMessages_Message> errorMessages)
{
bool success = false;
errorMessages = new GXBaseCollection<SdtMessages_Message>();
GxUserType result = new GxUserType();
try
{
BrokerMessage brokerQueueMessage = TransformGXUserTypeToBrokerMessage(messageQueue);
LoadAssemblyIfRequired();
try
{
ValidQueue();
return (messageBroker.ConsumeMessage(brokerQueueMessage, options));
}
catch (Exception ex)
{
QueueErrorMessagesSetup(ex, out errorMessages);
success = false;
GXLogging.Error(logger, ex);
}
}
catch (Exception ex)
{
success = false;
GXLogging.Error(logger, ex);
throw ex;
}
return success;
}
public GxUserType GetMessage(string options, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
errorMessages = new GXBaseCollection<SdtMessages_Message>();
GxUserType receivedMessage = new GxUserType();
success = false;
try
{
try
{
ValidQueue();
BrokerMessage brokerMessage = messageBroker.GetMessage(options, out success);
LoadAssemblyIfRequired();

if (TransformBrokerMessage(brokerMessage) is GxUserType result)
{
success = true;
return result;
}
}
catch (Exception ex)
{
QueueErrorMessagesSetup(ex, out errorMessages);
GXLogging.Error(logger, ex);
success = false;
}
}
catch (Exception ex)
{
GXLogging.Error(logger, ex);
success = false;
throw ex;
}
return receivedMessage;
}
public IList<GxUserType> GetMessages(string options, out GXBaseCollection<SdtMessages_Message> errorMessages, out bool success)
{
errorMessages = new GXBaseCollection<SdtMessages_Message>();
IList<GxUserType> resultMessages = new List<GxUserType>();
success = false;
try
{
BrokerMessageOptions options = TransformOptions(messageQueueOptions);

try
{
ValidQueue();
Expand Down Expand Up @@ -111,7 +172,7 @@ public IList<GxUserType> GetMessages(GxUserType messageQueueOptions, out GXBaseC

return resultMessages;
}
public bool SendMessage(GxUserType messageQueue, out GXBaseCollection<SdtMessages_Message> errorMessages)
public bool SendMessage(GxUserType messageQueue, string options, out GXBaseCollection<SdtMessages_Message> errorMessages)
{
bool success = false;
errorMessages = new GXBaseCollection<SdtMessages_Message>();
Expand All @@ -123,7 +184,7 @@ public bool SendMessage(GxUserType messageQueue, out GXBaseCollection<SdtMessage
try
{
ValidQueue();
return(messageBroker.SendMessage(brokerQueueMessage));
return(messageBroker.SendMessage(brokerQueueMessage, options));
}
catch (Exception ex)
{
Expand All @@ -141,13 +202,12 @@ public bool SendMessage(GxUserType messageQueue, out GXBaseCollection<SdtMessage
return success;
}

public bool SendMessages(IList queueMessages, GxUserType messageQueueOptions, out GXBaseCollection<SdtMessages_Message> errorMessages)
public bool SendMessages(IList queueMessages, string options, out GXBaseCollection<SdtMessages_Message> errorMessages)
{
errorMessages = new GXBaseCollection<SdtMessages_Message>();
bool success = false;
try
{
BrokerMessageOptions options = TransformOptions(messageQueueOptions);
IList<BrokerMessage> brokerMessagesList = new List<BrokerMessage>();
foreach (GxUserType queueMessage in queueMessages)
{
Expand Down Expand Up @@ -239,15 +299,19 @@ private GxUserType TransformBrokerMessage(BrokerMessage brokerMessage)
private BrokerMessageOptions TransformOptions(GxUserType messageQueueOptions)
{
BrokerMessageOptions options = new BrokerMessageOptions();
//TO DO Check the valid options, ex
//https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.createmessagebatchoptions.maxsizeinbytes?view=azure-dotnet#azure-messaging-servicebus-createmessagebatchoptions-maxsizeinbytes


options.MaxNumberOfMessages = messageQueueOptions.GetPropertyValue<short>("Maxnumberofmessages");
options.DeleteConsumedMessages = messageQueueOptions.GetPropertyValue<bool>("Deleteconsumedmessages");
options.WaitTimeout = messageQueueOptions.GetPropertyValue<int>("Waittimeout");
options.VisibilityTimeout = messageQueueOptions.GetPropertyValue<int>("Visibilitytimeout");
options.TimetoLive = messageQueueOptions.GetPropertyValue<int>("Timetolive");
return options;
options.DelaySeconds = messageQueueOptions.GetPropertyValue<int>("Delayseconds");
options.ReceiveMode = messageQueueOptions.GetPropertyValue<short>("Receivemode");
options.ReceiveRequestAttemptId = messageQueueOptions.GetPropertyValue<string>("Receiverequestattemptid");
options.ReceiveMessageAttributes = messageQueueOptions.GetPropertyValue<bool>("Receivemessageattributes");
options.PrefetchCount = messageQueueOptions.GetPropertyValue<short>("Prefetchcount");
options.SubscriptionName = messageQueueOptions.GetPropertyValue<string>("Subscriptionname");
; return options;
}

private BrokerMessage TransformGXUserTypeToBrokerMessage(GxUserType queueMessage)
Expand Down
Loading

0 comments on commit d8d7fdb

Please sign in to comment.