Improvements to EventHub/ServiceBus error handling (#1784, #1760) #1788

Merged 1 commit on Jul 3, 2018
40 changes: 40 additions & 0 deletions src/Microsoft.Azure.WebJobs.Host/Extensions/LogLevelExtensions.cs
@@ -0,0 +1,40 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Host
{
internal static class LogLevelExtensions
{
internal static TraceLevel ToTraceLevel(this LogLevel logLevel)
Review comment (Member Author): Copied this from the Functions runtime codebase.

{
TraceLevel level = TraceLevel.Off;
switch (logLevel)
{
case LogLevel.Critical:
case LogLevel.Error:
level = TraceLevel.Error;
break;

case LogLevel.Trace:
case LogLevel.Debug:
level = TraceLevel.Verbose;
break;

case LogLevel.Information:
level = TraceLevel.Info;
break;

case LogLevel.Warning:
level = TraceLevel.Warning;
break;

default:
break;
}
return level;
}
}
}
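The mapping above is lossy in two places: Critical and Error both become TraceLevel.Error, and Trace and Debug both become TraceLevel.Verbose. A minimal sketch of the same mapping follows, written as a C# 9 switch expression. The `SketchLogLevel` enum and `SketchLogLevelExtensions` class are hypothetical stand-ins (not part of this PR) so the example compiles without the Microsoft.Extensions.Logging package; only `System.Diagnostics.TraceLevel` is the real type.

```csharp
using System;
using System.Diagnostics;

// Hypothetical stand-in for Microsoft.Extensions.Logging.LogLevel,
// used only so this sketch has no package dependency.
enum SketchLogLevel { Trace, Debug, Information, Warning, Error, Critical, None }

static class SketchLogLevelExtensions
{
    // Same mapping as ToTraceLevel in the diff, as a switch expression.
    public static TraceLevel ToTraceLevel(this SketchLogLevel level) => level switch
    {
        SketchLogLevel.Critical or SketchLogLevel.Error => TraceLevel.Error,
        SketchLogLevel.Warning => TraceLevel.Warning,
        SketchLogLevel.Information => TraceLevel.Info,
        SketchLogLevel.Trace or SketchLogLevel.Debug => TraceLevel.Verbose,
        _ => TraceLevel.Off, // None, or any value not listed above
    };
}

static class Program
{
    static void Main()
    {
        // Print the full mapping table, one line per level.
        foreach (SketchLogLevel level in Enum.GetValues(typeof(SketchLogLevel)))
        {
            Console.WriteLine($"{level} -> {level.ToTraceLevel()}");
        }
    }
}
```

Because the mapping is many-to-one, a TraceLevel cannot be converted back to the original log level, which is fine here since the conversion is only used when building the TraceEvent.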
1 change: 1 addition & 0 deletions src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj
@@ -485,6 +485,7 @@
<Compile Include="Blobs\Listeners\StorageBlobScanInfoManager.cs" />
<Compile Include="Blobs\ParameterizedBlobUrl.cs" />
<Compile Include="Blobs\StorageBlobToCloudAppendBlobConverter.cs" />
<Compile Include="Extensions\LogLevelExtensions.cs" />
<Compile Include="JobHostFunctionTimeoutConfiguration.cs" />
<Compile Include="Config\IWebhookProvider.cs" />
<Compile Include="Config\FluentConverterRules.cs" />
@@ -51,23 +51,15 @@ public void Initialize(ExtensionConfigContext context)
throw new ArgumentNullException("context");
}

// Register an exception handler for background exceptions
// coming from MessageReceivers.
//
// The message options is a host level instance that is shared
// across all bindings, so we have to subscribe to it at the
// host level.
Config.MessageOptions.ExceptionReceived += (s, e) =>
{
Utility.LogExceptionReceivedEvent(e, "MessageReceiver", context.Trace, context.Config.LoggerFactory);
};

// get the services we need to construct our binding providers
INameResolver nameResolver = context.Config.GetService<INameResolver>();
IExtensionRegistry extensions = context.Config.GetService<IExtensionRegistry>();

// register the background exception handler
var exceptionHandler = MessagingExceptionHandler.Subscribe(Config.MessageOptions, context.Trace, context.Config.LoggerFactory);
Review comment (Member Author): The key to the ServiceBus fix was to capture the exception handler and flow it down to the listener, so that it can be unsubscribed BEFORE we abort the messaging factory.


// register our trigger binding provider
ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(nameResolver, _serviceBusConfig);
ServiceBusTriggerAttributeBindingProvider triggerBindingProvider = new ServiceBusTriggerAttributeBindingProvider(nameResolver, _serviceBusConfig, exceptionHandler);
extensions.RegisterExtension<ITriggerBindingProvider>(triggerBindingProvider);

// register our binding provider
@@ -444,6 +444,9 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
.AddConverter<byte[], EventData>(ConvertBytes2EventData)
.AddConverter<EventData, byte[]>(ConvertEventData2Bytes);

// register the background exception handler
MessagingExceptionHandler.Subscribe(_options, context.Trace, context.Config.LoggerFactory);
Review comment (Member Author): We don't need to unsubscribe for EventHubs: the way our listener unregisters the EPH (EventProcessorHost) shuts it down cleanly. I had flowed the handler down anyway, but we don't want to unsubscribe before we close the EPH, since we want to log any errors that happen during graceful shutdown.


// register our trigger binding provider
INameResolver nameResolver = context.Config.NameResolver;
IConverterManager cm = context.Config.GetService<IConverterManager>();
@@ -454,17 +457,6 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
// register our binding provider
context.AddBindingRule<EventHubAttribute>()
.BindToCollector(BuildFromAttribute);

// Register an exception handler for background exceptions
// coming from the EventProcessorHost.
//
// EventProcessorOptions is a host level instance that is shared
// across all bindings, so we have to subscribe to it at the
// host level.
_options.ExceptionReceived += (s, e) =>
{
Utility.LogExceptionReceivedEvent(e, "EventProcessorHost", context.Trace, context.Config.LoggerFactory);
};
}

private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
@@ -23,9 +23,9 @@ public EventHubTriggerAttributeBindingProvider(
IConverterManager converterManager,
EventHubConfiguration eventHubConfig)
{
this._nameResolver = nameResolver;
this._converterManager = converterManager;
this._eventHubConfig = eventHubConfig;
_nameResolver = nameResolver;
_converterManager = converterManager;
_eventHubConfig = eventHubConfig;
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
@@ -18,19 +18,21 @@ internal sealed class ServiceBusListener : IListener
private readonly ServiceBusTriggerExecutor _triggerExecutor;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly MessageProcessor _messageProcessor;
private readonly MessagingExceptionHandler _exceptionHandler;
private MessagingFactory _messagingFactory;
private MessageReceiver _receiver;
private bool _disposed;
private bool _started;

public ServiceBusListener(MessagingFactory messagingFactory, string entityPath, ServiceBusTriggerExecutor triggerExecutor, ServiceBusConfiguration config)
public ServiceBusListener(MessagingFactory messagingFactory, string entityPath, ServiceBusTriggerExecutor triggerExecutor, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_messagingFactory = messagingFactory;
_entityPath = entityPath;
_triggerExecutor = triggerExecutor;
_cancellationTokenSource = new CancellationTokenSource();
_messagingProvider = config.MessagingProvider;
_messageProcessor = config.MessagingProvider.CreateMessageProcessor(entityPath);
_exceptionHandler = exceptionHandler;
}

public Task StartAsync(CancellationToken cancellationToken)
@@ -53,6 +55,11 @@ public Task StopAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();

// important to disable the exception handler BEFORE aborting
// the messaging factory, to avoid spurious error logs due
// to receive attempts on a closed connection, etc.
_exceptionHandler.Unsubscribe();

if (!_started)
{
throw new InvalidOperationException("The listener has not yet been started or has already been stopped.");
@@ -18,15 +18,17 @@ internal class ServiceBusQueueListenerFactory : IListenerFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly AccessRights _accessRights;
private readonly ServiceBusConfiguration _config;
private readonly MessagingExceptionHandler _exceptionHandler;

public ServiceBusQueueListenerFactory(ServiceBusAccount account, string queueName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config)
public ServiceBusQueueListenerFactory(ServiceBusAccount account, string queueName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_namespaceManager = account.NamespaceManager;
_messagingFactory = account.MessagingFactory;
_queueName = queueName;
_executor = executor;
_accessRights = accessRights;
_config = config;
_exceptionHandler = exceptionHandler;
}

public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
@@ -40,7 +42,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
}

ServiceBusTriggerExecutor triggerExecutor = new ServiceBusTriggerExecutor(_executor);
return new ServiceBusListener(_messagingFactory, _queueName, triggerExecutor, _config);
return new ServiceBusListener(_messagingFactory, _queueName, triggerExecutor, _config, _exceptionHandler);
}
}
}
@@ -19,8 +19,9 @@ internal class ServiceBusSubscriptionListenerFactory : IListenerFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly AccessRights _accessRights;
private readonly ServiceBusConfiguration _config;
private readonly MessagingExceptionHandler _exceptionHandler;

public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string topicName, string subscriptionName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config)
public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string topicName, string subscriptionName, ITriggeredFunctionExecutor executor, AccessRights accessRights, ServiceBusConfiguration config, MessagingExceptionHandler exceptionHandler)
{
_namespaceManager = account.NamespaceManager;
_messagingFactory = account.MessagingFactory;
@@ -29,6 +30,7 @@ public ServiceBusSubscriptionListenerFactory(ServiceBusAccount account, string t
_executor = executor;
_accessRights = accessRights;
_config = config;
_exceptionHandler = exceptionHandler;
}

public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
@@ -45,7 +47,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
string entityPath = SubscriptionClient.FormatSubscriptionPath(_topicName, _subscriptionName);

ServiceBusTriggerExecutor triggerExecutor = new ServiceBusTriggerExecutor(_executor);
return new ServiceBusListener(_messagingFactory, entityPath, triggerExecutor, _config);
return new ServiceBusListener(_messagingFactory, entityPath, triggerExecutor, _config, _exceptionHandler);
}
}
}
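The ServiceBus changes above thread one handler instance from the config provider through the listener factories into the listener, so that StopAsync can detach it before the messaging factory is aborted. A minimal sketch of why the ordering matters follows. All types here (`SketchOptions`, `SketchHandler`, `SketchListener`) are hypothetical stand-ins, not the WebJobs or ServiceBus SDK types, and the simulated abort error is an assumption made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an options object that exposes an error event
// (the role OnMessageOptions.ExceptionReceived plays in the diff).
class SketchOptions
{
    public event EventHandler<string> ExceptionReceived;
    public void Raise(string error) => ExceptionReceived?.Invoke(this, error);
}

class SketchHandler
{
    private readonly SketchOptions _options;
    public List<string> Logged { get; } = new List<string>();

    public SketchHandler(SketchOptions options) { _options = options; }

    // A named method rather than a lambda, so that "-=" removes the same
    // delegate that "+=" added.
    private void Handle(object sender, string error) => Logged.Add(error);

    public void Subscribe() => _options.ExceptionReceived += Handle;
    public void Unsubscribe() => _options.ExceptionReceived -= Handle;
}

class SketchListener
{
    private readonly SketchOptions _options;
    private readonly SketchHandler _handler;

    public SketchListener(SketchOptions options, SketchHandler handler)
    {
        _options = options;
        _handler = handler;
    }

    public void Stop()
    {
        // Detach first, so errors surfaced by the abort are not logged.
        _handler.Unsubscribe();
        // Simulates the receive failure that an aborted connection raises.
        _options.Raise("receive failed: connection closed");
    }
}

static class Program
{
    static void Main()
    {
        var options = new SketchOptions();
        var handler = new SketchHandler(options);
        handler.Subscribe();

        options.Raise("error while running");        // logged
        new SketchListener(options, handler).Stop(); // raised after detach, not logged

        Console.WriteLine(handler.Logged.Count);     // prints 1
    }
}
```

The earlier code subscribed with an inline lambda, which leaves no delegate reference to pass to "-=" later; keeping the handler as an object with a named method is what makes the detach possible.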
159 changes: 159 additions & 0 deletions src/Microsoft.Azure.WebJobs.ServiceBus/MessagingExceptionHandler.cs
@@ -0,0 +1,159 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Microsoft.ServiceBus.Messaging;

namespace Microsoft.Azure.WebJobs.ServiceBus
{
internal abstract class MessagingExceptionHandler
{
private readonly TraceWriter _traceWriter;
private readonly ILoggerFactory _loggerFactory;
private string _source;

public MessagingExceptionHandler(string source, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
{
if (string.IsNullOrEmpty(source))
{
throw new ArgumentNullException(nameof(source));
}
if (traceWriter == null)
{
throw new ArgumentNullException(nameof(traceWriter));
}

_source = source;
_traceWriter = traceWriter;
_loggerFactory = loggerFactory;
}

public static MessagingExceptionHandler Subscribe(EventProcessorOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
{
var exceptionHandler = new EventHubExceptionHandler(options, traceWriter, loggerFactory);
exceptionHandler.Subscribe();
return exceptionHandler;
}

public static MessagingExceptionHandler Subscribe(OnMessageOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
{
var exceptionHandler = new ServiceBusExceptionHandler(options, traceWriter, loggerFactory);
exceptionHandler.Subscribe();
return exceptionHandler;
}

public abstract void Subscribe();

public abstract void Unsubscribe();

protected void Handle(object sender, ExceptionReceivedEventArgs e)
{
LogExceptionReceivedEvent(e);
}

internal void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e)
{
try
{
var logger = _loggerFactory?.CreateLogger(LogCategories.Executor);
string message = $"{_source} error (Action={e.Action}) : {e.Exception.ToString()}";

var logLevel = GetLogLevel(e.Exception);
logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message);

var traceEvent = new TraceEvent(logLevel.ToTraceLevel(), message, null, e.Exception);
_traceWriter.Trace(traceEvent);
}
catch
{
// best effort logging
}
}

protected virtual LogLevel GetLogLevel(Exception ex)
{
var mex = ex as MessagingException;
if (!(ex is OperationCanceledException) && (mex == null || !mex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
return LogLevel.Error;
}
else
{
// transient messaging errors and cancellations we log at Information level
// so we have a record of them, but we don't treat them as actual errors
return LogLevel.Information;
}
}

private class EventHubExceptionHandler : MessagingExceptionHandler
{
private readonly EventProcessorOptions _options;

public EventHubExceptionHandler(EventProcessorOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
: base("EventProcessorHost", traceWriter, loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

_options = options;
}

public override void Subscribe()
{
_options.ExceptionReceived += Handle;
}

public override void Unsubscribe()
{
_options.ExceptionReceived -= Handle;
}

protected override LogLevel GetLogLevel(Exception ex)
{
if (ex is ReceiverDisconnectedException ||
ex is LeaseLostException)

Review comment (Member Author): This is the key change for the EventHub fix: overriding the base level determination so that these are logged at Information level rather than as errors, as they are now (which generates CRIs).

{
// For EventProcessorHost these exceptions can happen as part
// of normal partition balancing across instances, so we want to
// trace them, but not treat them as errors.
return LogLevel.Information;
}

return base.GetLogLevel(ex);
}
}

private class ServiceBusExceptionHandler : MessagingExceptionHandler
{
private readonly OnMessageOptions _options;

public ServiceBusExceptionHandler(OnMessageOptions options, TraceWriter traceWriter, ILoggerFactory loggerFactory = null)
: base("MessageReceiver", traceWriter, loggerFactory)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

_options = options;
}

public override void Subscribe()
{
_options.ExceptionReceived += Handle;
}

public override void Unsubscribe()
{
_options.ExceptionReceived -= Handle;
}
}
}
}
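The level selection in MessagingExceptionHandler.cs can be read as a two-layer rule: the base class treats cancellations and transient messaging errors as informational and everything else as an error, and the EventHub subclass additionally downgrades the exceptions that occur during partition rebalancing. A minimal sketch of that rule follows. The exception and classifier types are hypothetical stand-ins for the SDK types named in the diff, and the stand-in lease exception deriving directly from Exception is an assumption of this sketch, not a statement about the SDK hierarchy.

```csharp
using System;
using System.Threading.Tasks;

enum SketchLevel { Information, Error }

// Hypothetical stand-in for MessagingException and its IsTransient flag.
class SketchMessagingException : Exception
{
    public bool IsTransient { get; }
    public SketchMessagingException(bool isTransient) { IsTransient = isTransient; }
}

// Hypothetical stand-in for LeaseLostException / ReceiverDisconnectedException.
class SketchLeaseLostException : Exception { }

class BaseClassifier
{
    public virtual SketchLevel GetLevel(Exception ex)
    {
        var mex = ex as SketchMessagingException;
        // Logically equivalent to the condition in the diff, stated positively.
        bool benign = ex is OperationCanceledException || (mex != null && mex.IsTransient);
        return benign ? SketchLevel.Information : SketchLevel.Error;
    }
}

class EventHubClassifier : BaseClassifier
{
    // Rebalancing exceptions are downgraded; everything else uses the base rule.
    public override SketchLevel GetLevel(Exception ex) =>
        ex is SketchLeaseLostException ? SketchLevel.Information : base.GetLevel(ex);
}

static class Program
{
    static void Main()
    {
        var serviceBus = new BaseClassifier();
        var eventHub = new EventHubClassifier();
        var leaseLost = new SketchLeaseLostException();

        Console.WriteLine(serviceBus.GetLevel(leaseLost));                            // Error
        Console.WriteLine(eventHub.GetLevel(leaseLost));                              // Information
        Console.WriteLine(eventHub.GetLevel(new SketchMessagingException(true)));     // Information
        Console.WriteLine(eventHub.GetLevel(new SketchMessagingException(false)));    // Error
        Console.WriteLine(eventHub.GetLevel(new TaskCanceledException()));            // Information
    }
}
```

Note that unknown exception types fall through to Error in both layers, so the downgrade only applies to exceptions that are explicitly recognized.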