Skip to content

Commit

Permalink
(Dev) Exception logging for EventProcessorHost background exceptions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed May 23, 2018
1 parent ad66a05 commit f96699b
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Logging;
using Newtonsoft.Json;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.ServiceBus
{
Expand All @@ -32,9 +34,7 @@ public class EventHubConfiguration : IExtensionConfigProvider
private readonly Dictionary<string, ReceiverCreds> _receiverCreds = new Dictionary<string, ReceiverCreds>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, EventProcessorHost> _explicitlyProvidedHosts = new Dictionary<string, EventProcessorHost>(StringComparer.OrdinalIgnoreCase);

private readonly EventProcessorOptions _options;

private string _defaultStorageString; // set to JobHostConfig.StorageConnectionString
private string _defaultStorageString;
private int _batchCheckpointFrequency = 1;

/// <summary>
Expand All @@ -43,27 +43,18 @@ public class EventHubConfiguration : IExtensionConfigProvider
/// </summary>
public const string LeaseContainerName = "azure-webjobs-eventhub";

/// <summary>
/// default constructor. Callers can reference this without having any assembly references to service bus assemblies.
/// </summary>
public EventHubConfiguration()
: this(null)
{
}

/// <summary>
/// Constructs a new instance.
/// </summary>
/// <param name="options">The optional <see cref="EventProcessorOptions"/> to use when receiving events.</param>
public EventHubConfiguration(EventProcessorOptions options)
public EventHubConfiguration()
{
if (options == null)
{
options = EventProcessorOptions.DefaultOptions;
options.MaxBatchSize = 64;
options.PrefetchCount = options.MaxBatchSize * 4;
}
_options = options;
// Our default options will delegate to our own exception
// logger. Customers can override this completely by setting their
// own EventProcessorOptions instance.
EventProcessorOptions = EventProcessorOptions.DefaultOptions;
EventProcessorOptions.MaxBatchSize = 64;
EventProcessorOptions.PrefetchCount = EventProcessorOptions.MaxBatchSize * 4;
EventProcessorOptions.SetExceptionHandler(ExceptionReceivedHandler);
}

/// <summary>
Expand Down Expand Up @@ -376,9 +367,7 @@ public static string GetBlobPrefix(string eventHubName, string serviceBusNamespa
string key = EscapeBlobPath(serviceBusNamespace) + "/" + EscapeBlobPath(eventHubName) + "/";
return key;
}

// Get the eventhub options, used by the EventHub SDK for listening on event.
internal EventProcessorOptions GetOptions() => _options;
public EventProcessorOptions EventProcessorOptions { get; set; }

void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
{
Expand All @@ -388,7 +377,7 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
}

// apply at eventProcessorOptions level (maxBatchSize, prefetchCount)
context.ApplyConfig(_options, "eventHub");
context.ApplyConfig(EventProcessorOptions, "eventHub");

// apply at config level (batchCheckpointFrequency)
context.ApplyConfig(this, "eventHub");
Expand All @@ -411,7 +400,20 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)

// register our binding provider
context.AddBindingRule<EventHubAttribute>()
.BindToCollector(BuildFromAttribute);
.BindToCollector(BuildFromAttribute);

// Set the default exception handler for background exceptions
// coming from the EventProcessorHost.
ExceptionHandler = (e =>
{
var ehex = e.Exception as EventHubsException;
if (ehex != null && !ehex.IsTransient)
{
string message = $"EventProcessorHost error (Action={e.Action})";
var logger = context.Config.LoggerFactory?.CreateLogger(LogCategories.Executor);
logger?.LogError(0, e.Exception, message);
}
});
}

private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
Expand All @@ -437,6 +439,13 @@ private static Task<object> ConvertPocoToEventData(object arg, Attribute attrRes
return Task.FromResult<object>(ConvertString2EventData(JsonConvert.SerializeObject(arg)));
}

internal Action<ExceptionReceivedEventArgs> ExceptionHandler { get; set; }

private void ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
{
ExceptionHandler?.Invoke(args);
}

// Hold credentials for a given eventHub name.
// Multiple consumer groups (and multiple listeners) on the same hub can share the same credentials.
private class ReceiverCreds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public EventHubListener(ITriggeredFunctionExecutor executor, EventProcessorHost
_executor = executor;
_eventListener = eventListener;
_singleDispatch = single;
_options = config.GetOptions();
_options = config.EventProcessorOptions;
_config = config;
_logger = logger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public class ServiceBusConfiguration
public ServiceBusConfiguration()
{
// Our default options will delegate to our own exception
// logger. Customers can override this completely.
// logger. Customers can override this completely by setting their
// own MessageHandlerOptions instance.
MessageOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 16
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void Initialize(ExtensionConfigContext context)
throw new ArgumentNullException("context");
}

// Register an exception handler for background exceptions
// Set the default exception handler for background exceptions
// coming from MessageReceivers.
Config.ExceptionHandler = (e) =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Reflection;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;
using Xunit;

namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
{
public class EventHubConfigurationTests
{
[Fact]
public void Initialize_PerformsExpectedRegistrations()
{
JobHostConfiguration config = new JobHostConfiguration();
config.AddService<INameResolver>(new RandomNameResolver());

TestLoggerProvider loggerProvider = new TestLoggerProvider();
ILoggerFactory loggerFactory = new LoggerFactory();
loggerFactory.AddProvider(loggerProvider);
config.LoggerFactory = loggerFactory;

EventHubConfiguration eventHubConfiguration = new EventHubConfiguration();

IExtensionRegistry extensions = config.GetService<IExtensionRegistry>();
ITriggerBindingProvider[] triggerBindingProviders = extensions.GetExtensions<ITriggerBindingProvider>().ToArray();
Assert.Empty(triggerBindingProviders);
IBindingProvider[] bindingProviders = extensions.GetExtensions<IBindingProvider>().ToArray();
Assert.Empty(bindingProviders);

ExtensionConfigContext context = new ExtensionConfigContext
{
Config = config,
};
((IExtensionConfigProvider)eventHubConfiguration).Initialize(context);

// ensure the EventHubTriggerAttributeBindingProvider was registered
triggerBindingProviders = extensions.GetExtensions<ITriggerBindingProvider>().ToArray();
EventHubTriggerAttributeBindingProvider triggerBindingProvider = (EventHubTriggerAttributeBindingProvider)triggerBindingProviders.Single();
Assert.NotNull(triggerBindingProvider);

// ensure the EventProcessorOptions ExceptionReceived event is wired up
var eventProcessorOptions = eventHubConfiguration.EventProcessorOptions;
var ex = new EventHubsException(false, "Kaboom!");
var ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
var args = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "Testing" });
var handler = (Action<ExceptionReceivedEventArgs>)eventProcessorOptions.GetType().GetField("exceptionHandler", BindingFlags.Instance | BindingFlags.NonPublic).GetValue(eventProcessorOptions);
handler.Method.Invoke(handler.Target, new object[] { args });

string expectedMessage = "EventProcessorHost error (Action=Testing)";
var logMessage = loggerProvider.GetAllLogMessages().Single();
Assert.Equal(LogLevel.Error, logMessage.Level);
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
Assert.Same(ex, logMessage.Exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
<RootNamespace>Microsoft.Azure.WebJobs.ServiceBus.UnitTests</RootNamespace>
<RootNamespace>Microsoft.Azure.WebJobs.EventHubs.UnitTests</RootNamespace>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.WebJobs.ServiceBus;
using Xunit;

using static Microsoft.Azure.EventHubs.EventData;

namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
{
public class EventHubAsyncCollectorTests
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Xunit;
using static Microsoft.Azure.EventHubs.EventData;

namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
{
public class EventHubTests
{
Expand Down Expand Up @@ -240,7 +241,7 @@ public void InitializeFromHostMetadata()
context.Config.AddService<ILoggerFactory>(new LoggerFactory());
(config as IExtensionConfigProvider).Initialize(context);

var options = config.GetOptions();
var options = config.EventProcessorOptions;
Assert.Equal(100, options.MaxBatchSize);
Assert.Equal(200, options.PrefetchCount);
Assert.Equal(5, config.BatchCheckpointFrequency);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.ServiceBus;
using Xunit;

namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests
namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests
{
public class EventHubListenerTests
{
Expand Down

0 comments on commit f96699b

Please sign in to comment.