Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Merging dev to master to release version 4.2.0 (#82)
Browse files Browse the repository at this point in the history
* Add EventHub output logging. Fixes #60.
* Unifying logging of exceptions. Fixes #59.
* Fix nuget errors on pack (#73)
* Increasing max message size and batch size. Fixes #29
* User configurable initial offset support (#79)
* Instructions via Readme.md for setting up local environment to run integration tests

Co-authored-by: Alexey Rodionov <alrod@microsoft.com>
Co-authored-by: Pragna Gopa <pgopa@microsoft.com>
Co-authored-by: Sid Krishna <sidkri@microsoft.com>
  • Loading branch information
4 people authored Dec 8, 2020
1 parent daab3ae commit 738c799
Show file tree
Hide file tree
Showing 18 changed files with 442 additions and 75 deletions.
9 changes: 9 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Release notes
<!-- Please add your release notes in the following format:
- My change description (#PR)
-->
#### Version 4.2.0
- User configurable initial offset support [#79](https://github.com/Azure/azure-functions-eventhubs-extension/pull/79)

**Release sprint:** Sprint 87
[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+87%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+87%22+label%3Afeature+is%3Aclosed) ]
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,10 @@ public void Initialize(ExtensionConfigContext context)

internal static void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e, ILoggerFactory loggerFactory)
{
try
{
var logger = loggerFactory?.CreateLogger(LogCategories.Executor);
string message = $"EventProcessorHost error (Action={e.Action}, HostName={e.Hostname}, PartitionId={e.PartitionId})";
var logger = loggerFactory?.CreateLogger(LogCategories.Executor);
string message = $"EventProcessorHost error (Action='{e.Action}', HostName='{e.Hostname}', PartitionId='{e.PartitionId}').";

var logLevel = GetLogLevel(e.Exception);
logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message);
}
catch
{
// best effort logging
}
Utility.LogException(e.Exception, message, logger);
}

private static LogLevel GetLogLevel(Exception ex)
Expand Down Expand Up @@ -129,7 +121,7 @@ private static LogLevel GetLogLevel(Exception ex)
private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
{
EventHubClient client = _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection);
return new EventHubAsyncCollector(client);
return new EventHubAsyncCollector(client, _loggerFactory);
}

private static string ConvertEventData2String(EventData x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public EventHubOptions()
{
EventProcessorOptions = EventProcessorOptions.DefaultOptions;
PartitionManagerOptions = new PartitionManagerOptions();
InitialOffsetOptions = new InitialOffsetOptions();
}

/// <summary>
Expand All @@ -60,6 +61,8 @@ public int BatchCheckpointFrequency
}
}

public InitialOffsetOptions InitialOffsetOptions { get; set; }

public EventProcessorOptions EventProcessorOptions { get; }

public PartitionManagerOptions PartitionManagerOptions { get; }
Expand Down Expand Up @@ -390,11 +393,22 @@ public string Format()
};
}

JObject initialOffsetOptions = null;
if (InitialOffsetOptions != null)
{
initialOffsetOptions = new JObject
{
{ nameof(InitialOffsetOptions.Type), InitialOffsetOptions.Type },
{ nameof(InitialOffsetOptions.EnqueuedTimeUTC), InitialOffsetOptions.EnqueuedTimeUTC },
};
}

JObject options = new JObject
{
{ nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
{ nameof(EventProcessorOptions), eventProcessorOptions },
{ nameof(PartitionManagerOptions), partitionManagerOptions }
{ nameof(PartitionManagerOptions), partitionManagerOptions },
{ nameof(InitialOffsetOptions), initialOffsetOptions }
};

return options.ToString(Formatting.Indented);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -18,7 +18,7 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder)
throw new ArgumentNullException(nameof(builder));
}

builder.AddEventHubs(p => {});
builder.AddEventHubs(ConfigureOptions);

return builder;
}
Expand All @@ -45,5 +45,38 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<

return builder;
}

internal static void ConfigureOptions(EventHubOptions options)
{
string offsetType = options?.InitialOffsetOptions?.Type?.ToLower() ?? String.Empty;
if (!offsetType.Equals(String.Empty))
{
switch (offsetType)
{
case "fromstart":
options.EventProcessorOptions.InitialOffsetProvider = (s) => { return EventPosition.FromStart(); };
break;
case "fromend":
options.EventProcessorOptions.InitialOffsetProvider = (s) => { return EventPosition.FromEnd(); };
break;
case "fromenqueuedtime":
try
{
DateTime enqueuedTimeUTC = DateTime.Parse(options.InitialOffsetOptions.EnqueuedTimeUTC).ToUniversalTime();
options.EventProcessorOptions.InitialOffsetProvider = (s) => { return EventPosition.FromEnqueuedTime(enqueuedTimeUTC); };
}
catch (System.FormatException fe)
{
string message = $"{nameof(EventHubOptions)}:{nameof(InitialOffsetOptions)}:{nameof(InitialOffsetOptions.EnqueuedTimeUTC)} is configured with an invalid format. " +
"Please use a format supported by DateTime.Parse(). e.g. 'yyyy-MM-ddTHH:mm:ssZ'";
throw new InvalidOperationException(message, fe);
}
break;
default:
throw new InvalidOperationException("An unsupported value was supplied for initialOffsetOptions.type");
}
// If not specified, EventProcessor's default offset will apply
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.EventHubs
{
public class InitialOffsetOptions
{
public string Type { get; set; } = "";
public string EnqueuedTimeUTC { get; set; } = "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,9 @@ public Task OpenAsync(PartitionContext context)

public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
string errorDetails = $"Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}'";
string errorDetails = $"Processing error (Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}').";

if (error is ReceiverDisconnectedException ||
error is LeaseLostException)
{
// For EventProcessorHost these exceptions can happen as part
// of normal partition balancing across instances, so we want to
// trace them, but not treat them as errors.
_logger.LogInformation($"An Event Hub exception of type '{error.GetType().Name}' was thrown from {errorDetails}. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.");
}
else
{
_logger.LogError(error, $"Error processing event from {errorDetails}");
}
Utility.LogException(error, errorDetails, _logger);

return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.EventHubs
{
/// <summary>
/// Core object to send events to EventHub.
/// Any user parameter that sends EventHub events will eventually get bound to this object.
/// This will queue events and send in batches, also keeping under the 256kb event hub limit per batch.
/// This will queue events and send in batches, also keeping under the 1024kb event hub limit per batch.
/// </summary>
internal class EventHubAsyncCollector : IAsyncCollector<EventData>
{
Expand All @@ -23,20 +25,23 @@ internal class EventHubAsyncCollector : IAsyncCollector<EventData>

private const int BatchSize = 100;

// Suggested to use 240k instead of 256k to leave padding room for headers.
private const int MaxByteSize = 240 * 1024;

// Suggested to use 1008k instead of 1024k to leave padding room for headers.
private const int MaxByteSize = 1008 * 1024;

private readonly ILogger _logger;

/// <summary>
/// Create a sender around the given client.
/// </summary>
/// <param name="client"></param>
public EventHubAsyncCollector(EventHubClient client)
public EventHubAsyncCollector(EventHubClient client, ILoggerFactory loggerFactory)
{
if (client == null)
{
throw new ArgumentNullException("client");
}
_client = client;
_logger = loggerFactory?.CreateLogger(LogCategories.Executor);
}

/// <summary>
Expand Down Expand Up @@ -96,6 +101,7 @@ public EventHubAsyncCollector(EventHubClient client)
/// <param name="batch">the set of events to send</param>
protected virtual async Task SendBatchAsync(IEnumerable<EventData> batch)
{
_logger?.LogDebug("Sending events to EventHub");
await _client.SendAsync(batch);
}

Expand Down
100 changes: 100 additions & 0 deletions src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Utility.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace Microsoft.Azure.WebJobs.EventHubs
{
internal class Utility
{
public static void LogException(Exception ex, string message, ILogger logger)
{
try
{
// Sometimes EventHub SDK aggregates an exception
AggregateException ae = ex as AggregateException;
if (ae != null && ae.InnerExceptions != null && ae.InnerExceptions.Count == 1)
{
ex = ae.InnerExceptions[0];
}

LogLevel logLevel = GetLevel(ex);
if (logLevel == LogLevel.Information)
{
message = $"{message} An exception of type '{ex.GetType().Name}' was thrown. This exception type is typically a result of Event Hub processor rebalancing or a transient error and can be safely ignored.";
}
logger?.Log(logLevel, 0, message, ex, (s, exc) => message);
}
catch
{
// best effort logging
}
}

private static LogLevel GetLevel(Exception ex)
{
if (ex == null)
{
throw new ArgumentNullException("ex");
}

if (ex is ReceiverDisconnectedException || ex is LeaseLostException
|| IsConflictLeaseIdMismatchWithLeaseOperation(ex))
{
// For EventProcessorHost these exceptions can happen as part
// of normal partition balancing across instances, so we want to
// trace them, but not treat them as errors.
return LogLevel.Information;
}

var ehex = ex as EventHubsException;
if (!(ex is OperationCanceledException) && (ehex == null || !ehex.IsTransient))
{
// any non-transient exceptions or unknown exception types
// we want to log as errors
return LogLevel.Error;
}
else
{
// transient messaging errors we log as info so we have a record
// of them, but we don't treat them as actual errors
return LogLevel.Information;
}
}

public static bool IsConflictLeaseIdMismatchWithLeaseOperation(Exception ex)
{
StorageException exception = ex as StorageException;
if (exception == null)
{
return false;
}

RequestResult result = exception.RequestInformation;

if (result == null)
{
return false;
}

if (result.HttpStatusCode != 409)
{
return false;
}

StorageExtendedErrorInformation extendedInformation = result.ExtendedErrorInformation;

if (extendedInformation == null)
{
return false;
}

return extendedInformation.ErrorCode == "LeaseIdMismatchWithLeaseOperation";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
<RootNamespace>Microsoft.Azure.WebJobs.EventHubs</RootNamespace>
<PackageId>Microsoft.Azure.WebJobs.Extensions.EventHubs</PackageId>
<Description>Microsoft Azure WebJobs SDK EventHubs Extension</Description>
<Version>4.1.1</Version>
<Version>4.2.0</Version>
<CommitHash Condition="$(CommitHash) == ''">N/A</CommitHash>
<InformationalVersion>$(Version) Commit hash: $(CommitHash)</InformationalVersion>
<Authors>Microsoft</Authors>
<Company>Microsoft</Company>
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<PackageLicenseUrl>https://go.microsoft.com/fwlink/?linkid=2028464</PackageLicenseUrl>
<PackageIconUrl>https://raw.githubusercontent.com/Azure/azure-webjobs-sdk/dev/webjobs.png</PackageIconUrl>
<PackageRequireLicenseAcceptance>True</PackageRequireLicenseAcceptance>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<Icon>webjobs.png</Icon>
<PackageProjectUrl>http://go.microsoft.com/fwlink/?LinkID=320972</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/Azure/azure-functions-servicebus-extension</RepositoryUrl>
Expand All @@ -38,7 +39,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="3.0.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.14" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.23" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public async Task EventHub_MultipleDispatch_IndependentMessages()
expectedLinks[i] = new TestLink
{
operation_Id = operationId,
id = $"|{operationId}.{spanId}."
id = $"{spanId}"
};

messages[i] = new EventData(Encoding.UTF8.GetBytes(_testPrefix + i))
Expand Down Expand Up @@ -388,7 +388,7 @@ private void ValidateEventHubDependency(
string parentId,
string category)
{
Assert.Equal($"{endpoint} | {entityName}", dependency.Target);
Assert.Equal($"{endpoint}{entityName}", dependency.Target);
Assert.Equal("Azure Event Hubs", dependency.Type);
Assert.Equal(name, dependency.Name);
Assert.True(dependency.Success);
Expand Down
Loading

0 comments on commit 738c799

Please sign in to comment.