Skip to content

Commit

Permalink
Improvements to EventHub/ServiceBus error handling (#1784, #1760)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed Jul 10, 2018
1 parent 98fa140 commit 10df564
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 63 deletions.
28 changes: 13 additions & 15 deletions sample/SampleHost/Program.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Extensions.Hosting;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Configuration;
using System.Collections.Generic;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace SampleHost
{
Expand All @@ -20,15 +14,20 @@ public static async Task Main(string[] args)
{
var builder = new HostBuilder()
.UseEnvironment("Development")
.ConfigureWebJobsHost()
.ConfigureWebJobsHost(o =>
{
// TEMP - remove once https://github.com/Azure/azure-webjobs-sdk/issues/1802 is fixed
o.HostId = "ecad61-62cf-47f4-93b4-6efcded6";
})
.AddWebJobsLogging() // Enables WebJobs v1 classic logging
.AddAzureStorageCoreServices()
.AddAzureStorage()
.AddServiceBus()
.AddApplicationInsights()
.ConfigureAppConfiguration(config =>
.ConfigureAppConfiguration(b =>
{
// Adding command line as a configuration source
config.AddCommandLine(args);
b.AddCommandLine(args);
})
.ConfigureLogging(b =>
{
Expand All @@ -37,11 +36,10 @@ public static async Task Main(string[] args)
})
.UseConsoleLifetime();

var jobHost = builder.Build();

using (jobHost)
var host = builder.Build();
using (host)
{
await jobHost.RunAsync();
await host.RunAsync();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,41 @@ internal static void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e, ILo
var logger = loggerFactory?.CreateLogger(LogCategories.Executor);
string message = $"EventProcessorHost error (Action={e.Action}, HostName={e.Hostname}, PartitionId={e.PartitionId})";

var ehex = e.Exception as EventHubsException;
if (!(e.Exception is OperationCanceledException) && (ehex == null || !ehex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
logger?.LogError(0, e.Exception, message);
}
else
{
// transient errors we log as verbose so we have a record
// of them, but we don't treat them as actual errors
logger?.LogDebug(0, e.Exception, message);
}
var logLevel = GetLogLevel(e.Exception);
logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message);
}
catch
{
// best effort logging
}
}

private static LogLevel GetLogLevel(Exception ex)
{
if (ex is ReceiverDisconnectedException ||
ex is LeaseLostException)
{
// For EventProcessorHost these exceptions can happen as part
// of normal partition balancing across instances, so we want to
// trace them, but not treat them as errors.
return LogLevel.Information;
}

var ehex = ex as EventHubsException;
if (!(ex is OperationCanceledException) && (ehex == null || !ehex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
return LogLevel.Error;
}
else
{
// transient messaging errors we log as info so we have a record
// of them, but we don't treat them as actual errors
return LogLevel.Information;
}
}

private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
{
EventHubClient client = _options.GetEventHubClient(attribute.EventHubName, attribute.Connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ public static IServiceCollection AddWebJobs(this IServiceCollection services, Ac
services.TryAddSingleton<IFunctionOutputLogger, ConsoleFunctionOutputLogger>();
services.TryAddSingleton<IFunctionInstanceLogger, FunctionInstanceLogger>();
services.TryAddSingleton<IHostInstanceLogger, NullHostInstanceLogger>();


// TODO: FACAVAL FIX THIS - Right now, We're only registering the FixedIdProvider
// need to register the dynamic ID provider and verify if the logic in it can be improved (and have the storage dependency removed)
// Tracked by https://github.com/Azure/azure-webjobs-sdk/issues/1802
services.TryAddSingleton<IHostIdProvider, FixedHostIdProvider>();

services.TryAddSingleton<IDistributedLockManager, InMemorySingletonManager>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.ServiceBus.Bindings;
using Microsoft.Azure.WebJobs.ServiceBus.Triggers;
Expand Down Expand Up @@ -86,24 +85,30 @@ internal static void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e, ILo
var logger = loggerFactory?.CreateLogger(LogCategories.Executor);
string message = $"MessageReceiver error (Action={ctxt.Action}, ClientId={ctxt.ClientId}, EntityPath={ctxt.EntityPath}, Endpoint={ctxt.Endpoint})";

var sbex = e.Exception as ServiceBusException;
if (!(e.Exception is OperationCanceledException) && (sbex == null || !sbex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
logger?.LogError(0, e.Exception, message);
}
else
{
// transient errors we log as verbose so we have a record
// of them, but we don't treat them as actual errors
logger?.LogDebug(0, e.Exception, message);
}
var logLevel = GetLogLevel(e.Exception);
logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message);
}
catch
{
// best effort logging
}
}

private static LogLevel GetLogLevel(Exception ex)
{
var sbex = ex as ServiceBusException;
if (!(ex is OperationCanceledException) && (sbex == null || !sbex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
return LogLevel.Error;
}
else
{
// transient messaging errors we log as info so we have a record
// of them, but we don't treat them as actual errors
return LogLevel.Information;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void LogExceptionReceivedEvent_TransientEvent_LoggedAsVerbose()

string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
var logMessage = _loggerProvider.GetAllLogMessages().Single();
Assert.Equal(LogLevel.Debug, logMessage.Level);
Assert.Equal(LogLevel.Information, logMessage.Level);
Assert.Same(ex, logMessage.Exception);
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
}
Expand All @@ -112,7 +112,7 @@ public void LogExceptionReceivedEvent_OperationCanceledException_LoggedAsVerbose

string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
var logMessage = _loggerProvider.GetAllLogMessages().Single();
Assert.Equal(LogLevel.Debug, logMessage.Level);
Assert.Equal(LogLevel.Information, logMessage.Level);
Assert.Same(ex, logMessage.Exception);
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
}
Expand All @@ -131,5 +131,21 @@ public void LogExceptionReceivedEvent_NonMessagingException_LoggedAsError()
Assert.Same(ex, logMessage.Exception);
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
}

[Fact]
public void LogExceptionReceivedEvent_PartitionExceptions_LoggedAsInfo()
{
var ctor = typeof(ReceiverDisconnectedException).GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(string) }, null);
var ex = (ReceiverDisconnectedException)ctor.Invoke(new object[] { "New receiver with higher epoch of '30402' is created hence current receiver with epoch '30402' is getting disconnected." });
ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single();
var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" });
EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory);

string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)";
var logMessage = _loggerProvider.GetAllLogMessages().Single();
Assert.Equal(LogLevel.Information, logMessage.Level);
Assert.Same(ex, logMessage.Exception);
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Azure.WebJobs.ServiceBus.Config;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using Xunit;

namespace Microsoft.Azure.WebJobs.ServiceBus.UnitTests.Config
Expand All @@ -35,19 +34,6 @@ public void Constructor_SetsExpectedDefaults()
Assert.Equal(0, config.PrefetchCount);
}

[Fact]
public void ConnectionString_ReturnsExpectedDefaultUntilSetExplicitly()
{
ServiceBusOptions config = new ServiceBusOptions();

string defaultConnection = null; // AmbientConnectionStringProvider.Instance.GetConnectionString(ConnectionStringNames.ServiceBus);
Assert.Equal(defaultConnection, config.ConnectionString);

string testConnection = "testconnection";
config.ConnectionString = testConnection;
Assert.Equal(testConnection, config.ConnectionString);
}

[Fact]
public void PrefetchCount_GetSet()
{
Expand All @@ -73,7 +59,7 @@ public void LogExceptionReceivedEvent_NonTransientEvent_LoggedAsError()
}

[Fact]
public void LogExceptionReceivedEvent_TransientEvent_LoggedAsVerbose()
public void LogExceptionReceivedEvent_TransientEvent_LoggedAsInformation()
{
var ex = new ServiceBusException(true);
Assert.True(ex.IsTransient);
Expand All @@ -82,7 +68,7 @@ public void LogExceptionReceivedEvent_TransientEvent_LoggedAsVerbose()

var expectedMessage = $"MessageReceiver error (Action=TestAction, ClientId=TestClient, EntityPath=TestEntity, Endpoint=TestEndpoint)";
var logMessage = _loggerProvider.GetAllLogMessages().Single();
Assert.Equal(LogLevel.Debug, logMessage.Level);
Assert.Equal(LogLevel.Information, logMessage.Level);
Assert.Same(ex, logMessage.Exception);
Assert.Equal(expectedMessage, logMessage.FormattedMessage);
}
Expand Down

0 comments on commit 10df564

Please sign in to comment.