Skip to content

Commit 05f988f

Browse files
eerhardtdavidfowl
andauthored
Allow EventHub connection strings to work with EntityPath and ConsumerGroup keys (#7453)
* Allow EventHub connection strings to work with EntityPath and ConsumerGroup keys With #7438, EventHub Hub and Consumer Groups are Aspire Resources now, which means they can be referenced via WithReference. In the future, the EventHubs Host integration will append these values to the connection string. This will allow the hub and consumer group names to be specified outside of the application. (Today they are hard-coded in both, or specified separately in config.) When they flow through the connection string, the EventHubName and ConsumerGroup properties are populated on the settings object. Contributes to #7407 * Apply suggestions from code review --------- Co-authored-by: David Fowler <davidfowl@gmail.com>
1 parent 28b47e3 commit 05f988f

File tree

2 files changed

+197
-6
lines changed

2 files changed

+197
-6
lines changed

src/Components/Aspire.Azure.Messaging.EventHubs/AzureMessagingEventHubsSettings.cs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Data.Common;
45
using Aspire.Azure.Common;
56

67
using Azure.Core;
@@ -89,13 +90,49 @@ void IConnectionStringSettings.ParseConnectionString(string? connectionString)
8990
if (!connectionString.Contains(';'))
9091
{
9192
FullyQualifiedNamespace = connectionString;
93+
return;
9294
}
93-
else
95+
96+
var connectionBuilder = new DbConnectionStringBuilder()
97+
{
98+
ConnectionString = connectionString
99+
};
100+
101+
if (connectionBuilder.TryGetValue("ConsumerGroup", out var group))
94102
{
95-
ConnectionString = connectionString;
103+
SetConsumerGroup(group.ToString());
104+
105+
// remove ConsumerGroup from the connection builder since it isn't a connection property
106+
// in regular Event Hubs connection strings
107+
connectionBuilder.Remove("ConsumerGroup");
96108
}
109+
110+
var hasEntityPath = connectionBuilder.TryGetValue("EntityPath", out var entityPath);
111+
if (hasEntityPath)
112+
{
113+
// don't strip off EntityPath from the connection string because
114+
// it is a valid connection property in regular Event Hubs connection strings
115+
EventHubName = entityPath!.ToString();
116+
}
117+
118+
if (connectionBuilder.Count == 1 ||
119+
(connectionBuilder.Count == 2 && hasEntityPath))
120+
{
121+
if (connectionBuilder.TryGetValue("Endpoint", out var endpoint))
122+
{
123+
// if all that's left is Endpoint or Endpoint+EntityPath,
124+
// it is a fully qualified namespace
125+
FullyQualifiedNamespace = endpoint.ToString();
126+
return;
127+
}
128+
}
129+
130+
// if we got here, it's a full connection string
131+
ConnectionString = connectionString;
97132
}
98133
}
134+
135+
internal virtual void SetConsumerGroup(string? consumerGroup) { }
99136
}
100137

101138
/// <summary>
@@ -117,6 +154,11 @@ public sealed class AzureMessagingEventHubsConsumerSettings : AzureMessagingEven
117154
/// Gets or sets the name of the consumer group.
118155
/// </summary>
119156
public string? ConsumerGroup { get; set; }
157+
158+
internal override void SetConsumerGroup(string? consumerGroup)
159+
{
160+
ConsumerGroup = consumerGroup;
161+
}
120162
}
121163

122164
/// <summary>
@@ -145,6 +187,11 @@ public sealed class AzureMessagingEventHubsProcessorSettings : AzureMessagingEve
145187
/// If a container is provided in the connection string, it will override this value and the container will be assumed to exist.
146188
/// </summary>
147189
public string? BlobContainerName { get; set; }
190+
191+
internal override void SetConsumerGroup(string? consumerGroup)
192+
{
193+
ConsumerGroup = consumerGroup;
194+
}
148195
}
149196

150197
/// <summary>
@@ -166,5 +213,10 @@ public sealed class AzureMessagingEventHubsPartitionReceiverSettings : AzureMess
166213
/// Gets or sets the event position to start from in the bound partition. Defaults to <see cref="EventPosition.Earliest" />.
167214
/// </summary>
168215
public EventPosition EventPosition { get; set; } = EventPosition.Earliest;
216+
217+
internal override void SetConsumerGroup(string? consumerGroup)
218+
{
219+
ConsumerGroup = consumerGroup;
220+
}
169221
}
170222

tests/Aspire.Azure.Messaging.EventHubs.Tests/AspireEventHubsExtensionsTests.cs

Lines changed: 143 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -596,10 +596,7 @@ public void ProcessorBlobContainerNameDefaultsCorrectly()
596596
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventProcessorClient:EventHubName", "MyHub"),
597597
]);
598598

599-
var mockTransport = new MockTransport(
600-
CreateResponse("""{}"""));
601-
var blobClient = new BlobServiceClient(new Uri(BlobsConnectionString), new BlobClientOptions() { Transport = mockTransport });
602-
builder.Services.AddSingleton(blobClient);
599+
var mockTransport = InjectMockBlobClient(builder);
603600

604601
builder.AddAzureEventProcessorClient("eh1");
605602

@@ -613,6 +610,15 @@ public void ProcessorBlobContainerNameDefaultsCorrectly()
613610
Assert.Equal("https://fake.blob.core.windows.net/127-0-0-1-MyHub-default?restype=container", mockTransport.Requests[0].Uri.ToString());
614611
}
615612

613+
private static MockTransport InjectMockBlobClient(HostApplicationBuilder builder)
614+
{
615+
var mockTransport = new MockTransport(
616+
CreateResponse("""{}"""));
617+
var blobClient = new BlobServiceClient(new Uri(BlobsConnectionString), new BlobClientOptions() { Transport = mockTransport });
618+
builder.Services.AddSingleton(blobClient);
619+
return mockTransport;
620+
}
621+
616622
private static MockResponse CreateResponse(string content)
617623
{
618624
var buffer = Encoding.UTF8.GetBytes(content);
@@ -626,4 +632,137 @@ private static MockResponse CreateResponse(string content)
626632

627633
return response;
628634
}
635+
636+
[Theory]
637+
[MemberData(nameof(ConnectionString_MemberData))]
638+
public void AddAzureCosmosClient_EnsuresConnectionStringIsCorrect(EventHubTestConnectionInfo testInfo)
639+
{
640+
var builder = Host.CreateEmptyApplicationBuilder(null);
641+
642+
builder.Configuration.AddInMemoryCollection([
643+
new KeyValuePair<string, string?>("ConnectionStrings:eh1", testInfo.TestConnectionString),
644+
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventHubProducerClient:EventHubName", "NotInConnectionInfo"),
645+
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventHubConsumerClient:EventHubName", "NotInConnectionInfo"),
646+
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventProcessorClient:EventHubName", "NotInConnectionInfo"),
647+
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:PartitionReceiver:EventHubName", "NotInConnectionInfo"),
648+
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:PartitionReceiver:PartitionId", "foo"),
649+
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventHubBufferedProducerClient:EventHubName", "NotInConnectionInfo"),
650+
]);
651+
652+
var expectedEventHubName = testInfo.EventHubName ?? "NotInConnectionInfo";
653+
654+
var settingsCalled = 0;
655+
void VerifySettings(AzureMessagingEventHubsSettings settings)
656+
{
657+
settingsCalled++;
658+
659+
Assert.Equal(testInfo.ConnectionString, settings.ConnectionString);
660+
Assert.Equal(testInfo.FullyQualifiedNamespace, settings.FullyQualifiedNamespace);
661+
Assert.Equal(expectedEventHubName, settings.EventHubName);
662+
663+
var consumerGroupProperty = settings.GetType().GetProperty("ConsumerGroup");
664+
if (consumerGroupProperty != null)
665+
{
666+
Assert.Equal(testInfo.ConsumerGroup, consumerGroupProperty.GetValue(settings));
667+
}
668+
}
669+
670+
InjectMockBlobClient(builder);
671+
672+
builder.AddAzureEventHubProducerClient("eh1", VerifySettings);
673+
builder.AddAzureEventHubConsumerClient("eh1", VerifySettings);
674+
builder.AddAzureEventProcessorClient("eh1", VerifySettings);
675+
builder.AddAzurePartitionReceiverClient("eh1", VerifySettings);
676+
builder.AddAzureEventHubBufferedProducerClient("eh1", VerifySettings);
677+
678+
Assert.Equal(5, settingsCalled);
679+
680+
using var app = builder.Build();
681+
682+
var producerClient = app.Services.GetRequiredService<EventHubProducerClient>();
683+
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, producerClient.FullyQualifiedNamespace);
684+
Assert.Equal(expectedEventHubName, producerClient.EventHubName);
685+
686+
var consumerClient = app.Services.GetRequiredService<EventHubConsumerClient>();
687+
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, consumerClient.FullyQualifiedNamespace);
688+
Assert.Equal(expectedEventHubName, consumerClient.EventHubName);
689+
690+
var processorClient = app.Services.GetRequiredService<EventProcessorClient>();
691+
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, processorClient.FullyQualifiedNamespace);
692+
Assert.Equal(expectedEventHubName, processorClient.EventHubName);
693+
694+
var partitionReceiver = app.Services.GetRequiredService<PartitionReceiver>();
695+
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, partitionReceiver.FullyQualifiedNamespace);
696+
Assert.Equal(expectedEventHubName, partitionReceiver.EventHubName);
697+
698+
var bufferedProducerClient = app.Services.GetRequiredService<EventHubBufferedProducerClient>();
699+
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, bufferedProducerClient.FullyQualifiedNamespace);
700+
Assert.Equal(expectedEventHubName, bufferedProducerClient.EventHubName);
701+
}
702+
703+
public static TheoryData<EventHubTestConnectionInfo> ConnectionString_MemberData()
704+
{
705+
return new()
706+
{
707+
new EventHubTestConnectionInfo()
708+
{
709+
TestConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;",
710+
ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;",
711+
ClientFullyQualifiedNamespace = "localhost"
712+
},
713+
new EventHubTestConnectionInfo()
714+
{
715+
TestConnectionString ="Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=myhub",
716+
ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=myhub",
717+
EventHubName = "myhub",
718+
ClientFullyQualifiedNamespace = "localhost"
719+
},
720+
new EventHubTestConnectionInfo()
721+
{
722+
TestConnectionString ="Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;ConsumerGroup=mygroup;EntityPath=myhub",
723+
ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;ConsumerGroup=mygroup;EntityPath=myhub",
724+
EventHubName = "myhub",
725+
ConsumerGroup = "mygroup",
726+
ClientFullyQualifiedNamespace = "localhost"
727+
},
728+
new EventHubTestConnectionInfo()
729+
{
730+
TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;EntityPath=myhub;ConsumerGroup=mygroup",
731+
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
732+
EventHubName = "myhub",
733+
ConsumerGroup = "mygroup",
734+
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
735+
},
736+
new EventHubTestConnectionInfo()
737+
{
738+
TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;EntityPath=myhub",
739+
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
740+
EventHubName = "myhub",
741+
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
742+
},
743+
new EventHubTestConnectionInfo()
744+
{
745+
TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;ConsumerGroup=mygroup",
746+
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
747+
ConsumerGroup = "mygroup",
748+
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
749+
},
750+
new EventHubTestConnectionInfo()
751+
{
752+
TestConnectionString ="https://eventhubns-cetg3lr.servicebus.windows.net:443/",
753+
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
754+
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
755+
}
756+
};
757+
}
758+
759+
public record EventHubTestConnectionInfo
760+
{
761+
public required string TestConnectionString { get; set; }
762+
public string? FullyQualifiedNamespace { get; set; }
763+
public string? ConnectionString { get; set; }
764+
public string? EventHubName { get; set; }
765+
public string? ConsumerGroup { get; set; }
766+
public string? ClientFullyQualifiedNamespace { get; set; }
767+
}
629768
}

0 commit comments

Comments
 (0)