diff --git a/src/Microsoft.Azure.WebJobs.ServiceBus/MessagingProvider.cs b/src/Microsoft.Azure.WebJobs.ServiceBus/MessagingProvider.cs index be02f794f..f563dcb99 100644 --- a/src/Microsoft.Azure.WebJobs.ServiceBus/MessagingProvider.cs +++ b/src/Microsoft.Azure.WebJobs.ServiceBus/MessagingProvider.cs @@ -2,8 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Concurrent; using System.Globalization; -using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host; using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging; @@ -18,6 +18,11 @@ public class MessagingProvider { private readonly ServiceBusConfiguration _config; + /// + /// Cache of instances by connection string. + /// + private readonly ConcurrentDictionary _messagingFactoryCache = new ConcurrentDictionary(); + /// /// Constructs a new instance. /// @@ -51,6 +56,7 @@ public virtual NamespaceManager CreateNamespaceManager(string connectionStringNa /// Optional connection string name indicating the connection string to use. /// If null, the default connection string on the will be used. /// A . + /// For performance reasons, factories are cached per connection string. public virtual MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null) { if (string.IsNullOrEmpty(entityPath)) @@ -60,7 +66,15 @@ public virtual MessagingFactory CreateMessagingFactory(string entityPath, string string connectionString = GetConnectionString(connectionStringName); - return MessagingFactory.CreateFromConnectionString(connectionString); + // We cache messaging factories per connection, in accordance with ServiceBus + // performance guidelines. + // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements + var messagingFactory = _messagingFactoryCache.GetOrAdd(connectionString, (c) => + { + return MessagingFactory.CreateFromConnectionString(c); + }); + + return messagingFactory; } /// diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/ServiceBusEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/ServiceBusEndToEndTests.cs index 2bea1c0e3..b5db3ee84 100644 --- a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/ServiceBusEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/ServiceBusEndToEndTests.cs @@ -2,9 +2,11 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; +using System.Reflection; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host.Listeners; @@ -22,6 +24,7 @@ public class ServiceBusEndToEndTests private const int SBTimeout = 60 * 1000; private const string QueueNamePrefix = PrefixForAll + "queue-"; private const string StartQueueName = QueueNamePrefix + "start"; + private const string BinderQueueName = QueueNamePrefix + "binder"; private const string TopicName = PrefixForAll + "topic"; @@ -137,6 +140,24 @@ await TestHelpers.Await(() => } } + [Fact] + public async Task ServiceBusBinderTest() + { + var hostType = typeof(ServiceBusTestJobs); + var host = CreateHost(hostType); + var method = typeof(ServiceBusTestJobs).GetMethod("ServiceBusBinderTest"); + + int numMessages = 10; + var args = new { message = "Test Message", numMessages = numMessages }; + await host.CallAsync(method, args); + await host.CallAsync(method, args); + await host.CallAsync(method, args); + + var queueName = ResolveName(BinderQueueName); + var queueDescription = await _namespaceManager.GetQueueAsync(queueName); + Assert.Equal(numMessages * 3, queueDescription.MessageCount); + } + [Fact] public async Task CustomMessageProcessorTest() { @@ -231,6 +252,9 @@ private void Cleanup() _secondaryNamespaceManager.DeleteQueue(elementName); } + elementName = ResolveName(BinderQueueName); + CleanupQueue(elementName); + elementName = ResolveName(QueueNamePrefix + "1"); CleanupQueue(elementName); @@ -449,6 +473,26 @@ public static void MultipleAccounts( { output = input; } + + [NoAutomaticTrigger] + public static async Task ServiceBusBinderTest( + string message, + int numMessages, + Binder binder) + { + var attribute = new ServiceBusAttribute(BinderQueueName) + { + EntityType = EntityType.Queue + }; + var collector = await binder.BindAsync>(attribute); + + for (int i = 0; i < numMessages; i++) + { + await collector.AddAsync(message + i); + } + + await collector.FlushAsync(); + } } /// diff --git a/test/Microsoft.Azure.WebJobs.ServiceBus.UnitTests/MessagingProviderTests.cs b/test/Microsoft.Azure.WebJobs.ServiceBus.UnitTests/MessagingProviderTests.cs index 3531eeeb1..c61d3f268 100644 --- a/test/Microsoft.Azure.WebJobs.ServiceBus.UnitTests/MessagingProviderTests.cs +++ b/test/Microsoft.Azure.WebJobs.ServiceBus.UnitTests/MessagingProviderTests.cs @@ -63,6 +63,23 @@ public void CreateMessageReceiver_ReturnsExpectedReceiver() Assert.Equal(100, receiver.PrefetchCount); } + [Fact] + public void CreateMessagingFactory_CachesPerConnection() + { + var factory = _provider.CreateMessagingFactory("test"); + + Assert.Same(factory, _provider.CreateMessagingFactory("test")); + Assert.Same(factory, _provider.CreateMessagingFactory("test")); + Assert.Same(factory, _provider.CreateMessagingFactory("test")); + + var factory2 = _provider.CreateMessagingFactory("test", "ServiceBusOverride"); + Assert.NotSame(factory, factory2); + + Assert.Same(factory2, _provider.CreateMessagingFactory("test", "ServiceBusOverride")); + Assert.Same(factory2, _provider.CreateMessagingFactory("test", "ServiceBusOverride")); + Assert.Same(factory2, _provider.CreateMessagingFactory("test", "ServiceBusOverride")); + } + [Fact] public void GetConnectionString_ThrowsIfConnectionStringNullOrEmpty() {