Skip to content

Commit

Permalink
Fix EventHub configuration (#1857)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed Sep 14, 2018
1 parent a37d14f commit eeb0744
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.EventHubs
{
[Extension("EventHubs", configurationSection: "EventHubs")]
internal class EventHubExtensionConfigProvider : IExtensionConfigProvider
{
private readonly EventHubConfiguration _options;
public IConfiguration _config;
private readonly IOptions<EventHubOptions> _options;
private readonly ILoggerFactory _loggerFactory;
private readonly IConverterManager _converterManager;
private readonly INameResolver _nameResolver;
private readonly IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> _configuration;

public EventHubExtensionConfigProvider(EventHubConfiguration options, ILoggerFactory loggerFactory,
public EventHubExtensionConfigProvider(IConfiguration config, IOptions<EventHubOptions> options, ILoggerFactory loggerFactory,
IConverterManager converterManager, INameResolver nameResolver, IWebJobsExtensionConfiguration<EventHubExtensionConfigProvider> configuration)
{
_config = config;
_options = options;
_loggerFactory = loggerFactory;
_converterManager = converterManager;
Expand All @@ -49,8 +52,7 @@ public void Initialize(ExtensionConfigContext context)
throw new ArgumentNullException(nameof(context));
}

EventProcessorOptions options = _options.GetOptions();
options.SetExceptionHandler(ExceptionReceivedHandler);
_options.Value.EventProcessorOptions.SetExceptionHandler(ExceptionReceivedHandler);

// apply at config level (batchCheckpointFrequency)
// TODO: Revisit this... All configurable options should move to a proper Options type.
Expand All @@ -64,7 +66,7 @@ public void Initialize(ExtensionConfigContext context)
.AddOpenConverter<OpenType.Poco, EventData>(ConvertPocoToEventData);

// register our trigger binding provider
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_nameResolver, _converterManager, _options, _loggerFactory);
var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_config, _nameResolver, _converterManager, _options, _loggerFactory);
context.AddBindingRule<EventHubTriggerAttribute>()
.BindToTrigger(triggerBindingProvider);

Expand Down Expand Up @@ -122,7 +124,7 @@ private static LogLevel GetLogLevel(Exception ex)

private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
{
EventHubClient client = _options.GetEventHubClient(attribute.EventHubName, attribute.Connection);
EventHubClient client = _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection);
return new EventHubAsyncCollector(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@

namespace Microsoft.Azure.WebJobs.EventHubs
{
/// <summary>
/// Provide configuration for event hubs.
/// This is primarily mapping names to underlying EventHub listener and receiver objects from the EventHubs SDK.
/// </summary>
public class EventHubConfiguration
public class EventHubOptions
{
// Event Hub Names are case-insensitive.
// The same path can have multiple connection strings with different permissions (sending and receiving),
Expand All @@ -29,39 +25,16 @@ public class EventHubConfiguration
private readonly Dictionary<string, ReceiverCreds> _receiverCreds = new Dictionary<string, ReceiverCreds>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, EventProcessorHost> _explicitlyProvidedHosts = new Dictionary<string, EventProcessorHost>(StringComparer.OrdinalIgnoreCase);

private readonly EventProcessorOptions _options;
private string _defaultStorageString;
private int _batchCheckpointFrequency = 1;

/// <summary>
/// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub.
/// Each event hub gets its own blob prefix within the container.
/// </summary>
public const string LeaseContainerName = "azure-webjobs-eventhub";
private int _batchCheckpointFrequency = 1;

/// <summary>
/// default constructor. Callers can reference this without having any assembly references to eventhub assemblies.
/// </summary>
public EventHubConfiguration()
: this(null, null)
public EventHubOptions()
{
}

/// <summary>
/// Constructs a new instance.
/// </summary>
/// <param name="options">The optional <see cref="EventProcessorOptions"/> to use when receiving events.</param>
public EventHubConfiguration(IConfiguration configuration, IOptions<EventProcessorOptions> options = null)
{
_options = options?.Value;
if (_options == null)
{
_options = EventProcessorOptions.DefaultOptions;
_options.MaxBatchSize = 64;
_options.PrefetchCount = _options.MaxBatchSize * 4;
}

_defaultStorageString = configuration?.GetWebJobsConnectionString(ConnectionStringNames.Storage);
EventProcessorOptions = EventProcessorOptions.DefaultOptions;
}

/// <summary>
Expand All @@ -84,6 +57,8 @@ public int BatchCheckpointFrequency
}
}

public EventProcessorOptions EventProcessorOptions { get; }

/// <summary>
/// Add an existing client for sending messages to an event hub. Infer the eventHub name from client.path
/// </summary>
Expand Down Expand Up @@ -236,7 +211,7 @@ internal EventHubClient GetEventHubClient(string eventHubName, string connection
}

// Lookup a listener for receiving events given the name provided in the [EventHubTrigger] attribute.
internal EventProcessorHost GetEventProcessorHost(string eventHubName, string consumerGroup)
internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string eventHubName, string consumerGroup)
{
ReceiverCreds creds;
if (this._receiverCreds.TryGetValue(eventHubName, out creds))
Expand All @@ -251,7 +226,8 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
var storageConnectionString = creds.StorageConnectionString;
if (storageConnectionString == null)
{
storageConnectionString = _defaultStorageString;
string defaultStorageString = config.GetWebJobsConnectionString(ConnectionStringNames.Storage);
storageConnectionString = defaultStorageString;
}

// If the connection string provides a hub name, that takes precedence.
Expand All @@ -264,7 +240,7 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
sb.EntityPath = null; // need to remove to use with EventProcessorHost
}

var @namespace = GetServiceBusNamespace(sb);
var @namespace = GetEventHubNamespace(sb);
var blobPrefix = GetBlobPrefix(actualPath, @namespace);

// Use blob prefix support available in EPH starting in 2.2.6
Expand Down Expand Up @@ -340,7 +316,7 @@ private static string EscapeBlobPath(string path)
return sb.ToString();
}

private static string GetServiceBusNamespace(EventHubsConnectionStringBuilder connectionString)
private static string GetEventHubNamespace(EventHubsConnectionStringBuilder connectionString)
{
// EventHubs only have 1 endpoint.
var url = connectionString.Endpoint;
Expand Down Expand Up @@ -375,9 +351,6 @@ public static string GetBlobPrefix(string eventHubName, string serviceBusNamespa
return key;
}

// Get the eventhub options, used by the EventHub SDK for listening on event.
internal EventProcessorOptions GetOptions() => _options;

// Hold credentials for a given eventHub name.
// Multiple consumer groups (and multiple listeners) on the same hub can share the same credentials.
private class ReceiverCreds
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,47 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;

namespace Microsoft.Extensions.Hosting
{
public static class EventHubWebJobsBuilderExtensions
{
public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

builder.AddEventHubs(p => {});

return builder;
}

public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<EventHubOptions> configure)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

builder.AddExtension<EventHubExtensionConfigProvider>()
.BindOptions<EventProcessorOptions>();
.BindOptions<EventHubOptions>();

builder.Services.TryAddSingleton<EventHubConfiguration>();
builder.Services.Configure<EventHubOptions>(options =>
{
configure(options);
});

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory
private readonly ITriggeredFunctionExecutor _executor;
private readonly EventProcessorHost _eventProcessorHost;
private readonly bool _singleDispatch;
private readonly EventProcessorOptions _options;
private readonly EventHubConfiguration _config;
private readonly EventHubOptions _options;
private readonly ILogger _logger;
private bool _started;

public EventHubListener(ITriggeredFunctionExecutor executor, EventProcessorHost eventProcessorHost, bool singleDispatch, EventHubConfiguration config, ILogger logger)
public EventHubListener(ITriggeredFunctionExecutor executor, EventProcessorHost eventProcessorHost, bool singleDispatch, EventHubOptions options, ILogger logger)
{
_executor = executor;
_eventProcessorHost = eventProcessorHost;
_singleDispatch = singleDispatch;
_options = config.GetOptions();
_config = config;
_options = options;
_logger = logger;
}

Expand All @@ -45,7 +43,7 @@ void IDisposable.Dispose()

public async Task StartAsync(CancellationToken cancellationToken)
{
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options);
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.EventProcessorOptions);
_started = true;
}

Expand All @@ -60,7 +58,7 @@ public async Task StopAsync(CancellationToken cancellationToken)

IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context)
{
return new EventProcessor(_config, _executor, _logger, _singleDispatch);
return new EventProcessor(_options, _executor, _logger, _singleDispatch);
}

/// <summary>
Expand All @@ -84,12 +82,12 @@ internal class EventProcessor : IEventProcessor, IDisposable, ICheckpointer
private int _batchCounter = 0;
private bool _disposed = false;

public EventProcessor(EventHubConfiguration config, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null)
public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null)
{
_checkpointer = checkpointer ?? this;
_executor = executor;
_singleDispatch = singleDispatch;
_batchCheckpointFrequency = config.BatchCheckpointFrequency;
_batchCheckpointFrequency = options.BatchCheckpointFrequency;
_logger = logger;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,30 @@
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.WebJobs.EventHubs
{
internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider
{
private readonly INameResolver _nameResolver;
private readonly ILogger _logger;
private readonly EventHubConfiguration _eventHubConfig;
private readonly IConfiguration _config;
private readonly IOptions<EventHubOptions> _options;
private readonly IConverterManager _converterManager;

public EventHubTriggerAttributeBindingProvider(
IConfiguration configuration,
INameResolver nameResolver,
IConverterManager converterManager,
EventHubConfiguration eventHubConfig,
IOptions<EventHubOptions> options,
ILoggerFactory loggerFactory)
{
_config = configuration;
_nameResolver = nameResolver;
_converterManager = converterManager;
_eventHubConfig = eventHubConfig;
_options = options;
_logger = loggerFactory?.CreateLogger(LogCategories.CreateTriggerCategory("EventHub"));
}

Expand All @@ -56,15 +61,15 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex

if (!string.IsNullOrWhiteSpace(attribute.Connection))
{
_eventHubConfig.AddReceiver(resolvedEventHubName, _nameResolver.Resolve(attribute.Connection));
_options.Value.AddReceiver(resolvedEventHubName, _nameResolver.Resolve(attribute.Connection));
}

var eventHostListener = _eventHubConfig.GetEventProcessorHost(resolvedEventHubName, resolvedConsumerGroup);
var eventHostListener = _options.Value.GetEventProcessorHost(_config, resolvedEventHubName, resolvedConsumerGroup);

Func<ListenerFactoryContext, bool, Task<IListener>> createListener =
(factoryContext, singleDispatch) =>
{
IListener listener = new EventHubListener(factoryContext.Executor, eventHostListener, singleDispatch, _eventHubConfig, _logger);
IListener listener = new EventHubListener(factoryContext.Executor, eventHostListener, singleDispatch, _options.Value, _logger);
return Task.FromResult(listener);
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Azure.WebJobs.ServiceBus.Config;
Expand All @@ -14,13 +15,37 @@ public static class ServiceBusHostBuilderExtensions
{
public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

builder.AddServiceBus(p => { });

return builder;
}

public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action<ServiceBusOptions> configure)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}

if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

builder.AddExtension<ServiceBusExtensionConfigProvider>()
.ConfigureOptions<ServiceBusOptions>((config, path, options) =>
{
options.ConnectionString = config.GetConnectionString(Constants.DefaultConnectionStringName);

IConfigurationSection section = config.GetSection(path);
section.Bind(options);

configure(options);
});

builder.Services.TryAddSingleton<MessagingProvider>();
Expand Down
Loading

0 comments on commit eeb0744

Please sign in to comment.