diff --git a/src/Components/Aspire.Azure.Messaging.EventHubs/AzureMessagingEventHubsSettings.cs b/src/Components/Aspire.Azure.Messaging.EventHubs/AzureMessagingEventHubsSettings.cs index c8da4824cbb..3424d8c38a5 100644 --- a/src/Components/Aspire.Azure.Messaging.EventHubs/AzureMessagingEventHubsSettings.cs +++ b/src/Components/Aspire.Azure.Messaging.EventHubs/AzureMessagingEventHubsSettings.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Data.Common; using Aspire.Azure.Common; using Azure.Core; @@ -89,13 +90,49 @@ void IConnectionStringSettings.ParseConnectionString(string? connectionString) if (!connectionString.Contains(';')) { FullyQualifiedNamespace = connectionString; + return; } - else + + var connectionBuilder = new DbConnectionStringBuilder() + { + ConnectionString = connectionString + }; + + if (connectionBuilder.TryGetValue("ConsumerGroup", out var group)) { - ConnectionString = connectionString; + SetConsumerGroup(group.ToString()); + + // remove ConsumerGroup from the connection builder since it isn't a connection property + // in regular Event Hubs connection strings + connectionBuilder.Remove("ConsumerGroup"); } + + var hasEntityPath = connectionBuilder.TryGetValue("EntityPath", out var entityPath); + if (hasEntityPath) + { + // don't strip off EntityPath from the connection string because + // it is a valid connection property in regular Event Hubs connection strings + EventHubName = entityPath!.ToString(); + } + + if (connectionBuilder.Count == 1 || + (connectionBuilder.Count == 2 && hasEntityPath)) + { + if (connectionBuilder.TryGetValue("Endpoint", out var endpoint)) + { + // if all that's left is Endpoint or Endpoint+EntityPath, + // it is a fully qualified namespace + FullyQualifiedNamespace = endpoint.ToString(); + return; + } + } + + // if we got here, it's a full connection string + ConnectionString = connectionString; } } + + internal virtual void SetConsumerGroup(string? consumerGroup) { } } /// @@ -117,6 +154,11 @@ public sealed class AzureMessagingEventHubsConsumerSettings : AzureMessagingEven /// Gets or sets the name of the consumer group. /// public string? ConsumerGroup { get; set; } + + internal override void SetConsumerGroup(string? consumerGroup) + { + ConsumerGroup = consumerGroup; + } } /// @@ -145,6 +187,11 @@ public sealed class AzureMessagingEventHubsProcessorSettings : AzureMessagingEve /// If a container is provided in the connection string, it will override this value and the container will be assumed to exist. /// public string? BlobContainerName { get; set; } + + internal override void SetConsumerGroup(string? consumerGroup) + { + ConsumerGroup = consumerGroup; + } } /// @@ -166,5 +213,10 @@ public sealed class AzureMessagingEventHubsPartitionReceiverSettings : AzureMess /// Gets or sets the event position to start from in the bound partition. Defaults to . /// public EventPosition EventPosition { get; set; } = EventPosition.Earliest; + + internal override void SetConsumerGroup(string? consumerGroup) + { + ConsumerGroup = consumerGroup; + } } diff --git a/tests/Aspire.Azure.Messaging.EventHubs.Tests/AspireEventHubsExtensionsTests.cs b/tests/Aspire.Azure.Messaging.EventHubs.Tests/AspireEventHubsExtensionsTests.cs index 08272a039b3..d4e7b19276b 100644 --- a/tests/Aspire.Azure.Messaging.EventHubs.Tests/AspireEventHubsExtensionsTests.cs +++ b/tests/Aspire.Azure.Messaging.EventHubs.Tests/AspireEventHubsExtensionsTests.cs @@ -596,10 +596,7 @@ public void ProcessorBlobContainerNameDefaultsCorrectly() new KeyValuePair("Aspire:Azure:Messaging:EventHubs:EventProcessorClient:EventHubName", "MyHub"), ]); - var mockTransport = new MockTransport( - CreateResponse("""{}""")); - var blobClient = new BlobServiceClient(new Uri(BlobsConnectionString), new BlobClientOptions() { Transport = mockTransport }); - builder.Services.AddSingleton(blobClient); + var mockTransport = InjectMockBlobClient(builder); builder.AddAzureEventProcessorClient("eh1"); @@ -613,6 +610,15 @@ public void ProcessorBlobContainerNameDefaultsCorrectly() Assert.Equal("https://fake.blob.core.windows.net/127-0-0-1-MyHub-default?restype=container", mockTransport.Requests[0].Uri.ToString()); } + private static MockTransport InjectMockBlobClient(HostApplicationBuilder builder) + { + var mockTransport = new MockTransport( + CreateResponse("""{}""")); + var blobClient = new BlobServiceClient(new Uri(BlobsConnectionString), new BlobClientOptions() { Transport = mockTransport }); + builder.Services.AddSingleton(blobClient); + return mockTransport; + } + private static MockResponse CreateResponse(string content) { var buffer = Encoding.UTF8.GetBytes(content); @@ -626,4 +632,137 @@ private static MockResponse CreateResponse(string content) return response; } + + [Theory] + [MemberData(nameof(ConnectionString_MemberData))] + public void AddAzureCosmosClient_EnsuresConnectionStringIsCorrect(EventHubTestConnectionInfo testInfo) + { + var builder = Host.CreateEmptyApplicationBuilder(null); + + builder.Configuration.AddInMemoryCollection([ + new KeyValuePair("ConnectionStrings:eh1", testInfo.TestConnectionString), + new KeyValuePair("Aspire:Azure:Messaging:EventHubs:EventHubProducerClient:EventHubName", "NotInConnectionInfo"), + new KeyValuePair("Aspire:Azure:Messaging:EventHubs:EventHubConsumerClient:EventHubName", "NotInConnectionInfo"), + new KeyValuePair("Aspire:Azure:Messaging:EventHubs:EventProcessorClient:EventHubName", "NotInConnectionInfo"), + new KeyValuePair("Aspire:Azure:Messaging:EventHubs:PartitionReceiver:EventHubName", "NotInConnectionInfo"), + new KeyValuePair("Aspire:Azure:Messaging:EventHubs:PartitionReceiver:PartitionId", "foo"), + new KeyValuePair("Aspire:Azure:Messaging:EventHubs:EventHubBufferedProducerClient:EventHubName", "NotInConnectionInfo"), + ]); + + var expectedEventHubName = testInfo.EventHubName ?? "NotInConnectionInfo"; + + var settingsCalled = 0; + void VerifySettings(AzureMessagingEventHubsSettings settings) + { + settingsCalled++; + + Assert.Equal(testInfo.ConnectionString, settings.ConnectionString); + Assert.Equal(testInfo.FullyQualifiedNamespace, settings.FullyQualifiedNamespace); + Assert.Equal(expectedEventHubName, settings.EventHubName); + + var consumerGroupProperty = settings.GetType().GetProperty("ConsumerGroup"); + if (consumerGroupProperty != null) + { + Assert.Equal(testInfo.ConsumerGroup, consumerGroupProperty.GetValue(settings)); + } + } + + InjectMockBlobClient(builder); + + builder.AddAzureEventHubProducerClient("eh1", VerifySettings); + builder.AddAzureEventHubConsumerClient("eh1", VerifySettings); + builder.AddAzureEventProcessorClient("eh1", VerifySettings); + builder.AddAzurePartitionReceiverClient("eh1", VerifySettings); + builder.AddAzureEventHubBufferedProducerClient("eh1", VerifySettings); + + Assert.Equal(5, settingsCalled); + + using var app = builder.Build(); + + var producerClient = app.Services.GetRequiredService(); + Assert.Equal(testInfo.ClientFullyQualifiedNamespace, producerClient.FullyQualifiedNamespace); + Assert.Equal(expectedEventHubName, producerClient.EventHubName); + + var consumerClient = app.Services.GetRequiredService(); + Assert.Equal(testInfo.ClientFullyQualifiedNamespace, consumerClient.FullyQualifiedNamespace); + Assert.Equal(expectedEventHubName, consumerClient.EventHubName); + + var processorClient = app.Services.GetRequiredService(); + Assert.Equal(testInfo.ClientFullyQualifiedNamespace, processorClient.FullyQualifiedNamespace); + Assert.Equal(expectedEventHubName, processorClient.EventHubName); + + var partitionReceiver = app.Services.GetRequiredService(); + Assert.Equal(testInfo.ClientFullyQualifiedNamespace, partitionReceiver.FullyQualifiedNamespace); + Assert.Equal(expectedEventHubName, partitionReceiver.EventHubName); + + var bufferedProducerClient = app.Services.GetRequiredService(); + Assert.Equal(testInfo.ClientFullyQualifiedNamespace, bufferedProducerClient.FullyQualifiedNamespace); + Assert.Equal(expectedEventHubName, bufferedProducerClient.EventHubName); + } + + public static TheoryData ConnectionString_MemberData() + { + return new() + { + new EventHubTestConnectionInfo() + { + TestConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;", + ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;", + ClientFullyQualifiedNamespace = "localhost" + }, + new EventHubTestConnectionInfo() + { + TestConnectionString ="Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=myhub", + ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=myhub", + EventHubName = "myhub", + ClientFullyQualifiedNamespace = "localhost" + }, + new EventHubTestConnectionInfo() + { + TestConnectionString ="Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;ConsumerGroup=mygroup;EntityPath=myhub", + ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;ConsumerGroup=mygroup;EntityPath=myhub", + EventHubName = "myhub", + ConsumerGroup = "mygroup", + ClientFullyQualifiedNamespace = "localhost" + }, + new EventHubTestConnectionInfo() + { + TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;EntityPath=myhub;ConsumerGroup=mygroup", + FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/", + EventHubName = "myhub", + ConsumerGroup = "mygroup", + ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net" + }, + new EventHubTestConnectionInfo() + { + TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;EntityPath=myhub", + FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/", + EventHubName = "myhub", + ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net" + }, + new EventHubTestConnectionInfo() + { + TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;ConsumerGroup=mygroup", + FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/", + ConsumerGroup = "mygroup", + ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net" + }, + new EventHubTestConnectionInfo() + { + TestConnectionString ="https://eventhubns-cetg3lr.servicebus.windows.net:443/", + FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/", + ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net" + } + }; + } + + public record EventHubTestConnectionInfo + { + public required string TestConnectionString { get; set; } + public string? FullyQualifiedNamespace { get; set; } + public string? ConnectionString { get; set; } + public string? EventHubName { get; set; } + public string? ConsumerGroup { get; set; } + public string? ClientFullyQualifiedNamespace { get; set; } + } }