Skip to content

Commit

Permalink
Improvements to EventHub/ServiceBus error handling (#1784, #1760)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed Jul 3, 2018
1 parent dd53af8 commit c731190
Show file tree
Hide file tree
Showing 22 changed files with 313 additions and 116 deletions.
40 changes: 40 additions & 0 deletions src/Microsoft.Azure.WebJobs.Host/Extensions/LogLevelExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Host
{
internal static class LogLevelExtensions
{
internal static TraceLevel ToTraceLevel(this LogLevel logLevel)
{
TraceLevel level = TraceLevel.Off;
switch (logLevel)
{
case LogLevel.Critical:
case LogLevel.Error:
level = TraceLevel.Error;
break;

case LogLevel.Trace:
case LogLevel.Debug:
level = TraceLevel.Verbose;
break;

case LogLevel.Information:
level = TraceLevel.Info;
break;

case LogLevel.Warning:
level = TraceLevel.Warning;
break;

default:
break;
}
return level;
}
}
}
1 change: 1 addition & 0 deletions src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@
<Compile Include="Blobs\Listeners\StorageBlobScanInfoManager.cs" />
<Compile Include="Blobs\ParameterizedBlobUrl.cs" />
<Compile Include="Blobs\StorageBlobToCloudAppendBlobConverter.cs" />
<Compile Include="Extensions\LogLevelExtensions.cs" />
<Compile Include="JobHostFunctionTimeoutConfiguration.cs" />
<Compile Include="Config\IWebhookProvider.cs" />
<Compile Include="Config\FluentConverterRules.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,15 @@ public void Initialize(ExtensionConfigContext context)
throw new ArgumentNullException("context");
}

// Register an exception handler for background exceptions
// coming from MessageReceivers.
//
// The message options is a host level instance that is shared
// across all bindings, so we have to subscribe to it at the
// host level.
Config.MessageOptions.ExceptionReceived += (s, e) =>
{
Utility.LogExceptionReceivedEvent(e, "MessageReceiver", context.Trace, context.Config.LoggerFactory);
};

// get the services we need to construct our binding providers
INameResolver nameResolver = context.Config.GetService<INameResolver>();
IExtensionRegistry extensions = context.Config.GetService<IExtensionRegistry>();

// register the background exception handler
var exceptionHandler = MessagingExceptionHandler.Subscribe(Config.MessageOptions, context.Trace, context.Config.LoggerFactory);

// register our trigger binding provider
ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(nameResolver, _serviceBusConfig);
ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(nameResolver, _serviceBusConfig, exceptionHandler);
extensions.RegisterExtension<ITriggerBindingProvider>(triggerBindingProvider);

// register our binding provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
.AddConverter<byte[], EventData>(ConvertBytes2EventData)
.AddConverter<EventData, byte[]>(ConvertEventData2Bytes);

// register the background exception handler
MessagingExceptionHandler.Subscribe(_options, context.Trace, context.Config.LoggerFactory);

// register our trigger binding provider
INameResolver nameResolver = context.Config.NameResolver;
IConverterManager cm = context.Config.GetService<IConverterManager>();
Expand All @@ -454,17 +457,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
// register our binding provider
context.AddBindingRule<EventHubAttribute>()
.BindToCollector(BuildFromAttribute);

// Register an exception handler for background exceptions
// coming from the EventProcessorHost.
//
// EventProcessorOptions is a host level instance that is shared
// across all bindings, so we have to subscribe to it at the
// host level.
_options.ExceptionReceived += (s, e) =>
{
Utility.LogExceptionReceivedEvent(e, "EventProcessorHost", context.Trace, context.Config.LoggerFactory);
};
}

private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public EventHubTriggerAttributeBindingProvider(
IConverterManager converterManager,
EventHubConfiguration eventHubConfig)
{
this._nameResolver = nameResolver;
this._converterManager = converterManager;
this._eventHubConfig = eventHubConfig;
_nameResolver = nameResolver;
_converterManager = converterManager;
_eventHubConfig = eventHubConfig;
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ internal sealed class ServiceBusListener : IListener
private readonly ServiceBusTriggerExecutor _triggerExecutor;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly MessageProcessor _messageProcessor;
private readonly MessagingExceptionHandler _exceptionHandler;
private MessagingFactory _messagingFactory;
private MessageReceiver _receiver;
private bool _disposed;
private bool _started;

public ServiceBusListener(MessagingFactory messagingFactory, string entityPath, ServiceBusTriggerExecutor triggerExecutor, ServiceBusConfiguration config)
public ServiceBusListener(MessagingFactory messagingFactory, string entityPath, ServiceBusTriggerExecutor triggerExecutor, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_messagingFactory = messagingFactory;
_entityPath = entityPath;
_triggerExecutor = triggerExecutor;
_cancellationTokenSource = new CancellationTokenSource();
_messagingProvider = config.MessagingProvider;
_messageProcessor = config.MessagingProvider.CreateMessageProcessor(entityPath);
_exceptionHandler = exceptionHandler;
}

public Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -53,6 +55,11 @@ public Task StopAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();

// important to disable the exception handler BEFORE aborting
// the messaging factory, to avoid spurious error logs due
// to receive attempts on a closed connection, etc.
_exceptionHandler.Unsubscribe();

if (!_started)
{
throw new InvalidOperationException("The listener has not yet been started or has already been stopped.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ internal class ServiceBusQueueListenerFactory : IListenerFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly AccessRights _accessRights;
private readonly ServiceBusConfiguration _config;
private readonly MessagingExceptionHandler _exceptionHandler;

public ServiceBusQueueListenerFactory(ServiceBusAccount account, string queueName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config)
public ServiceBusQueueListenerFactory(ServiceBusAccount account, string queueName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_namespaceManager = account.NamespaceManager;
_messagingFactory = account.MessagingFactory;
_queueName = queueName;
_executor = executor;
_accessRights = accessRights;
_config = config;
_exceptionHandler = exceptionHandler;
}

public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
Expand All @@ -40,7 +42,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
}

ServiceBusTriggerExecutor triggerExecutor = new ServiceBusTriggerExecutor(_executor);
return new ServiceBusListener(_messagingFactory, _queueName, triggerExecutor, _config);
return new ServiceBusListener(_messagingFactory, _queueName, triggerExecutor, _config, _exceptionHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ internal class ServiceBusSubscriptionListenerFactory : IListenerFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly AccessRights _accessRights;
private readonly ServiceBusConfiguration _config;
private readonly MessagingExceptionHandler _exceptionHandler;

public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string topicName, string subscriptionName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config)
public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string topicName, string subscriptionName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_namespaceManager = account.NamespaceManager;
_messagingFactory = account.MessagingFactory;
Expand All @@ -29,6 +30,7 @@ public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string t
_executor = executor;
_accessRights = accessRights;
_config = config;
_exceptionHandler = exceptionHandler;
}

public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
Expand All @@ -45,7 +47,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
string entityPath = SubscriptionClient.FormatSubscriptionPath(_topicName, _subscriptionName);

ServiceBusTriggerExecutor triggerExecutor = new ServiceBusTriggerExecutor(_executor);
return new ServiceBusListener(_messagingFactory, entityPath, triggerExecutor, _config);
return new ServiceBusListener(_messagingFactory, entityPath, triggerExecutor, _config, _exceptionHandler);
}
}
}
159 changes: 159 additions & 0 deletions src/Microsoft.Azure.WebJobs.ServiceBus/MessagingExceptionHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Microsoft.ServiceBus.Messaging;

namespace Microsoft.Azure.WebJobs.ServiceBus
{
internal abstract class MessagingExceptionHandler
{
private readonly TraceWriter _traceWriter;
private readonly ILoggerFactory _loggerFactory;
private string _source;

public MessagingExceptionHandler(string source, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
{
if (string.IsNullOrEmpty(source))
{
throw new ArgumentNullException(nameof(source));
}
if (traceWriter == null)
{
throw new ArgumentNullException(nameof(traceWriter));
}

_source = source;
_traceWriter = traceWriter;
_loggerFactory = loggerFactory;
}

public static MessagingExceptionHandler Subscribe(EventProcessorOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
{
var exceptionHandler = new EventHubExceptionHandler(options, traceWriter, loggerFactory);
exceptionHandler.Subscribe();
return exceptionHandler;
}

public static MessagingExceptionHandler Subscribe(OnMessageOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
{
var exceptionHandler = new ServiceBusExceptionHandler(options, traceWriter, loggerFactory);
exceptionHandler.Subscribe();
return exceptionHandler;
}

public abstract void Subscribe();

public abstract void Unsubscribe();

protected void Handle(object sender, ExceptionReceivedEventArgs e)
{
LogExceptionReceivedEvent(e);
}

internal void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e)
{
try
{
var logger = _loggerFactory?.CreateLogger(LogCategories.Executor);
string message = $"{_source} error (Action={e.Action}) : {e.Exception.ToString()}";

var logLevel = GetLogLevel(e.Exception);
logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message);

var traceEvent = new TraceEvent(logLevel.ToTraceLevel(), message, null, e.Exception);
_traceWriter.Trace(traceEvent);
}
catch
{
// best effort logging
}
}

protected virtual LogLevel GetLogLevel(Exception ex)
{
var mex = ex as MessagingException;
if (!(ex is OperationCanceledException) && (mex == null || !mex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
return LogLevel.Error;
}
else
{
// transient messaging errors we log as verbose so we have a record
// of them, but we don't treat them as actual errors
return LogLevel.Information;
}
}

private class EventHubExceptionHandler : MessagingExceptionHandler
{
private readonly EventProcessorOptions _options;

public EventHubExceptionHandler(EventProcessorOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
: base("EventProcessorHost", traceWriter, loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

_options = options;
}

public override void Subscribe()
{
_options.ExceptionReceived += Handle;
}

public override void Unsubscribe()
{
_options.ExceptionReceived -= Handle;
}

protected override LogLevel GetLogLevel(Exception ex)
{
if (ex is ReceiverDisconnectedException ||
ex is LeaseLostException)
{
// For EventProcessorHost these exceptions can happen as part
// of normal partition balancing across instances, so we want to
// trace them, but not treat them as errors.
return LogLevel.Information;
}

return base.GetLogLevel(ex);
}
}

private class ServiceBusExceptionHandler : MessagingExceptionHandler
{
private readonly OnMessageOptions _options;

public ServiceBusExceptionHandler(OnMessageOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
: base("MessageReceiver", traceWriter, loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

_options = options;
}

public override void Subscribe()
{
_options.ExceptionReceived += Handle;
}

public override void Unsubscribe()
{
_options.ExceptionReceived -= Handle;
}
}
}
}
Loading

0 comments on commit c731190

Please sign in to comment.