Skip to content

Commit

Permalink
Improvements to EventHub/ServiceBus error handling (#1784, #1760)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed Jul 3, 2018
1 parent dd53af8 commit e721347
Show file tree
Hide file tree
Showing 21 changed files with 313 additions and 128 deletions.
37 changes: 23 additions & 14 deletions sample/SampleHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.ServiceBus;
Expand All @@ -13,24 +14,32 @@ class Program
{
static void Main(string[] args)
{
var config = new JobHostConfiguration();
config.Queues.VisibilityTimeout = TimeSpan.FromSeconds(15);
config.Queues.MaxDequeueCount = 3;
config.LoggerFactory = new LoggerFactory().AddConsole();
while (true)
{
var config = new JobHostConfiguration();
config.Queues.VisibilityTimeout = TimeSpan.FromSeconds(15);
config.Queues.MaxDequeueCount = 3;
config.LoggerFactory = new LoggerFactory().AddConsole();

config.UseServiceBus();
var eventHubConfig = new EventHubConfiguration();
config.UseEventHub(eventHubConfig);
config.UseServiceBus();
var eventHubConfig = new EventHubConfiguration();
config.UseEventHub(eventHubConfig);

if (config.IsDevelopment)
{
config.UseDevelopmentSettings();
}
if (config.IsDevelopment)
{
config.UseDevelopmentSettings();
}

CheckAndEnableAppInsights(config);
CheckAndEnableAppInsights(config);

var host = new JobHost(config);
host.RunAndBlock();
var host = new JobHost(config);
host.StartAsync().Wait();

Task.Delay(5000);

host.Stop();
host.Dispose();
}
}

private static void CheckAndEnableAppInsights(JobHostConfiguration config)
Expand Down
40 changes: 40 additions & 0 deletions src/Microsoft.Azure.WebJobs.Host/Extensions/LogLevelExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Host
{
internal static class LogLevelExtensions
{
internal static TraceLevel ToTraceLevel(this LogLevel logLevel)
{
TraceLevel level = TraceLevel.Off;
switch (logLevel)
{
case LogLevel.Critical:
case LogLevel.Error:
level = TraceLevel.Error;
break;

case LogLevel.Trace:
case LogLevel.Debug:
level = TraceLevel.Verbose;
break;

case LogLevel.Information:
level = TraceLevel.Info;
break;

case LogLevel.Warning:
level = TraceLevel.Warning;
break;

default:
break;
}
return level;
}
}
}
1 change: 1 addition & 0 deletions src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@
<Compile Include="Blobs\Listeners\StorageBlobScanInfoManager.cs" />
<Compile Include="Blobs\ParameterizedBlobUrl.cs" />
<Compile Include="Blobs\StorageBlobToCloudAppendBlobConverter.cs" />
<Compile Include="Extensions\LogLevelExtensions.cs" />
<Compile Include="JobHostFunctionTimeoutConfiguration.cs" />
<Compile Include="Config\IWebhookProvider.cs" />
<Compile Include="Config\FluentConverterRules.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,15 @@ public void Initialize(ExtensionConfigContext context)
throw new ArgumentNullException("context");
}

// Register an exception handler for background exceptions
// coming from MessageReceivers.
//
// The message options is a host level instance that is shared
// across all bindings, so we have to subscribe to it at the
// host level.
Config.MessageOptions.ExceptionReceived += (s, e) =>
{
Utility.LogExceptionReceivedEvent(e, "MessageReceiver", context.Trace, context.Config.LoggerFactory);
};

// get the services we need to construct our binding providers
INameResolver nameResolver = context.Config.GetService<INameResolver>();
IExtensionRegistry extensions = context.Config.GetService<IExtensionRegistry>();

// register the background exception handler
var exceptionHandler = MessagingExceptionHandler.Subscribe(Config.MessageOptions, context.Trace, context.Config.LoggerFactory);

// register our trigger binding provider
ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(nameResolver, _serviceBusConfig);
ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(nameResolver, _serviceBusConfig, exceptionHandler);
extensions.RegisterExtension<ITriggerBindingProvider>(triggerBindingProvider);

// register our binding provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
.AddConverter<byte[], EventData>(ConvertBytes2EventData)
.AddConverter<EventData, byte[]>(ConvertEventData2Bytes);

// register the background exception handler
MessagingExceptionHandler.Subscribe(_options, context.Trace, context.Config.LoggerFactory);

// register our trigger binding provider
INameResolver nameResolver = context.Config.NameResolver;
IConverterManager cm = context.Config.GetService<IConverterManager>();
Expand All @@ -454,17 +457,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
// register our binding provider
context.AddBindingRule<EventHubAttribute>()
.BindToCollector(BuildFromAttribute);

// Register an exception handler for background exceptions
// coming from the EventProcessorHost.
//
// EventProcessorOptions is a host level instance that is shared
// across all bindings, so we have to subscribe to it at the
// host level.
_options.ExceptionReceived += (s, e) =>
{
Utility.LogExceptionReceivedEvent(e, "EventProcessorHost", context.Trace, context.Config.LoggerFactory);
};
}

private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public EventHubTriggerAttributeBindingProvider(
IConverterManager converterManager,
EventHubConfiguration eventHubConfig)
{
this._nameResolver = nameResolver;
this._converterManager = converterManager;
this._eventHubConfig = eventHubConfig;
_nameResolver = nameResolver;
_converterManager = converterManager;
_eventHubConfig = eventHubConfig;
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ internal sealed class ServiceBusListener : IListener
private readonly ServiceBusTriggerExecutor _triggerExecutor;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly MessageProcessor _messageProcessor;
private readonly MessagingExceptionHandler _exceptionHandler;
private MessagingFactory _messagingFactory;
private MessageReceiver _receiver;
private bool _disposed;
private bool _started;

public ServiceBusListener(MessagingFactory messagingFactory, string entityPath, ServiceBusTriggerExecutor triggerExecutor, ServiceBusConfiguration config)
public ServiceBusListener(MessagingFactory messagingFactory, string entityPath, ServiceBusTriggerExecutor triggerExecutor, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_messagingFactory = messagingFactory;
_entityPath = entityPath;
_triggerExecutor = triggerExecutor;
_cancellationTokenSource = new CancellationTokenSource();
_messagingProvider = config.MessagingProvider;
_messageProcessor = config.MessagingProvider.CreateMessageProcessor(entityPath);
_exceptionHandler = exceptionHandler;
}

public Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -53,6 +55,11 @@ public Task StopAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();

// important to disable the exception handler BEFORE aborting
// the messaging factory, to avoid spurious error logs due
// to receive attempts on a closed connection, etc.
_exceptionHandler.Unsubscribe();

if (!_started)
{
throw new InvalidOperationException("The listener has not yet been started or has already been stopped.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ internal class ServiceBusQueueListenerFactory : IListenerFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly AccessRights _accessRights;
private readonly ServiceBusConfiguration _config;
private readonly MessagingExceptionHandler _exceptionHandler;

public ServiceBusQueueListenerFactory(ServiceBusAccount account, string queueName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config)
public ServiceBusQueueListenerFactory(ServiceBusAccount account, string queueName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_namespaceManager = account.NamespaceManager;
_messagingFactory = account.MessagingFactory;
_queueName = queueName;
_executor = executor;
_accessRights = accessRights;
_config = config;
_exceptionHandler = exceptionHandler;
}

public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
Expand All @@ -40,7 +42,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
}

ServiceBusTriggerExecutor triggerExecutor = new ServiceBusTriggerExecutor(_executor);
return new ServiceBusListener(_messagingFactory, _queueName, triggerExecutor, _config);
return new ServiceBusListener(_messagingFactory, _queueName, triggerExecutor, _config, _exceptionHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ internal class ServiceBusSubscriptionListenerFactory : IListenerFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly AccessRights _accessRights;
private readonly ServiceBusConfiguration _config;
private readonly MessagingExceptionHandler _exceptionHandler;

public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string topicName, string subscriptionName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config)
public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string topicName, string subscriptionName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_namespaceManager = account.NamespaceManager;
_messagingFactory = account.MessagingFactory;
Expand All @@ -29,6 +30,7 @@ public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string t
_executor = executor;
_accessRights = accessRights;
_config = config;
_exceptionHandler = exceptionHandler;
}

public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
Expand All @@ -45,7 +47,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
string entityPath = SubscriptionClient.FormatSubscriptionPath(_topicName, _subscriptionName);

ServiceBusTriggerExecutor triggerExecutor = new ServiceBusTriggerExecutor(_executor);
return new ServiceBusListener(_messagingFactory, entityPath, triggerExecutor, _config);
return new ServiceBusListener(_messagingFactory, entityPath, triggerExecutor, _config, _exceptionHandler);
}
}
}
Loading

0 comments on commit e721347

Please sign in to comment.