Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] changing 'rebalancing' errors to be logged as Info #2176

Merged
merged 1 commit into from
Apr 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,20 @@ public Task OpenAsync(PartitionContext context)

public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
string errorMessage = $"Error processing event from Partition Id:{context.PartitionId}, Owner:{context.Owner}, EventHubPath:{context.EventHubPath}";
_logger.LogError(error, errorMessage);
string errorDetails = $"Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}'";

if (error is ReceiverDisconnectedException ||
error is LeaseLostException)
{
// For EventProcessorHost these exceptions can happen as part
// of normal partition balancing across instances, so we want to
// trace them, but not treat them as errors.
_logger.LogInformation($"An Event Hub exception of type '{error.GetType().Name}' was thrown from {errorDetails}. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeffhollan -- any suggestions for this message?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me. Thanks

}
else
{
_logger.LogError(error, $"Error processing event from {errorDetails}");
}

return Task.CompletedTask;
}
Expand Down Expand Up @@ -216,6 +228,6 @@ async Task ICheckpointer.CheckpointAsync(PartitionContext context)
{
await context.CheckpointAsync();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
Expand Down Expand Up @@ -127,6 +131,60 @@ public async Task CloseAsync_Shutdown_DoesNotCheckpoint()
await eventProcessor.CloseAsync(partitionContext, CloseReason.Shutdown);

checkpointer.Verify(p => p.CheckpointAsync(partitionContext), Times.Never);
}

[Fact]
public async Task ProcessErrorsAsync_LoggedAsError()
{
var partitionContext = EventHubTests.GetPartitionContext(partitionId: "123", eventHubPath: "abc", owner: "def");
var options = new EventHubOptions();
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
var testLogger = new TestLogger("Test");
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true, checkpointer.Object);

var ex = new InvalidOperationException("My InvalidOperationException!");

await eventProcessor.ProcessErrorAsync(partitionContext, ex);
var msg = testLogger.GetLogMessages().Single();
Assert.Equal("Error processing event from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'", msg.FormattedMessage);
Assert.IsType<InvalidOperationException>(msg.Exception);
Assert.Equal(LogLevel.Error, msg.Level);
}

[Fact]
public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation()
{
var partitionContext = EventHubTests.GetPartitionContext(partitionId: "123", eventHubPath: "abc", owner: "def");
var options = new EventHubOptions();
var checkpointer = new Mock<EventHubListener.ICheckpointer>(MockBehavior.Strict);
var executor = new Mock<ITriggeredFunctionExecutor>(MockBehavior.Strict);
var testLogger = new TestLogger("Test");
var eventProcessor = new EventHubListener.EventProcessor(options, executor.Object, testLogger, true, checkpointer.Object);

// ctor is private
var constructor = typeof(ReceiverDisconnectedException)
.GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(string) }, null);
ReceiverDisconnectedException disconnectedEx = (ReceiverDisconnectedException)constructor.Invoke(new[] { "My ReceiverDisconnectedException!" });

await eventProcessor.ProcessErrorAsync(partitionContext, disconnectedEx);
var msg = testLogger.GetLogMessages().Single();
Assert.Equal("An Event Hub exception of type 'ReceiverDisconnectedException' was thrown from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.", msg.FormattedMessage);
Assert.Null(msg.Exception);
Assert.Equal(LogLevel.Information, msg.Level);

testLogger.ClearLogMessages();

// ctor is private
constructor = typeof(LeaseLostException)
.GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(string), typeof(Exception) }, null);
LeaseLostException leaseLostEx = (LeaseLostException)constructor.Invoke(new object[] { "My LeaseLostException!", new Exception() });

await eventProcessor.ProcessErrorAsync(partitionContext, leaseLostEx);
msg = testLogger.GetLogMessages().Single();
Assert.Equal("An Event Hub exception of type 'LeaseLostException' was thrown from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.", msg.FormattedMessage);
Assert.Null(msg.Exception);
Assert.Equal(LogLevel.Information, msg.Level);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -259,14 +258,25 @@ public void InitializeFromHostMetadata()
Assert.Equal(5, options.BatchCheckpointFrequency);
}

internal static PartitionContext GetPartitionContext()
internal static PartitionContext GetPartitionContext(string partitionId = null, string eventHubPath = null,
string consumerGroupName = null, string owner = null)
{
var constructor = typeof(PartitionContext).GetConstructor(
BindingFlags.NonPublic | BindingFlags.Instance,
null,
new Type[] { typeof(EventProcessorHost), typeof(string), typeof(string), typeof(string), typeof(CancellationToken) },
null);
return (PartitionContext)constructor.Invoke(new object[] { null, null, null, null, null });
var context = (PartitionContext)constructor.Invoke(new object[] { null, partitionId, eventHubPath, consumerGroupName, null });

// Set a lease, which allows us to grab the "Owner"
constructor = typeof(Lease).GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { }, null);
var lease = (Lease)constructor.Invoke(new object[] { });
lease.Owner = owner;

var leaseProperty = typeof(PartitionContext).GetProperty("Lease", BindingFlags.NonPublic | BindingFlags.Instance);
leaseProperty.SetValue(context, lease);

return context;
}
}
}