Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Data.Common;
using Aspire.Azure.Common;

using Azure.Core;
Expand Down Expand Up @@ -89,13 +90,49 @@ void IConnectionStringSettings.ParseConnectionString(string? connectionString)
if (!connectionString.Contains(';'))
{
FullyQualifiedNamespace = connectionString;
return;
}
else

var connectionBuilder = new DbConnectionStringBuilder()
{
ConnectionString = connectionString
};

if (connectionBuilder.TryGetValue("ConsumerGroup", out var group))
{
ConnectionString = connectionString;
SetConsumerGroup(group.ToString());

// remove ConsumerGroup from the connection builder since it isn't a connection property
// in regular Event Hubs connection strings
connectionBuilder.Remove("ConsumerGroup");
}

var hasEntityPath = connectionBuilder.TryGetValue("EntityPath", out var entityPath);
if (hasEntityPath)
{
// don't strip off EntityPath from the connection string because
// it is a valid connection property in regular Event Hubs connection strings
EventHubName = entityPath!.ToString();
}

if (connectionBuilder.Count == 1 ||
(connectionBuilder.Count == 2 && hasEntityPath))
{
if (connectionBuilder.TryGetValue("Endpoint", out var endpoint))
{
// if all that's left is Endpoint or Endpoint+EntityPath,
// it is a fully qualified namespace
FullyQualifiedNamespace = endpoint.ToString();
return;
}
}

// if we got here, it's a full connection string
ConnectionString = connectionString;
}
}

internal virtual void SetConsumerGroup(string? consumerGroup) { }
}

/// <summary>
Expand All @@ -117,6 +154,11 @@ public sealed class AzureMessagingEventHubsConsumerSettings : AzureMessagingEven
/// Gets or sets the name of the consumer group.
/// </summary>
public string? ConsumerGroup { get; set; }

internal override void SetConsumerGroup(string? consumerGroup)
{
ConsumerGroup = consumerGroup;
}
}

/// <summary>
Expand Down Expand Up @@ -145,6 +187,11 @@ public sealed class AzureMessagingEventHubsProcessorSettings : AzureMessagingEve
/// If a container is provided in the connection string, it will override this value and the container will be assumed to exist.
/// </summary>
public string? BlobContainerName { get; set; }

internal override void SetConsumerGroup(string? consumerGroup)
{
ConsumerGroup = consumerGroup;
}
}

/// <summary>
Expand All @@ -166,5 +213,10 @@ public sealed class AzureMessagingEventHubsPartitionReceiverSettings : AzureMess
/// Gets or sets the event position to start from in the bound partition. Defaults to <see cref="EventPosition.Earliest" />.
/// </summary>
public EventPosition EventPosition { get; set; } = EventPosition.Earliest;

internal override void SetConsumerGroup(string? consumerGroup)
{
ConsumerGroup = consumerGroup;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,7 @@ public void ProcessorBlobContainerNameDefaultsCorrectly()
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventProcessorClient:EventHubName", "MyHub"),
]);

var mockTransport = new MockTransport(
CreateResponse("""{}"""));
var blobClient = new BlobServiceClient(new Uri(BlobsConnectionString), new BlobClientOptions() { Transport = mockTransport });
builder.Services.AddSingleton(blobClient);
var mockTransport = InjectMockBlobClient(builder);

builder.AddAzureEventProcessorClient("eh1");

Expand All @@ -613,6 +610,15 @@ public void ProcessorBlobContainerNameDefaultsCorrectly()
Assert.Equal("https://fake.blob.core.windows.net/127-0-0-1-MyHub-default?restype=container", mockTransport.Requests[0].Uri.ToString());
}

private static MockTransport InjectMockBlobClient(HostApplicationBuilder builder)
{
var mockTransport = new MockTransport(
CreateResponse("""{}"""));
var blobClient = new BlobServiceClient(new Uri(BlobsConnectionString), new BlobClientOptions() { Transport = mockTransport });
builder.Services.AddSingleton(blobClient);
return mockTransport;
}

private static MockResponse CreateResponse(string content)
{
var buffer = Encoding.UTF8.GetBytes(content);
Expand All @@ -626,4 +632,137 @@ private static MockResponse CreateResponse(string content)

return response;
}

[Theory]
[MemberData(nameof(ConnectionString_MemberData))]
public void AddAzureCosmosClient_EnsuresConnectionStringIsCorrect(EventHubTestConnectionInfo testInfo)
{
var builder = Host.CreateEmptyApplicationBuilder(null);

builder.Configuration.AddInMemoryCollection([
new KeyValuePair<string, string?>("ConnectionStrings:eh1", testInfo.TestConnectionString),
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventHubProducerClient:EventHubName", "NotInConnectionInfo"),
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventHubConsumerClient:EventHubName", "NotInConnectionInfo"),
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventProcessorClient:EventHubName", "NotInConnectionInfo"),
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:PartitionReceiver:EventHubName", "NotInConnectionInfo"),
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:PartitionReceiver:PartitionId", "foo"),
new KeyValuePair<string, string?>("Aspire:Azure:Messaging:EventHubs:EventHubBufferedProducerClient:EventHubName", "NotInConnectionInfo"),
]);

var expectedEventHubName = testInfo.EventHubName ?? "NotInConnectionInfo";

var settingsCalled = 0;
void VerifySettings(AzureMessagingEventHubsSettings settings)
{
settingsCalled++;

Assert.Equal(testInfo.ConnectionString, settings.ConnectionString);
Assert.Equal(testInfo.FullyQualifiedNamespace, settings.FullyQualifiedNamespace);
Assert.Equal(expectedEventHubName, settings.EventHubName);

var consumerGroupProperty = settings.GetType().GetProperty("ConsumerGroup");
if (consumerGroupProperty != null)
{
Assert.Equal(testInfo.ConsumerGroup, consumerGroupProperty.GetValue(settings));
}
}

InjectMockBlobClient(builder);

builder.AddAzureEventHubProducerClient("eh1", VerifySettings);
builder.AddAzureEventHubConsumerClient("eh1", VerifySettings);
builder.AddAzureEventProcessorClient("eh1", VerifySettings);
builder.AddAzurePartitionReceiverClient("eh1", VerifySettings);
builder.AddAzureEventHubBufferedProducerClient("eh1", VerifySettings);

Assert.Equal(5, settingsCalled);

using var app = builder.Build();

var producerClient = app.Services.GetRequiredService<EventHubProducerClient>();
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, producerClient.FullyQualifiedNamespace);
Assert.Equal(expectedEventHubName, producerClient.EventHubName);

var consumerClient = app.Services.GetRequiredService<EventHubConsumerClient>();
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, consumerClient.FullyQualifiedNamespace);
Assert.Equal(expectedEventHubName, consumerClient.EventHubName);

var processorClient = app.Services.GetRequiredService<EventProcessorClient>();
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, processorClient.FullyQualifiedNamespace);
Assert.Equal(expectedEventHubName, processorClient.EventHubName);

var partitionReceiver = app.Services.GetRequiredService<PartitionReceiver>();
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, partitionReceiver.FullyQualifiedNamespace);
Assert.Equal(expectedEventHubName, partitionReceiver.EventHubName);

var bufferedProducerClient = app.Services.GetRequiredService<EventHubBufferedProducerClient>();
Assert.Equal(testInfo.ClientFullyQualifiedNamespace, bufferedProducerClient.FullyQualifiedNamespace);
Assert.Equal(expectedEventHubName, bufferedProducerClient.EventHubName);
}

public static TheoryData<EventHubTestConnectionInfo> ConnectionString_MemberData()
{
return new()
{
new EventHubTestConnectionInfo()
{
TestConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;",
ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;",
ClientFullyQualifiedNamespace = "localhost"
},
new EventHubTestConnectionInfo()
{
TestConnectionString ="Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=myhub",
ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=myhub",
EventHubName = "myhub",
ClientFullyQualifiedNamespace = "localhost"
},
new EventHubTestConnectionInfo()
{
TestConnectionString ="Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;ConsumerGroup=mygroup;EntityPath=myhub",
ConnectionString = "Endpoint=sb://localhost:55184;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;ConsumerGroup=mygroup;EntityPath=myhub",
EventHubName = "myhub",
ConsumerGroup = "mygroup",
ClientFullyQualifiedNamespace = "localhost"
},
new EventHubTestConnectionInfo()
{
TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;EntityPath=myhub;ConsumerGroup=mygroup",
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
EventHubName = "myhub",
ConsumerGroup = "mygroup",
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
},
new EventHubTestConnectionInfo()
{
TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;EntityPath=myhub",
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
EventHubName = "myhub",
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
},
new EventHubTestConnectionInfo()
{
TestConnectionString ="Endpoint=https://eventhubns-cetg3lr.servicebus.windows.net:443/;ConsumerGroup=mygroup",
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
ConsumerGroup = "mygroup",
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
},
new EventHubTestConnectionInfo()
{
TestConnectionString ="https://eventhubns-cetg3lr.servicebus.windows.net:443/",
FullyQualifiedNamespace = "https://eventhubns-cetg3lr.servicebus.windows.net:443/",
ClientFullyQualifiedNamespace = "eventhubns-cetg3lr.servicebus.windows.net"
}
};
}

public record EventHubTestConnectionInfo
{
public required string TestConnectionString { get; set; }
public string? FullyQualifiedNamespace { get; set; }
public string? ConnectionString { get; set; }
public string? EventHubName { get; set; }
public string? ConsumerGroup { get; set; }
public string? ClientFullyQualifiedNamespace { get; set; }
}
}
Loading