diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md index 3134bd99dd14a..3e923594d36d7 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md @@ -8,6 +8,10 @@ - The web proxy specified in configuration is now respected. +#### New Features + +- Added support for specifying `accountName` or `blobServiceUri` for the checkpoint connection. + ## 5.0.0-beta.4 (2021-04-06) ### Changes diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs index 21b117ed0b043..1365777efff3a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs @@ -22,6 +22,7 @@ internal class EventHubClientFactory private readonly AzureComponentFactory _componentFactory; private readonly EventHubOptions _options; private readonly INameResolver _nameResolver; + private readonly CheckpointClientProvider _checkpointClientProvider; private readonly ConcurrentDictionary _producerCache; private readonly ConcurrentDictionary _consumerCache = new(); @@ -30,13 +31,15 @@ public EventHubClientFactory( AzureComponentFactory componentFactory, IOptions options, INameResolver nameResolver, - AzureEventSourceLogForwarder forwarder) + AzureEventSourceLogForwarder forwarder, + CheckpointClientProvider checkpointClientProvider) { forwarder.Start(); _configuration = configuration; _componentFactory = componentFactory; _options = options.Value; _nameResolver = nameResolver; + _checkpointClientProvider = checkpointClientProvider; _producerCache = new ConcurrentDictionary(); } @@ -166,11 +169,7 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName, internal BlobContainerClient GetCheckpointStoreClient() { - var section = _configuration.GetWebJobsConnectionStringSection(ConnectionStringNames.Storage); - var options = _componentFactory.CreateClientOptions(typeof(BlobClientOptions), null, section); - var credential = _componentFactory.CreateTokenCredential(section); - var client = (BlobServiceClient)_componentFactory.CreateClient(typeof(BlobServiceClient), section, credential, options); - + var client = _checkpointClientProvider.Get(ConnectionStringNames.Storage); return client.GetBlobContainerClient(_options.CheckpointContainer); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs index 8a27ec5e74538..b7fe55087b1a5 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs @@ -7,6 +7,7 @@ using Azure.Messaging.EventHubs.Consumer; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.EventHubs; +using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -83,6 +84,7 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action< builder.Services.AddAzureClientsCore(); builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.Configure(configure); builder.Services.PostConfigure(ConfigureInitialOffsetOptions); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj index fff04f0c1224c..af85b957bf11d 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj @@ -26,6 +26,7 @@ + True True diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/CheckpointClientProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/CheckpointClientProvider.cs new file mode 100644 index 0000000000000..965cc827ee636 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/CheckpointClientProvider.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Azure.Storage.Blobs; +using Microsoft.Azure.WebJobs.Extensions.Clients.Shared; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.EventHubs.Processor +{ + internal class CheckpointClientProvider : StorageClientProvider + { + public CheckpointClientProvider(IConfiguration configuration, AzureComponentFactory componentFactory, AzureEventSourceLogForwarder logForwarder, ILogger logger) + : base(configuration, componentFactory, logForwarder, logger) { } + + /// + protected override string ServiceUriSubDomain => "blob"; + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 8e50a2692e4a7..e06f64fda67a5 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -198,6 +198,42 @@ await AssertCanSendReceiveMessage(host => }))); } + [Test] + public async Task CanSendAndReceive_BlobServiceUri_InConfiguration() + { + await AssertCanSendReceiveMessage(host => + host.ConfigureAppConfiguration(configurationBuilder => + configurationBuilder.AddInMemoryCollection(new Dictionary() + { + {"TestConnection:fullyQualifiedNamespace", EventHubsTestEnvironment.Instance.FullyQualifiedNamespace}, + {"TestConnection:clientId", EventHubsTestEnvironment.Instance.ClientId}, + {"TestConnection:clientSecret", EventHubsTestEnvironment.Instance.ClientSecret}, + {"TestConnection:tenantId", EventHubsTestEnvironment.Instance.TenantId}, + {"AzureWebJobsStorage:blobServiceUri", GetServiceUri()}, + {"AzureWebJobsStorage:clientId", EventHubsTestEnvironment.Instance.ClientId}, + {"AzureWebJobsStorage:clientSecret", EventHubsTestEnvironment.Instance.ClientSecret}, + {"AzureWebJobsStorage:tenantId", EventHubsTestEnvironment.Instance.TenantId}, + }))); + } + + [Test] + public async Task CanSendAndReceive_AccountName_InConfiguration() + { + await AssertCanSendReceiveMessage(host => + host.ConfigureAppConfiguration(configurationBuilder => + configurationBuilder.AddInMemoryCollection(new Dictionary() + { + {"TestConnection:fullyQualifiedNamespace", EventHubsTestEnvironment.Instance.FullyQualifiedNamespace}, + {"TestConnection:clientId", EventHubsTestEnvironment.Instance.ClientId}, + {"TestConnection:clientSecret", EventHubsTestEnvironment.Instance.ClientSecret}, + {"TestConnection:tenantId", EventHubsTestEnvironment.Instance.TenantId}, + {"AzureWebJobsStorage:accountName", StorageTestEnvironment.Instance.StorageAccountName}, + {"AzureWebJobsStorage:clientId", EventHubsTestEnvironment.Instance.ClientId}, + {"AzureWebJobsStorage:clientSecret", EventHubsTestEnvironment.Instance.ClientSecret}, + {"AzureWebJobsStorage:tenantId", EventHubsTestEnvironment.Instance.TenantId}, + }))); + } + [Test] public void ThrowsIfBindingToASingleEvent() { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs index 4caaf1027dafb..5b12d2e419237 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs @@ -15,6 +15,7 @@ using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Moq; @@ -35,7 +36,7 @@ public void GetEventHubClient_AddsConnection(string expectedPathName, string con EventHubOptions options = new EventHubOptions(); var configuration = CreateConfiguration(new KeyValuePair("connection", connectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var client = factory.GetEventHubProducerClient(expectedPathName, "connection"); Assert.AreEqual(expectedPathName, client.EventHubName); @@ -47,7 +48,7 @@ public void CreatesClientsFromConfigWithConnectionString() EventHubOptions options = new EventHubOptions(); var configuration = CreateConfiguration(new KeyValuePair("connection", ConnectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var producer = factory.GetEventHubProducerClient("k1", "connection"); var consumer = factory.GetEventHubConsumerClient("k1", "connection", null); var host = factory.GetEventProcessorHost("k1", "connection", null); @@ -72,7 +73,7 @@ public void CreatesClientsFromConfigWithFullyQualifiedNamespace() var configuration = CreateConfiguration(new KeyValuePair("connection:fullyQualifiedNamespace", "test89123-ns-x.servicebus.windows.net")); - var factory = new EventHubClientFactory(configuration, componentFactoryMock.Object, Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options, componentFactoryMock.Object); var producer = factory.GetEventHubProducerClient("k1", "connection"); var consumer = factory.GetEventHubConsumerClient("k1", "connection", null); var host = factory.GetEventProcessorHost("k1", "connection", null); @@ -92,7 +93,7 @@ public void ConsumersAndProducersAreCached() EventHubOptions options = new EventHubOptions(); var configuration = CreateConfiguration(new KeyValuePair("connection", ConnectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var producer = factory.GetEventHubProducerClient("k1", "connection"); var consumer = factory.GetEventHubConsumerClient("k1", "connection", null); var producer2 = factory.GetEventHubProducerClient("k1", "connection"); @@ -116,7 +117,7 @@ public void UsesDefaultConnectionToStorageAccount() null, null)) .Returns(new BlobServiceClient(configuration["AzureWebJobsStorage"])); - var factory = new EventHubClientFactory(configuration, factoryMock.Object, Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options, factoryMock.Object); var client = factory.GetCheckpointStoreClient(); Assert.AreEqual("azure-webjobs-eventhub", client.Name); @@ -138,7 +139,7 @@ public void RespectsConnectionOptionsForProducer(string expectedPathName, string }; var configuration = CreateConfiguration(new KeyValuePair("connection", connectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var producer = factory.GetEventHubProducerClient(expectedPathName, "connection"); EventHubConnection connection = (EventHubConnection)typeof(EventHubProducerClient).GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance) @@ -168,7 +169,7 @@ public void RespectsConnectionOptionsForConsumer(string expectedPathName, string }; var configuration = CreateConfiguration(new KeyValuePair("connection", connectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var consumer = factory.GetEventHubConsumerClient(expectedPathName, "connection", "consumer"); var consumerClient = (EventHubConsumerClient)typeof(EventHubConsumerClientImpl) @@ -212,7 +213,7 @@ public void RespectsConnectionOptionsForProcessor(string expectedPathName, strin }; var configuration = CreateConfiguration(new KeyValuePair("connection", connectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var processor = factory.GetEventProcessorHost(expectedPathName, "connection", "consumer"); EventProcessorOptions processorOptions = (EventProcessorOptions)typeof(EventProcessor) @@ -231,7 +232,7 @@ public void DefaultStrategyIsGreedy() EventHubOptions options = new EventHubOptions(); var configuration = CreateConfiguration(new KeyValuePair("connection", ConnectionString)); - var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration), new AzureEventSourceLogForwarder(new NullLoggerFactory())); + var factory = CreateFactory(configuration, options); var processor = factory.GetEventProcessorHost("connection", "connection", "consumer"); EventProcessorOptions processorOptions = (EventProcessorOptions)typeof(EventProcessor) @@ -241,6 +242,20 @@ public void DefaultStrategyIsGreedy() Assert.AreEqual(LoadBalancingStrategy.Greedy, processorOptions.LoadBalancingStrategy); } + private static EventHubClientFactory CreateFactory(IConfiguration configuration, EventHubOptions options, AzureComponentFactory componentFactory = null) + { + componentFactory ??= Mock.Of(); + var loggerFactory = new NullLoggerFactory(); + var azureEventSourceLogForwarder = new AzureEventSourceLogForwarder(loggerFactory); + return new EventHubClientFactory( + configuration, + componentFactory, + Options.Create(options), + new DefaultNameResolver(configuration), + azureEventSourceLogForwarder, + new CheckpointClientProvider(configuration, componentFactory, azureEventSourceLogForwarder, loggerFactory.CreateLogger())); + } + private IConfiguration CreateConfiguration(params KeyValuePair[] data) { return new ConfigurationBuilder().AddInMemoryCollection(data).Build();