From 82d737c996d086504d3ce78a604b0dfefd134273 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 1 Dec 2020 11:27:26 -0800 Subject: [PATCH] Use BlobsCheckpointStore in scale monitor (#17241) --- .../BlobsCheckpointStore.Diagnostics.cs | 25 +- .../BlobsCheckpointStore.cs | 55 ++-- .../src/BlobsCheckpointStore.cs | 72 ++++++ .../src/Config/EventHubOptions.cs | 55 +++- .../src/EventHubConsumerClientImpl.cs | 30 +++ .../src/IEventHubConsumerClient.cs | 18 ++ .../src/Listeners/EventHubListener.cs | 27 +- .../src/Listeners/EventHubsScaleMonitor.cs | 194 ++++---------- ....Azure.WebJobs.Extensions.EventHubs.csproj | 23 +- .../src/Processor/EventProcessorHost.cs | 17 +- .../src/Resources.Designer.cs | 72 ++++++ .../src/Resources.resx | 24 ++ ...EventHubTriggerAttributeBindingProvider.cs | 22 +- .../tests/BlobsCheckpointStoreTests.cs | 83 ++++++ .../tests/EventHubListenerTests.cs | 15 +- .../tests/EventHubTests.cs | 3 +- .../tests/EventHubsScaleMonitorTests.cs | 242 ++++++++---------- 17 files changed, 625 insertions(+), 352 deletions(-) create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/BlobsCheckpointStore.cs create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubConsumerClientImpl.cs create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/IEventHubConsumerClient.cs create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.Designer.cs create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.resx create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs index 43e3bcfada572..8eaad10767d23 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System; using Azure.Messaging.EventHubs.Processor.Diagnostics; namespace Azure.Messaging.EventHubs.Processor @@ -46,10 +47,10 @@ partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventH /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership are associated with. - /// The message for the exception that occurred. + /// The exception that occurred. /// - partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage) => - Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, errorMessage); + partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) => + Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); /// /// Indicates that an attempt to retrieve a list of ownership has started. @@ -81,10 +82,10 @@ partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string even /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership are associated with. - /// The message for the exception that occurred. + /// The exception that occurred. /// - partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage) => - Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, errorMessage); + partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) => + Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); /// /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. @@ -117,10 +118,10 @@ partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHu /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoint is associated with. - /// The message for the exception that occurred. + /// The exception that occurred. /// - partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage) => - Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, errorMessage); + partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) => + Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); /// /// Indicates that an attempt to update a checkpoint has completed. @@ -168,10 +169,10 @@ partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNam /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. - /// The message for the exception that occurred. + /// The exception that occurred. /// - partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string errorMessage) => - Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, errorMessage); + partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception) => + Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, exception.Message); /// /// Indicates that ownership was unable to be claimed. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 5e3ba844f960f..2e73deaa5f37a 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -21,7 +21,7 @@ namespace Azure.Messaging.EventHubs.Processor /// A storage blob service that keeps track of checkpoints and ownership. /// /// - internal sealed partial class BlobsCheckpointStore : StorageManager + internal partial class BlobsCheckpointStore : StorageManager { #pragma warning disable CA1802 // Use a constant field /// A message to use when throwing exception when checkpoint container or blob does not exists. @@ -135,7 +135,7 @@ async Task> listOwnershipAsync(Cancellati } catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { - ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message); + ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex); throw new RequestFailedException(BlobsResourceDoesNotExist); } finally @@ -239,12 +239,12 @@ async Task> uploadBlobAsync(CancellationToken uploadTo } catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound || ex.ErrorCode == BlobErrorCode.BlobNotFound) { - ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex.Message); + ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex); throw new RequestFailedException(BlobsResourceDoesNotExist); } catch (Exception ex) { - ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex.Message); + ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex); throw; } finally @@ -286,13 +286,17 @@ async Task> listCheckpointsAsync(Cancellat { var partitionId = blob.Name.Substring(prefix.Length); var startingPosition = default(EventPosition?); + var offset = default(long?); + var sequenceNumber = default(long?); if (blob.Metadata.TryGetValue(BlobMetadataKey.Offset, out var str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out var result)) { + offset = result; startingPosition = EventPosition.FromOffset(result, false); } else if (blob.Metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out result)) { + sequenceNumber = result; startingPosition = EventPosition.FromSequenceNumber(result, false); } @@ -301,13 +305,15 @@ async Task> listCheckpointsAsync(Cancellat if (startingPosition.HasValue) { - checkpoints.Add(new EventProcessorCheckpoint + checkpoints.Add(new BlobStorageCheckpoint { FullyQualifiedNamespace = fullyQualifiedNamespace, EventHubName = eventHubName, ConsumerGroup = consumerGroup, PartitionId = partitionId, - StartingPosition = startingPosition.Value + StartingPosition = startingPosition.Value, + Offset = offset, + SequenceNumber = sequenceNumber }); } else @@ -326,12 +332,12 @@ async Task> listCheckpointsAsync(Cancellat } catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { - ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message); + ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex); throw new RequestFailedException(BlobsResourceDoesNotExist); } catch (Exception ex) { - ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message); + ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex); throw; } finally @@ -382,18 +388,17 @@ await ApplyRetryPolicy(async token => { using var blobContent = new MemoryStream(Array.Empty()); await blobClient.UploadAsync(blobContent, metadata: metadata, cancellationToken: token).ConfigureAwait(false); - }, cancellationToken).ConfigureAwait(false); } } catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { - UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex.Message); + UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex); throw new RequestFailedException(BlobsResourceDoesNotExist); } catch (Exception ex) { - UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex.Message); + UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex); throw; } finally @@ -506,9 +511,9 @@ async Task wrapper(CancellationToken token) /// The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership are associated with. - /// The message for the exception that occurred. + /// The message for the exception that occurred. /// - partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage); + partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); /// /// Indicates that an attempt to retrieve a list of ownership has started. @@ -538,9 +543,9 @@ async Task wrapper(CancellationToken token) /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership are associated with. - /// The message for the exception that occurred. + /// The message for the exception that occurred. /// - partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage); + partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); /// /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. @@ -571,9 +576,9 @@ async Task wrapper(CancellationToken token) /// The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoint is associated with. - /// The message for the exception that occurred. + /// The message for the exception that occurred. /// - partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage); + partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); /// /// Indicates that an attempt to update a checkpoint has completed. @@ -618,9 +623,9 @@ async Task wrapper(CancellationToken token) /// The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. - /// The message for the exception that occurred. + /// The message for the exception that occurred. /// - partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string errorMessage); + partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception); /// /// Indicates that ownership was unable to be claimed. @@ -668,5 +673,15 @@ async Task wrapper(CancellationToken token) /// The name of the associated container client. /// partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName); + + /// + /// Contains the information to reflect the state of event processing for a given Event Hub partition. + /// Provides access to the offset and the sequence number retrieved from the blob. + /// + public class BlobStorageCheckpoint : EventProcessorCheckpoint + { + public long? Offset { get; set; } + public long? SequenceNumber { get; set; } + } } -} +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/BlobsCheckpointStore.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/BlobsCheckpointStore.cs new file mode 100644 index 0000000000000..4eb051571906e --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/BlobsCheckpointStore.cs @@ -0,0 +1,72 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Azure.Storage.Blobs; +using Microsoft.Extensions.Logging; + +namespace Azure.Messaging.EventHubs.Processor +{ + internal partial class BlobsCheckpointStore + { + private readonly string _functionId; + private readonly ILogger _logger; + + /// + /// The mocking constructor. + /// + protected BlobsCheckpointStore() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// The client used to interact with the Azure Blob Storage service. + /// The retry policy to use as the basis for interacting with the Storage Blobs service. + /// The function id for diagnostic messages. + /// The logger to use for diagnostic messages. + /// + public BlobsCheckpointStore(BlobContainerClient blobContainerClient, + EventHubsRetryPolicy retryPolicy, + string functionId, + ILogger logger): this(blobContainerClient, retryPolicy) + { + _functionId = functionId; + _logger = logger; + } + + /// + /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. + /// + /// + /// The identifier of the partition the data is associated with. + /// The fully qualified Event Hubs namespace the data is associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the data is associated with. + /// + partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) + { + _logger.LogWarning( + "Function '{functionId}': An invalid checkpoint was found for partition: '{partitionId}' of FullyQualifiedNamespace: '{fullyQualifiedNamespace}'; EventHubName: '{eventHubName}'; ConsumerGroup: '{consumerGroup}'. This checkpoint is not valid and will be ignored.", + _functionId, partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + } + + /// + /// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints. + /// + /// + /// The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to {yournamespace}.servicebus.windows.net. + /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. + /// The name of the consumer group the ownership are associated with. + /// The exception that occurred. + /// + partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) + { + _logger.LogWarning(exception, + "Function '{functionId}': An exception occurred when listing checkpoints for FullyQualifiedNamespace: '{fullyQualifiedNamespace}'; EventHubName: '{eventHubName}'; ConsumerGroup: '{consumerGroup}'.", + _functionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs index e214e83a73dbc..4fbf27b890562 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs @@ -8,11 +8,15 @@ using System.Text; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor; using Azure.Messaging.EventHubs.Producer; +using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Hosting; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -242,29 +246,18 @@ internal EventHubProducerClient GetEventHubProducerClient(string eventHubName, s } // Lookup a listener for receiving events given the name provided in the [EventHubTrigger] attribute. - internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string eventHubName, string consumerGroup) + internal EventProcessorHost GetEventProcessorHost(string eventHubName, string consumerGroup) { ReceiverCreds creds; if (this._receiverCreds.TryGetValue(eventHubName, out creds)) { - if (consumerGroup == null) - { - consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; - } - var storageConnectionString = creds.StorageConnectionString; - if (storageConnectionString == null) - { - string defaultStorageString = config.GetWebJobsConnectionString(ConnectionStringNames.Storage); - storageConnectionString = defaultStorageString; - } + consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; // Use blob prefix support available in EPH starting in 2.2.6 EventProcessorHost host = new EventProcessorHost( eventHubName: eventHubName, consumerGroupName: consumerGroup, eventHubConnectionString: creds.EventHubConnectionString, - storageConnectionString: storageConnectionString, - leaseContainerName: LeaseContainerName, exceptionHandler: _exceptionHandler); return host; @@ -273,6 +266,42 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string throw new InvalidOperationException("No event hub receiver named " + eventHubName); } + internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName, string consumerGroup) + { + ReceiverCreds creds; + if (this._receiverCreds.TryGetValue(eventHubName, out creds)) + { + consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; + + // Use blob prefix support available in EPH starting in 2.2.6 + return new EventHubConsumerClientImpl(new EventHubConsumerClient( + consumerGroup, + creds.EventHubConnectionString, + eventHubName)); + } + + throw new InvalidOperationException("No event hub receiver named " + eventHubName); + } + + + internal string GetCheckpointStoreConnectionString(IConfiguration config, string eventHubName) + { + ReceiverCreds creds; + if (this._receiverCreds.TryGetValue(eventHubName, out creds)) + { + var storageConnectionString = creds.StorageConnectionString; + if (storageConnectionString == null) + { + string defaultStorageString = config.GetWebJobsConnectionString(ConnectionStringNames.Storage); + storageConnectionString = defaultStorageString; + } + + return storageConnectionString; + } + + throw new InvalidOperationException("No event hub receiver named " + eventHubName); + } + private static string EscapeStorageCharacter(char character) { var ordinalValue = (ushort)character; diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubConsumerClientImpl.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubConsumerClientImpl.cs new file mode 100644 index 0000000000000..5c8f32c62698a --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubConsumerClientImpl.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading.Tasks; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Consumer; + +namespace Microsoft.Azure.WebJobs +{ + // TODO: remove when https://github.com/Azure/azure-sdk-for-net/issues/9117 is fixed + internal class EventHubConsumerClientImpl : IEventHubConsumerClient + { + private readonly EventHubConsumerClient _client; + + public EventHubConsumerClientImpl(EventHubConsumerClient client) + { + _client = client; + } + + public string EventHubName => _client.EventHubName; + + public string FullyQualifiedNamespace => _client.FullyQualifiedNamespace; + + public string ConsumerGroup => _client.ConsumerGroup; + + public async Task GetPartitionsAsync() => (await _client.GetEventHubPropertiesAsync().ConfigureAwait(false)).PartitionIds; + + public Task GetPartitionPropertiesAsync(string partitionId) => _client.GetPartitionPropertiesAsync(partitionId); + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/IEventHubConsumerClient.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/IEventHubConsumerClient.cs new file mode 100644 index 0000000000000..7f3a6a33c8dd5 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/IEventHubConsumerClient.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading.Tasks; +using Azure.Messaging.EventHubs; + +namespace Microsoft.Azure.WebJobs +{ + // TODO: remove when https://github.com/Azure/azure-sdk-for-net/issues/9117 is fixed + internal interface IEventHubConsumerClient + { + string EventHubName { get; } + string FullyQualifiedNamespace { get; } + string ConsumerGroup { get; } + Task GetPartitionsAsync(); + Task GetPartitionPropertiesAsync(string partitionId); + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs index 526ed8d0e9b91..653f9f0d636e8 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs @@ -11,7 +11,6 @@ using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Processor; -using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host.Executors; @@ -28,6 +27,7 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca private readonly ITriggeredFunctionExecutor _executor; private readonly EventProcessorHost _eventProcessorHost; private readonly bool _singleDispatch; + private readonly BlobsCheckpointStore _checkpointStore; private readonly EventHubOptions _options; private readonly ILogger _logger; private bool _started; @@ -39,23 +39,24 @@ public EventHubListener( ITriggeredFunctionExecutor executor, EventProcessorHost eventProcessorHost, bool singleDispatch, + Func clientFactory, + BlobsCheckpointStore checkpointStore, EventHubOptions options, - ILogger logger, - BlobContainerClient blobContainer = null) + ILogger logger) { + _logger = logger; _executor = executor; _eventProcessorHost = eventProcessorHost; _singleDispatch = singleDispatch; + _checkpointStore = checkpointStore; _options = options; - _logger = logger; - _scaleMonitor = new Lazy(() => new EventHubsScaleMonitor( - functionId, - eventProcessorHost.EventHubName, - eventProcessorHost.ConsumerGroupName, - eventProcessorHost.EventHubConnectionString, - eventProcessorHost.StorageConnectionString, - _logger, - blobContainer)); + + _scaleMonitor = new Lazy( + () => new EventHubsScaleMonitor( + functionId, + clientFactory(), + checkpointStore, + _logger)); } void IListener.Cancel() @@ -69,7 +70,7 @@ void IDisposable.Dispose() public async Task StartAsync(CancellationToken cancellationToken) { - await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.MaxBatchSize, _options.InvokeProcessorAfterReceiveTimeout, _options.EventProcessorOptions).ConfigureAwait(false); + await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.MaxBatchSize, _options.InvokeProcessorAfterReceiveTimeout, _checkpointStore, _options.EventProcessorOptions).ConfigureAwait(false); _started = true; } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScaleMonitor.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScaleMonitor.cs index a42f4cb406119..90180d173e556 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScaleMonitor.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScaleMonitor.cs @@ -3,86 +3,45 @@ using System; using System.Collections.Generic; -using System.Globalization; using System.Linq; -using System.Net; using System.Threading.Tasks; -using Azure; using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; -using Azure.Storage.Blobs; -using Azure.Storage.Blobs.Models; -using Microsoft.Azure.WebJobs.EventHubs.Processor; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; -using Newtonsoft.Json; namespace Microsoft.Azure.WebJobs.EventHubs.Listeners { internal class EventHubsScaleMonitor : IScaleMonitor { - private const string EventHubContainerName = "azure-webjobs-eventhub"; private const int PartitionLogIntervalInMinutes = 5; private readonly string _functionId; - private readonly string _eventHubName; - private readonly string _consumerGroup; - private readonly string _connectionString; - private readonly string _storageConnectionString; - private readonly Lazy _client; - private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; + private readonly IEventHubConsumerClient _client; private readonly ILogger _logger; + private readonly BlobsCheckpointStore _checkpointStore; - private BlobContainerClient _blobContainer; private DateTime _nextPartitionLogTime; private DateTime _nextPartitionWarningTime; public EventHubsScaleMonitor( string functionId, - string eventHubName, - string consumerGroup, - string connectionString, - string storageConnectionString, - ILogger logger, - BlobContainerClient blobContainer = null) + IEventHubConsumerClient client, + BlobsCheckpointStore checkpointStore, + ILogger logger) { _functionId = functionId; - _eventHubName = eventHubName; - _consumerGroup = consumerGroup; - _connectionString = connectionString; - _storageConnectionString = storageConnectionString; _logger = logger; - _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-EventHubTrigger-{_eventHubName}-{_consumerGroup}".ToLowerInvariant()); + _checkpointStore = checkpointStore; _nextPartitionLogTime = DateTime.UtcNow; _nextPartitionWarningTime = DateTime.UtcNow; - _blobContainer = blobContainer; + _client = client; - EventHubsConnectionStringBuilder builder = new EventHubsConnectionStringBuilder(connectionString); - builder.EntityPath = eventHubName; - - _client = new Lazy(() => new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, builder.ToString())); - } - - public ScaleMonitorDescriptor Descriptor - { - get - { - return _scaleMonitorDescriptor; - } + Descriptor = new ScaleMonitorDescriptor($"{_functionId}-EventHubTrigger-{_client.EventHubName}-{_client.ConsumerGroup}".ToLowerInvariant()); } - private BlobContainerClient BlobContainer - { - get - { - if (_blobContainer == null) - { - BlobServiceClient blobService = new BlobServiceClient(_storageConnectionString); - _blobContainer = blobService.GetBlobContainerClient(EventHubContainerName); - } - return _blobContainer; - } - } + public ScaleMonitorDescriptor Descriptor { get; } async Task IScaleMonitor.GetMetricsAsync() { @@ -92,33 +51,50 @@ async Task IScaleMonitor.GetMetricsAsync() public async Task GetMetricsAsync() { EventHubsTriggerMetrics metrics = new EventHubsTriggerMetrics(); - EventHubProperties runtimeInfo = null; + string[] partitions = null; try { - runtimeInfo = await _client.Value.GetEventHubPropertiesAsync().ConfigureAwait(false); + partitions = await _client.GetPartitionsAsync().ConfigureAwait(false); + metrics.PartitionCount = partitions.Length; } catch (Exception e) { - _logger.LogWarning($"Encountered an exception while checking EventHub '{_eventHubName}'. Error: {e.Message}"); + _logger.LogWarning($"Encountered an exception while checking EventHub '{_client.EventHubName}'. Error: {e.Message}"); return metrics; } // Get the PartitionRuntimeInformation for all partitions - _logger.LogInformation($"Querying partition information for {runtimeInfo.PartitionIds.Length} partitions."); - var tasks = new Task[runtimeInfo.PartitionIds.Length]; + _logger.LogInformation($"Querying partition information for {partitions.Length} partitions."); + var tasks = new Task[partitions.Length]; - for (int i = 0; i < runtimeInfo.PartitionIds.Length; i++) + for (int i = 0; i < partitions.Length; i++) { - tasks[i] = _client.Value.GetPartitionPropertiesAsync(runtimeInfo.PartitionIds[i]); + tasks[i] = _client.GetPartitionPropertiesAsync(partitions[i]); } await Task.WhenAll(tasks).ConfigureAwait(false); - return await CreateTriggerMetrics(tasks.Select(t => t.Result).ToList()).ConfigureAwait(false); + IEnumerable checkpoints; + try + { + checkpoints = await _checkpointStore.ListCheckpointsAsync( + _client.FullyQualifiedNamespace, + _client.EventHubName, + _client.ConsumerGroup, + default) + .ConfigureAwait(false); + } + catch + { + // ListCheckpointsAsync would log + return metrics; + } + + return CreateTriggerMetrics(tasks.Select(t => t.Result).ToList(), checkpoints.ToArray()); } - internal async Task CreateTriggerMetrics(List partitionRuntimeInfo, bool alwaysLog = false) + private EventHubsTriggerMetrics CreateTriggerMetrics(List partitionRuntimeInfo, EventProcessorCheckpoint[] checkpoints, bool alwaysLog = false) { long totalUnprocessedEventCount = 0; bool logPartitionInfo = alwaysLog ? true : DateTime.UtcNow >= _nextPartitionLogTime; @@ -130,24 +106,22 @@ internal async Task CreateTriggerMetrics(List partitionErrors = new List(); for (int i = 0; i < partitionRuntimeInfo.Count; i++) { - Tuple partitionLeaseFile = await GetPartitionLeaseFileAsync(i).ConfigureAwait(false); - BlobParitionCheckpoint partitionLeaseInfo = partitionLeaseFile.Item1; - string errorMsg = partitionLeaseFile.Item2; + var partitionProperties = partitionRuntimeInfo[i]; - if (partitionRuntimeInfo[i] == null || partitionLeaseInfo == null) + var checkpoint = (BlobsCheckpointStore.BlobStorageCheckpoint)checkpoints.SingleOrDefault(c => c.PartitionId == partitionProperties.Id); + if (checkpoint == null) { - partitionErrors.Add(errorMsg); + partitionErrors.Add($"Unable to find a checkpoint information for partition: {partitionProperties.Id}"); + continue; } - else + + // Check for the unprocessed messages when there are messages on the event hub parition + // In that case, LastEnqueuedSequenceNumber will be >= 0 + if ((partitionProperties.LastEnqueuedSequenceNumber != -1 && partitionProperties.LastEnqueuedSequenceNumber != checkpoint.SequenceNumber) + || (checkpoint.Offset == null && partitionProperties.LastEnqueuedSequenceNumber >= 0)) { - // Check for the unprocessed messages when there are messages on the event hub parition - // In that case, LastEnqueuedSequenceNumber will be >= 0 - if ((partitionRuntimeInfo[i].LastEnqueuedSequenceNumber != -1 && partitionRuntimeInfo[i].LastEnqueuedSequenceNumber != partitionLeaseInfo.SequenceNumber) - || (partitionLeaseInfo.Offset == null && partitionRuntimeInfo[i].LastEnqueuedSequenceNumber >= 0)) - { - long partitionUnprocessedEventCount = GetUnprocessedEventCount(partitionRuntimeInfo[i], partitionLeaseInfo); - totalUnprocessedEventCount += partitionUnprocessedEventCount; - } + long partitionUnprocessedEventCount = GetUnprocessedEventCount(partitionProperties, checkpoint); + totalUnprocessedEventCount += partitionUnprocessedEventCount; } } @@ -173,56 +147,9 @@ internal async Task CreateTriggerMetrics(List> GetPartitionLeaseFileAsync(int partitionId) - { - BlobParitionCheckpoint blobParitionCheckpoint = null; - string prefix = $"{EventHubOptions.GetBlobPrefix(_eventHubName, _client.Value.FullyQualifiedNamespace)}{_consumerGroup}/checkpoint/{partitionId}"; - string errorMsg = null; - - try - { - BlobClient blockBlob = BlobContainer.GetBlobClient(prefix); - BlobProperties properties = await blockBlob.GetPropertiesAsync().ConfigureAwait(false); - - if (properties.Metadata.TryGetValue(SequenceNumberMetadataName, out string sequenceNumberString)) - { - blobParitionCheckpoint ??= new BlobParitionCheckpoint(); - blobParitionCheckpoint.SequenceNumber = long.Parse(sequenceNumberString, CultureInfo.InvariantCulture); - } - - if (properties.Metadata.TryGetValue(OffsetMetadataName, out string offsetString)) - { - blobParitionCheckpoint ??= new BlobParitionCheckpoint(); - blobParitionCheckpoint.Offset = long.Parse(offsetString, CultureInfo.InvariantCulture); - } - - if (blobParitionCheckpoint == null) - { - errorMsg = $"Checkpoint file did not contain required metadata on Partition: '{partitionId}', " + - $"EventHub: '{_eventHubName}', '{_consumerGroup}'."; - } - } - catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.NotFound) - { - errorMsg = $"Checkpoint file data could not be found for blob on Partition: '{partitionId}', " + - $"EventHub: '{_eventHubName}', '{_consumerGroup}'. Error: {e.Message}"; - } - catch (Exception e) - { - errorMsg = $"Encountered exception while checking for last checkpointed sequence number for blob " + - $"on Partition: '{partitionId}', EventHub: '{_eventHubName}', Consumer Group: '{_consumerGroup}'. Error: {e.Message}"; - } - - return new Tuple(blobParitionCheckpoint, errorMsg); - } // Get the number of unprocessed events by deriving the delta between the server side info and the partition lease info, - private static long GetUnprocessedEventCount(PartitionProperties partitionInfo, BlobParitionCheckpoint partitionLeaseInfo) + private static long GetUnprocessedEventCount(PartitionProperties partitionInfo, BlobsCheckpointStore.BlobStorageCheckpoint partitionLeaseInfo) { long partitionLeaseInfoSequenceNumber = partitionLeaseInfo.SequenceNumber ?? 0; @@ -287,7 +214,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, EventHubsTriggerMetrics[ status.Vote = ScaleVote.ScaleIn; _logger.LogInformation($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."); _logger.LogInformation($"Number of instances ({workerCount}) is too high relative to number " + - $"of partitions ({partitionCount}) for EventHubs entity ({_eventHubName}, {_consumerGroup})."); + $"of partitions ({partitionCount}) for EventHubs entity ({_client.EventHubName}, {_client.ConsumerGroup})."); return status; } @@ -303,7 +230,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, EventHubsTriggerMetrics[ { status.Vote = ScaleVote.ScaleOut; _logger.LogInformation($"EventCount ({latestEventCount}) > WorkerCount ({workerCount}) * 1,000."); - _logger.LogInformation($"Event count ({latestEventCount}) for EventHubs entity ({_eventHubName}, {_consumerGroup}) " + + _logger.LogInformation($"Event count ({latestEventCount}) for EventHubs entity ({_client.EventHubName}, {_client.ConsumerGroup}) " + $"is too high relative to the number of instances ({workerCount})."); return status; } @@ -313,7 +240,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, EventHubsTriggerMetrics[ if (isIdle) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"'{_eventHubName}' is idle."); + _logger.LogInformation($"'{_client.EventHubName}' is idle."); return status; } @@ -329,7 +256,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, EventHubsTriggerMetrics[ if (eventCountIncreasing) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation($"Event count is increasing for '{_eventHubName}'."); + _logger.LogInformation($"Event count is increasing for '{_client.EventHubName}'."); return status; } } @@ -342,11 +269,11 @@ private ScaleStatus GetScaleStatusCore(int workerCount, EventHubsTriggerMetrics[ if (eventCountDecreasing) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"Event count is decreasing for '{_eventHubName}'."); + _logger.LogInformation($"Event count is decreasing for '{_client.EventHubName}'."); return status; } - _logger.LogInformation($"EventHubs entity '{_eventHubName}' is steady."); + _logger.LogInformation($"EventHubs entity '{_client.EventHubName}' is steady."); return status; } @@ -364,14 +291,5 @@ private static bool IsTrueForLastN(IList samples, int c return true; } - - // The BlobParitionCheckpoint class used for reading blob lease data for a partition from storage. The Offset and SequenceNumber - // are stored as storage metdata on the blob. - private class BlobParitionCheckpoint - { - public long? Offset { get; set; } - - public long? SequenceNumber { get; set; } - } } } \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj index c021d4adac8e4..a07e3f740b577 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj @@ -18,10 +18,25 @@ - - - - + + + + + + + + True + True + Resources.resx + + + + + + ResXFileCodeGenerator + Resources.Designer.cs + Azure.Messaging.EventHubs.Core + diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index fdca41bad5b12..5ff36cc9a2c75 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -10,7 +10,6 @@ using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; -using Azure.Storage.Blobs; namespace Microsoft.Azure.WebJobs.EventHubs.Processor { @@ -19,22 +18,18 @@ internal class EventProcessorHost public string EventHubName { get; } public string ConsumerGroupName { get; } public string EventHubConnectionString { get; } - public string StorageConnectionString { get; } - private string LeaseContainerName { get; } private Processor CurrentProcessor { get; set; } private Action ExceptionHandler { get; } - public EventProcessorHost(string eventHubName, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, Action exceptionHandler) + public EventProcessorHost(string eventHubName, string consumerGroupName, string eventHubConnectionString, Action exceptionHandler) { EventHubName = eventHubName; ConsumerGroupName = consumerGroupName; EventHubConnectionString = eventHubConnectionString; - StorageConnectionString = storageConnectionString; - LeaseContainerName = leaseContainerName; ExceptionHandler = exceptionHandler; } - public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory factory, int maxBatchSize, bool invokeProcessorAfterReceiveTimeout, EventProcessorOptions options) + public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory factory, int maxBatchSize, bool invokeProcessorAfterReceiveTimeout, BlobsCheckpointStore checkpointStore, EventProcessorOptions options) { if (CurrentProcessor != null) { @@ -49,7 +44,8 @@ public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory fact factory, invokeProcessorAfterReceiveTimeout, ExceptionHandler, - new BlobContainerClient(StorageConnectionString, LeaseContainerName)); + checkpointStore + ); await CurrentProcessor.StartProcessingAsync().ConfigureAwait(false); } @@ -76,16 +72,15 @@ internal class Processor : EventProcessor private bool InvokeProcessorAfterRecieveTimeout { get; } private Action ExceptionHandler { get; } private ConcurrentDictionary LeaseInfos { get; } - private BlobsCheckpointStore CheckpointStore { get; } - public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobContainerClient containerClient) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) + public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobsCheckpointStore checkpointStore) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) { ProcessorFactory = processorFactory; InvokeProcessorAfterRecieveTimeout = invokeProcessorAfterRecieveTimeout; ExceptionHandler = exceptionHandler; LeaseInfos = new ConcurrentDictionary(); - CheckpointStore = new BlobsCheckpointStore(containerClient, RetryPolicy); + CheckpointStore = checkpointStore; } protected override async Task> ClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.Designer.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.Designer.cs new file mode 100644 index 0000000000000..459d143b96d7a --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.Designer.cs @@ -0,0 +1,72 @@ +//------------------------------------------------------------------------------ +// +// This code was generated by a tool. +// Runtime Version:4.0.30319.42000 +// +// Changes to this file may cause incorrect behavior and will be lost if +// the code is regenerated. +// +//------------------------------------------------------------------------------ + +namespace Azure.Messaging.EventHubs.Core { + using System; + + + /// + /// A strongly-typed resource class, for looking up localized strings, etc. + /// + // This class was auto-generated by the StronglyTypedResourceBuilder + // class via a tool like ResGen or Visual Studio. + // To add or remove a member, edit your .ResX file then rerun ResGen + // with the /str option, or rebuild your VS project. + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")] + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] + internal class Resources { + + private static global::System.Resources.ResourceManager resourceMan; + + private static global::System.Globalization.CultureInfo resourceCulture; + + [global::System.Diagnostics.CodeAnalysis.SuppressMessageAttribute("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")] + internal Resources() { + } + + /// + /// Returns the cached ResourceManager instance used by this class. + /// + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Resources.ResourceManager ResourceManager { + get { + if (object.ReferenceEquals(resourceMan, null)) { + global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("Microsoft.Azure.WebJobs.Extensions.EventHubs.Resources", typeof(Resources).Assembly); + resourceMan = temp; + } + return resourceMan; + } + } + + /// + /// Overrides the current thread's CurrentUICulture property for all + /// resource lookups using this strongly typed resource class. + /// + [global::System.ComponentModel.EditorBrowsableAttribute(global::System.ComponentModel.EditorBrowsableState.Advanced)] + internal static global::System.Globalization.CultureInfo Culture { + get { + return resourceCulture; + } + set { + resourceCulture = value; + } + } + + /// + /// Looks up a localized string similar to The requested retry mode, '{0}', is not known; a retry delay cannot be determined.. + /// + internal static string UnknownRetryMode { + get { + return ResourceManager.GetString("UnknownRetryMode", resourceCulture); + } + } + } +} diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.resx b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.resx new file mode 100644 index 0000000000000..64f576fe23af8 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Resources.resx @@ -0,0 +1,24 @@ + + + + + + + + + + text/microsoft-resx + + + 1.3 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + The requested retry mode, '{0}', is not known; a retry delay cannot be determined. + + \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs index 0bb9e19c2995a..30368a0ff6eb1 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs @@ -5,6 +5,10 @@ using System.Reflection; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor; +using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Listeners; @@ -59,24 +63,32 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex string consumerGroup = attribute.ConsumerGroup ?? EventHubConsumerClient.DefaultConsumerGroupName; string resolvedConsumerGroup = _nameResolver.ResolveWholeString(consumerGroup); - string connectionString = null; if (!string.IsNullOrWhiteSpace(attribute.Connection)) { - attribute.Connection = _nameResolver.ResolveWholeString(attribute.Connection); - connectionString = _config.GetConnectionStringOrSetting(attribute.Connection); + var connection = _nameResolver.ResolveWholeString(attribute.Connection); + var connectionString = _config.GetConnectionStringOrSetting(connection); _options.Value.AddReceiver(resolvedEventHubName, connectionString); } - var eventHostListener = _options.Value.GetEventProcessorHost(_config, resolvedEventHubName, resolvedConsumerGroup); + var eventHostListener = _options.Value.GetEventProcessorHost(resolvedEventHubName, resolvedConsumerGroup); + var checkpointStoreConnectionString = _options.Value.GetCheckpointStoreConnectionString(_config, resolvedEventHubName); Func> createListener = (factoryContext, singleDispatch) => { + var checkpointStore = new BlobsCheckpointStore( + new BlobContainerClient(checkpointStoreConnectionString, _options.Value.LeaseContainerName), + _options.Value.EventProcessorOptions.RetryOptions.ToRetryPolicy(), + factoryContext.Descriptor.Id, + _logger); + IListener listener = new EventHubListener( factoryContext.Descriptor.Id, factoryContext.Executor, eventHostListener, singleDispatch, + () => _options.Value.GetEventHubConsumerClient(resolvedEventHubName, consumerGroup), + checkpointStore, _options.Value, _logger); return Task.FromResult(listener); @@ -85,7 +97,7 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex #pragma warning disable 618 ITriggerBinding binding = BindingFactory.GetTriggerBinding(new EventHubTriggerBindingStrategy(), parameter, _converterManager, createListener); #pragma warning restore 618 - return Task.FromResult(binding); + return Task.FromResult(binding); } } // end class } \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs new file mode 100644 index 0000000000000..74d716b27aa4c --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs @@ -0,0 +1,83 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Core; +using Azure.Messaging.EventHubs.Processor; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Moq; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests +{ + public class BlobsCheckpointStoreTests + { + private readonly string _eventHubName = "TestEventHubName"; + private readonly string _consumerGroup = "TestConsumerGroup"; + private readonly string _namespace = "TestNamespace"; + private readonly string _functionId = "EventHubsTriggerFunction"; + + [Test] + public void ListCheckpointsAsync_LogsOnRequestErrors() + { + var testLoggerProvider = new TestLoggerProvider(); + Mock containerClientMock = new Mock(MockBehavior.Strict); + containerClientMock.Setup(c => c.GetBlobsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Throws(new RequestFailedException("Uh oh")); + + BlobsCheckpointStore store = new BlobsCheckpointStore( + containerClientMock.Object, + new BasicRetryPolicy(new EventHubsRetryOptions()), + _functionId, + testLoggerProvider.CreateLogger("TestLogger") + ); + + Assert.ThrowsAsync(async () => await store.ListCheckpointsAsync(_namespace, _eventHubName, _consumerGroup, CancellationToken.None)); + + var warning = testLoggerProvider.GetAllLogMessages().Single(p => p.Level == Extensions.Logging.LogLevel.Warning); + var expectedWarning = "Function 'EventHubsTriggerFunction': An exception occurred when listing checkpoints for " + + "FullyQualifiedNamespace: 'TestNamespace'; EventHubName: 'TestEventHubName'; ConsumerGroup: 'TestConsumerGroup'."; + Assert.AreEqual(expectedWarning, warning.FormattedMessage); + testLoggerProvider.ClearAllLogMessages(); + } + + [Test] + public async Task ListCheckpointsAsync_LogsOnInvalidCheckpoints() + { + var testLoggerProvider = new TestLoggerProvider(); + Mock containerClientMock = new Mock(MockBehavior.Strict); + containerClientMock.Setup(c => c.GetBlobsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(AsyncPageable.FromPages(new[] + { + Page.FromValues(new[] + { + BlobsModelFactory.BlobItem("testnamespace/testeventhubname/testconsumergroup/checkpoint/0", false, BlobsModelFactory.BlobItemProperties(false), metadata: new Dictionary()) + }, null, Mock.Of()) + })); + + BlobsCheckpointStore store = new BlobsCheckpointStore( + containerClientMock.Object, + new BasicRetryPolicy(new EventHubsRetryOptions()), + _functionId, + testLoggerProvider.CreateLogger("TestLogger") + ); + + await store.ListCheckpointsAsync(_namespace, _eventHubName, _consumerGroup, CancellationToken.None); + + var warning = testLoggerProvider.GetAllLogMessages().Single(p => p.Level == Extensions.Logging.LogLevel.Warning); + var expectedWarning = "Function 'EventHubsTriggerFunction': An invalid checkpoint was found for partition: '0' of " + + "FullyQualifiedNamespace: 'TestNamespace'; EventHubName: 'TestEventHubName'; ConsumerGroup: 'TestConsumerGroup'. " + + "This checkpoint is not valid and will be ignored."; + Assert.AreEqual(expectedWarning, warning.FormattedMessage); + testLoggerProvider.ClearAllLogMessages(); + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 3cbcba40e4caa..794be1dd69816 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -196,16 +196,21 @@ public void GetMonitor_ReturnsExpectedValue() var host = new EventProcessorHost( eventHubName, consumerGroup, - "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", - "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", null, null); + "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", null); + + var consumerClientMock = new Mock(); + consumerClientMock.SetupGet(c => c.ConsumerGroup).Returns(consumerGroup); + consumerClientMock.SetupGet(c => c.EventHubName).Returns(eventHubName); + var listener = new EventHubListener( functionId, - new Mock(MockBehavior.Strict).Object, + Mock.Of(), host, false, + () => consumerClientMock.Object, + Mock.Of(), new EventHubOptions(), - testLogger, - new Mock(MockBehavior.Strict).Object); + testLogger); IScaleMonitor scaleMonitor = listener.GetMonitor(); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs index bb226ca9e8eb5..7fdf92089844e 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs @@ -7,6 +7,7 @@ using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host; @@ -257,7 +258,7 @@ internal static ProcessorPartitionContext GetPartitionContext(string partitionId null, false, null, - Mock.Of()); + Mock.Of()); return new ProcessorPartitionContext(partitionId, processor, s => default); } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs index f68057970126d..ab41a36ab2f0c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs @@ -8,16 +8,14 @@ using System.Threading.Tasks; using Azure; using Azure.Messaging.EventHubs; -using Azure.Storage.Blobs; -using Azure.Storage.Blobs.Models; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.WebJobs.EventHubs.Listeners; -using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Extensions.Logging; using Moq; -using Moq.Language; using NUnit.Framework; using static Moq.It; @@ -28,30 +26,41 @@ public class EventHubsScaleMonitorTests private readonly string _functionId = "EventHubsTriggerFunction"; private readonly string _eventHubName = "TestEventHubName"; private readonly string _consumerGroup = "TestConsumerGroup"; - private readonly string _eventHubConnectionString = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="; - private readonly string _storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net"; - + private readonly string _namespace = "TestNamespace"; private EventHubsScaleMonitor _scaleMonitor; - private Mock _mockBlobContainer; + private Mock _mockCheckpointStore; private TestLoggerProvider _loggerProvider; private LoggerFactory _loggerFactory; + private Mock _consumerClientMock; + + private IEnumerable _partitions; + private IEnumerable _checkpoints; [SetUp] public void SetUp() { - _mockBlobContainer = new Mock(MockBehavior.Strict); _loggerFactory = new LoggerFactory(); _loggerProvider = new TestLoggerProvider(); _loggerFactory.AddProvider(_loggerProvider); + _consumerClientMock = new Mock(MockBehavior.Strict); + _consumerClientMock.Setup(c => c.ConsumerGroup).Returns(_consumerGroup); + _consumerClientMock.Setup(c => c.EventHubName).Returns(_eventHubName); + _consumerClientMock.Setup(c => c.FullyQualifiedNamespace).Returns(_namespace); + _consumerClientMock.Setup(client => client.GetPartitionsAsync()) + .Returns(() => Task.FromResult(_partitions.Select(p => p.Id).ToArray())); + _consumerClientMock.Setup(client => client.GetPartitionPropertiesAsync(IsAny())) + .Returns((string id) => Task.FromResult(_partitions.SingleOrDefault(p => p.Id == id))); + + _mockCheckpointStore = new Mock(MockBehavior.Strict); + _mockCheckpointStore.Setup(s => s.ListCheckpointsAsync(_namespace, _eventHubName, _consumerGroup, default)) + .Returns(() => Task.FromResult(_checkpoints)); + _scaleMonitor = new EventHubsScaleMonitor( _functionId, - _eventHubName, - _consumerGroup, - _eventHubConnectionString, - _storageConnectionString, - _loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("EventHub")), - _mockBlobContainer.Object); + _consumerClientMock.Object, + _mockCheckpointStore.Object, + _loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("EventHub"))); } [Test] @@ -63,61 +72,63 @@ public void ScaleMonitorDescriptor_ReturnsExpectedValue() [Test] public async Task CreateTriggerMetrics_ReturnsExpectedResult() { - EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(_eventHubConnectionString); - string prefix = $"{sb.Endpoint.Host}/{_eventHubName.ToLower()}/{_consumerGroup}/checkpoint/0"; - - var mockBlobReference = new Mock(MockBehavior.Strict); - var sequence = mockBlobReference.SetupSequence(m => m.GetPropertiesAsync(IsAny(), IsAny())); - - _mockBlobContainer - .Setup(c => c.GetBlobClient(prefix)) - .Returns(mockBlobReference.Object); - - SetupBlobMock(sequence, 0, 0); - - var partitionInfo = new List + _partitions = new List { new TestPartitionProperties(lastSequenceNumber: 0) }; - var metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); + _checkpoints = new EventProcessorCheckpoint[] + { + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 0 } + }; + + var metrics = await _scaleMonitor.GetMetricsAsync(); Assert.AreEqual(0, metrics.EventCount); Assert.AreEqual(1, metrics.PartitionCount); Assert.AreNotEqual(default(DateTime), metrics.Timestamp); // Partition got its first message (Offset == null, LastEnqueued == 0) - SetupBlobMock(sequence, null, 0); + _checkpoints = new EventProcessorCheckpoint[] + { + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = null, SequenceNumber = 0 } + }; - metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); + metrics = await _scaleMonitor.GetMetricsAsync(); Assert.AreEqual(1, metrics.EventCount); Assert.AreEqual(1, metrics.PartitionCount); Assert.AreNotEqual(default(DateTime), metrics.Timestamp); // No instances assigned to process events on partition (Offset == null, LastEnqueued > 0) - SetupBlobMock(sequence, null, 0); + _checkpoints = new EventProcessorCheckpoint[] + { + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = null, SequenceNumber = 0 } + }; - partitionInfo = new List + _partitions = new List { new TestPartitionProperties(lastSequenceNumber: 5) }; - metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); + metrics = await _scaleMonitor.GetMetricsAsync(); Assert.AreEqual(6, metrics.EventCount); Assert.AreEqual(1, metrics.PartitionCount); Assert.AreNotEqual(default(DateTime), metrics.Timestamp); // Checkpointing is ahead of partition info (SequenceNumber > LastEnqueued) - SetupBlobMock(sequence, 25, 11); + _checkpoints = new EventProcessorCheckpoint[] + { + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 25, SequenceNumber = 11 } + }; - partitionInfo = new List + _partitions = new List { new TestPartitionProperties(lastSequenceNumber: 10) }; - metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); + metrics = await _scaleMonitor.GetMetricsAsync(); Assert.AreEqual(0, metrics.EventCount); Assert.AreEqual(1, metrics.PartitionCount); @@ -127,118 +138,69 @@ public async Task CreateTriggerMetrics_ReturnsExpectedResult() [Test] public async Task CreateTriggerMetrics_MultiplePartitions_ReturnsExpectedResult() { - EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(_eventHubConnectionString); - - var mockBlobReference = new Mock(MockBehavior.Strict); - var sequence = mockBlobReference.SetupSequence(m => m.GetPropertiesAsync(IsAny(), IsAny())); - - _mockBlobContainer - .Setup(c => c.GetBlobClient(IsAny())) - .Returns(mockBlobReference.Object); // No messages processed, no messages in queue - SetupBlobMock(sequence, 0, 0); - SetupBlobMock(sequence, 0, 0); - SetupBlobMock(sequence, 0, 0); + _checkpoints = new EventProcessorCheckpoint[] + { + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 0, PartitionId = "1" }, + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 0, PartitionId = "2" }, + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 0, PartitionId = "3" } + }; - var partitionInfo = new List + _partitions = new List { - new TestPartitionProperties(lastSequenceNumber: 0), - new TestPartitionProperties(lastSequenceNumber: 0), - new TestPartitionProperties(lastSequenceNumber: 0) + new TestPartitionProperties(lastSequenceNumber: 0, partitionId: "1"), + new TestPartitionProperties(lastSequenceNumber: 0, partitionId: "2"), + new TestPartitionProperties(lastSequenceNumber: 0, partitionId: "3") }; - var metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); + var metrics = await _scaleMonitor.GetMetricsAsync(); Assert.AreEqual(0, metrics.EventCount); Assert.AreEqual(3, metrics.PartitionCount); Assert.AreNotEqual(default(DateTime), metrics.Timestamp); // Messages processed, Messages in queue - SetupBlobMock(sequence, 0, 2); - SetupBlobMock(sequence, 0, 3); - SetupBlobMock(sequence, 0, 4); - - partitionInfo = new List + _checkpoints = new EventProcessorCheckpoint[] { - new TestPartitionProperties(lastSequenceNumber: 12), - new TestPartitionProperties(lastSequenceNumber: 13), - new TestPartitionProperties(lastSequenceNumber: 14) + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 2, PartitionId = "1" }, + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 3, PartitionId = "2" }, + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 4, PartitionId = "3" } }; - metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); - - Assert.AreEqual(30, metrics.EventCount); - Assert.AreEqual(3, metrics.PartitionCount); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - // One invalid sample - SetupBlobMock(sequence, 0, 2); - SetupBlobMock(sequence, 0, 3); - SetupBlobMock(sequence, 0, 4); - - partitionInfo = new List + _partitions = new List { - new TestPartitionProperties(lastSequenceNumber: 12), - new TestPartitionProperties(lastSequenceNumber: 13), - new TestPartitionProperties(lastSequenceNumber: 1) + new TestPartitionProperties(lastSequenceNumber: 12, partitionId: "1"), + new TestPartitionProperties(lastSequenceNumber: 13, partitionId: "2"), + new TestPartitionProperties(lastSequenceNumber: 14, partitionId: "3") }; - metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo); + metrics = await _scaleMonitor.GetMetricsAsync(); - Assert.AreEqual(20, metrics.EventCount); + Assert.AreEqual(30, metrics.EventCount); Assert.AreEqual(3, metrics.PartitionCount); Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - } - - [Test] - public async Task CreateTriggerMetrics_HandlesExceptions() - { - // StorageException - _mockBlobContainer - .Setup(c => c.GetBlobClient(IsAny())) - .Throws(new RequestFailedException(404, "Uh oh")); - var partitionInfo = new List + // One invalid sample + _checkpoints = new EventProcessorCheckpoint[] { - new TestPartitionProperties() + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 2, PartitionId = "1" }, + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 3, PartitionId = "2" }, + new BlobsCheckpointStore.BlobStorageCheckpoint { Offset = 0, SequenceNumber = 4, PartitionId = "3" } }; - var metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo, true); - - Assert.AreEqual(1, metrics.PartitionCount); - Assert.AreEqual(0, metrics.EventCount); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Extensions.Logging.LogLevel.Warning); - var expectedWarning = $"Function '{_functionId}': Unable to deserialize partition or lease info with the following errors: " + - $"Checkpoint file data could not be found for blob on Partition: '0', EventHub: '{_eventHubName}', " + - $"'{_consumerGroup}'. Error: Uh oh"; - Assert.AreEqual(expectedWarning, warning.FormattedMessage); - _loggerProvider.ClearAllLogMessages(); - - // Generic Exception - _mockBlobContainer - .Setup(c => c.GetBlobClient(IsAny())) - .Throws(new Exception("Uh oh")); - - partitionInfo = new List + _partitions = new List { - new TestPartitionProperties() + new TestPartitionProperties(lastSequenceNumber: 12, partitionId: "1"), + new TestPartitionProperties(lastSequenceNumber: 13, partitionId: "2"), + new TestPartitionProperties(lastSequenceNumber: 1, partitionId: "3") }; - metrics = await _scaleMonitor.CreateTriggerMetrics(partitionInfo, true); + metrics = await _scaleMonitor.GetMetricsAsync(); - Assert.AreEqual(1, metrics.PartitionCount); - Assert.AreEqual(0, metrics.EventCount); + Assert.AreEqual(20, metrics.EventCount); + Assert.AreEqual(3, metrics.PartitionCount); Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Extensions.Logging.LogLevel.Warning); - expectedWarning = $"Function '{_functionId}': Unable to deserialize partition or lease info with the following errors: " + - $"Encountered exception while checking for last checkpointed sequence number for blob on Partition: '0', " + - $"EventHub: '{_eventHubName}', Consumer Group: '{_consumerGroup}'. Error: Uh oh"; - Assert.AreEqual(expectedWarning, warning.FormattedMessage); - _loggerProvider.ClearAllLogMessages(); } [Test] @@ -445,23 +407,43 @@ public void GetScaleStatus_EventHubSteady_ReturnsVote_ScaleIn() Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); Assert.AreEqual($"EventHubs entity '{_eventHubName}' is steady.", log.FormattedMessage); } - - private static void SetupBlobMock(ISetupSequentialResult>> mock, int? offset, int? sequencenumber) + [Test] + public async Task CreateTriggerMetrics_HandlesExceptions() { - var metadata = new Dictionary(); - if (offset != null) + // StorageException + _mockCheckpointStore + .Setup(c => c.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Throws(new RequestFailedException(404, "Uh oh")); + + _partitions = new List { - metadata.Add("offset", offset.ToString()); - } + new TestPartitionProperties() + }; + + var metrics = await _scaleMonitor.GetMetricsAsync(); + + Assert.AreEqual(1, metrics.PartitionCount); + Assert.AreEqual(0, metrics.EventCount); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + // Generic Exception + _mockCheckpointStore + .Setup(c => c.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Throws(new Exception("Uh oh")); - if (sequencenumber != null) + _partitions = new List { - metadata.Add("sequencenumber", sequencenumber.ToString()); - } + new TestPartitionProperties() + }; + + metrics = await _scaleMonitor.GetMetricsAsync(); - var response = Response.FromValue(BlobsModelFactory.BlobProperties(default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, default, metadata, default, default, default, default), Mock.Of()); + Assert.AreEqual(1, metrics.PartitionCount); + Assert.AreEqual(0, metrics.EventCount); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - mock.ReturnsAsync(response); + _loggerProvider.ClearAllLogMessages(); } + } }