Skip to content

Commit

Permalink
Use BlobsCheckpointStore in scale monitor (#17241)
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym authored Dec 1, 2020
1 parent 7eeb660 commit 82d737c
Show file tree
Hide file tree
Showing 17 changed files with 625 additions and 352 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using Azure.Messaging.EventHubs.Processor.Diagnostics;

namespace Azure.Messaging.EventHubs.Processor
Expand Down Expand Up @@ -46,10 +47,10 @@ partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventH
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The exception that occurred.</param>
///
partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage) =>
Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, errorMessage);
partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) =>
Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);

/// <summary>
/// Indicates that an attempt to retrieve a list of ownership has started.
Expand Down Expand Up @@ -81,10 +82,10 @@ partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string even
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The exception that occurred.</param>
///
partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage) =>
Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, errorMessage);
partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) =>
Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);

/// <summary>
/// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints.
Expand Down Expand Up @@ -117,10 +118,10 @@ partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHu
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The exception that occurred.</param>
///
partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, errorMessage);
partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);

/// <summary>
/// Indicates that an attempt to update a checkpoint has completed.
Expand Down Expand Up @@ -168,10 +169,10 @@ partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNam
/// <param name="eventHubName">The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the ownership is associated with.</param>
/// <param name="ownerIdentifier">The identifier of the processor that attempted to claim the ownership for.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The exception that occurred.</param>
///
partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string errorMessage) =>
Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, errorMessage);
partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception) =>
Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, exception.Message);

/// <summary>
/// Indicates that ownership was unable to be claimed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Azure.Messaging.EventHubs.Processor
/// A storage blob service that keeps track of checkpoints and ownership.
/// </summary>
///
internal sealed partial class BlobsCheckpointStore : StorageManager
internal partial class BlobsCheckpointStore : StorageManager
{
#pragma warning disable CA1802 // Use a constant field
/// <summary>A message to use when throwing exception when checkpoint container or blob does not exists.</summary>
Expand Down Expand Up @@ -135,7 +135,7 @@ async Task<List<EventProcessorPartitionOwnership>> listOwnershipAsync(Cancellati
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message);
ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
}
finally
Expand Down Expand Up @@ -239,12 +239,12 @@ async Task<Response<BlobContentInfo>> uploadBlobAsync(CancellationToken uploadTo
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound || ex.ErrorCode == BlobErrorCode.BlobNotFound)
{
ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex.Message);
ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
}
catch (Exception ex)
{
ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex.Message);
ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex);
throw;
}
finally
Expand Down Expand Up @@ -286,13 +286,17 @@ async Task<IEnumerable<EventProcessorCheckpoint>> listCheckpointsAsync(Cancellat
{
var partitionId = blob.Name.Substring(prefix.Length);
var startingPosition = default(EventPosition?);
var offset = default(long?);
var sequenceNumber = default(long?);

if (blob.Metadata.TryGetValue(BlobMetadataKey.Offset, out var str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out var result))
{
offset = result;
startingPosition = EventPosition.FromOffset(result, false);
}
else if (blob.Metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out result))
{
sequenceNumber = result;
startingPosition = EventPosition.FromSequenceNumber(result, false);
}

Expand All @@ -301,13 +305,15 @@ async Task<IEnumerable<EventProcessorCheckpoint>> listCheckpointsAsync(Cancellat

if (startingPosition.HasValue)
{
checkpoints.Add(new EventProcessorCheckpoint
checkpoints.Add(new BlobStorageCheckpoint
{
FullyQualifiedNamespace = fullyQualifiedNamespace,
EventHubName = eventHubName,
ConsumerGroup = consumerGroup,
PartitionId = partitionId,
StartingPosition = startingPosition.Value
StartingPosition = startingPosition.Value,
Offset = offset,
SequenceNumber = sequenceNumber
});
}
else
Expand All @@ -326,12 +332,12 @@ async Task<IEnumerable<EventProcessorCheckpoint>> listCheckpointsAsync(Cancellat
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message);
ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
}
catch (Exception ex)
{
ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex.Message);
ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw;
}
finally
Expand Down Expand Up @@ -382,18 +388,17 @@ await ApplyRetryPolicy(async token =>
{
using var blobContent = new MemoryStream(Array.Empty<byte>());
await blobClient.UploadAsync(blobContent, metadata: metadata, cancellationToken: token).ConfigureAwait(false);

}, cancellationToken).ConfigureAwait(false);
}
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex.Message);
UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
}
catch (Exception ex)
{
UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex.Message);
UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex);
throw;
}
finally
Expand Down Expand Up @@ -506,9 +511,9 @@ async Task wrapper(CancellationToken token)
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the ownership are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The message for the exception that occurred.</param>
///
partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage);
partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception);

/// <summary>
/// Indicates that an attempt to retrieve a list of ownership has started.
Expand Down Expand Up @@ -538,9 +543,9 @@ async Task wrapper(CancellationToken token)
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoints are associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the ownership are associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The message for the exception that occurred.</param>
///
partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage);
partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception);

/// <summary>
/// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints.
Expand Down Expand Up @@ -571,9 +576,9 @@ async Task wrapper(CancellationToken token)
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace the checkpoint is associated with. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The message for the exception that occurred.</param>
///
partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string errorMessage);
partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception);

/// <summary>
/// Indicates that an attempt to update a checkpoint has completed.
Expand Down Expand Up @@ -618,9 +623,9 @@ async Task wrapper(CancellationToken token)
/// <param name="eventHubName">The name of the specific Event Hub the ownership is associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the ownership is associated with.</param>
/// <param name="ownerIdentifier">The identifier of the processor that attempted to claim the ownership for.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="exception">The message for the exception that occurred.</param>
///
partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string errorMessage);
partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception);

/// <summary>
/// Indicates that ownership was unable to be claimed.
Expand Down Expand Up @@ -668,5 +673,15 @@ async Task wrapper(CancellationToken token)
/// <param name="containerName">The name of the associated container client.</param>
///
partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName);

/// <summary>
/// Contains the information to reflect the state of event processing for a given Event Hub partition.
/// Provides access to the offset and the sequence number retrieved from the blob.
/// </summary>
public class BlobStorageCheckpoint : EventProcessorCheckpoint
{
public long? Offset { get; set; }
public long? SequenceNumber { get; set; }
}
}
}
}
Loading

0 comments on commit 82d737c

Please sign in to comment.