diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.CheckpointStore.Blob/tests/Infrastructure/StorageScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs.CheckpointStore.Blob/tests/Infrastructure/StorageScope.cs
index e9e386489acd2..a5e745c0d3ee9 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs.CheckpointStore.Blob/tests/Infrastructure/StorageScope.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.CheckpointStore.Blob/tests/Infrastructure/StorageScope.cs
@@ -24,10 +24,10 @@ namespace Azure.Messaging.EventHubs.CheckpointStore.Blob.Tests.Infrastructure
public sealed class StorageScope : IAsyncDisposable
{
/// The set of characters considered invalid in a blob container name.
- private static readonly Regex s_invalidContainerCharactersExpression = new Regex("[^a-z0-9]", RegexOptions.IgnoreCase | RegexOptions.Compiled);
+ private static readonly Regex InvalidContainerCharactersExpression = new Regex("[^a-z0-9]", RegexOptions.IgnoreCase | RegexOptions.Compiled);
/// The manager for common live test resource operations.
- private static readonly LiveResourceManager s_resourceManager = new LiveResourceManager();
+ private static readonly LiveResourceManager ResourceManager = new LiveResourceManager();
/// Serves as a sentinel flag to denote when the instance has been disposed.
private bool _disposed = false;
@@ -62,12 +62,12 @@ public async ValueTask DisposeAsync()
var resourceGroup = TestEnvironment.EventHubsResourceGroup;
var storageAccount = StorageTestEnvironment.StorageAccountName;
- var token = await s_resourceManager.AquireManagementTokenAsync();
+ var token = await ResourceManager.AquireManagementTokenAsync();
var client = new StorageManagementClient(new TokenCredentials(token)) { SubscriptionId = TestEnvironment.EventHubsSubscription };
try
{
- await s_resourceManager.CreateRetryPolicy().ExecuteAsync(() => client.BlobContainers.DeleteAsync(resourceGroup, storageAccount, ContainerName));
+ await ResourceManager.CreateRetryPolicy().ExecuteAsync(() => client.BlobContainers.DeleteAsync(resourceGroup, storageAccount, ContainerName));
}
catch
{
@@ -96,18 +96,18 @@ public async ValueTask DisposeAsync()
///
public static async Task CreateAsync([CallerMemberName] string caller = "")
{
- caller = s_invalidContainerCharactersExpression.Replace(caller.ToLowerInvariant(), string.Empty);
+ caller = InvalidContainerCharactersExpression.Replace(caller.ToLowerInvariant(), string.Empty);
caller = (caller.Length < 16) ? caller : caller.Substring(0, 15);
var resourceGroup = TestEnvironment.EventHubsResourceGroup;
var storageAccount = StorageTestEnvironment.StorageAccountName;
- var token = await s_resourceManager.AquireManagementTokenAsync();
+ var token = await ResourceManager.AquireManagementTokenAsync();
string CreateName() => $"{ Guid.NewGuid().ToString("D").Substring(0, 13) }-{ caller }";
using (var client = new StorageManagementClient(new TokenCredentials(token)) { SubscriptionId = TestEnvironment.EventHubsSubscription })
{
- BlobContainer container = await s_resourceManager.CreateRetryPolicy().ExecuteAsync(() => client.BlobContainers.CreateAsync(resourceGroup, storageAccount, CreateName(), PublicAccess.None));
+ BlobContainer container = await ResourceManager.CreateRetryPolicy().ExecuteAsync(() => client.BlobContainers.CreateAsync(resourceGroup, storageAccount, CreateName(), PublicAccess.None));
return new StorageScope(container.Name);
}
}
@@ -123,18 +123,18 @@ public static async Task CreateStorageAccountAsync()
{
var subscription = TestEnvironment.EventHubsSubscription;
var resourceGroup = TestEnvironment.EventHubsResourceGroup;
- var token = await s_resourceManager.AquireManagementTokenAsync();
+ var token = await ResourceManager.AquireManagementTokenAsync();
static string CreateName() => $"neteventhubs{ Guid.NewGuid().ToString("N").Substring(0, 12) }";
using (var client = new StorageManagementClient(new TokenCredentials(token)) { SubscriptionId = subscription })
{
- var location = await s_resourceManager.QueryResourceGroupLocationAsync(token, resourceGroup, subscription);
+ var location = await ResourceManager.QueryResourceGroupLocationAsync(token, resourceGroup, subscription);
var sku = new Sku(SkuName.StandardLRS, SkuTier.Standard);
- var parameters = new StorageAccountCreateParameters(sku, Kind.BlobStorage, location: location, tags: s_resourceManager.GenerateTags(), accessTier: AccessTier.Hot);
- StorageAccount storageAccount = await s_resourceManager.CreateRetryPolicy().ExecuteAsync(() => client.StorageAccounts.CreateAsync(resourceGroup, CreateName(), parameters));
+ var parameters = new StorageAccountCreateParameters(sku, Kind.BlobStorage, location: location, tags: ResourceManager.GenerateTags(), accessTier: AccessTier.Hot);
+ StorageAccount storageAccount = await ResourceManager.CreateRetryPolicy().ExecuteAsync(() => client.StorageAccounts.CreateAsync(resourceGroup, CreateName(), parameters));
- StorageAccountListKeysResult storageKeys = await s_resourceManager.CreateRetryPolicy().ExecuteAsync(() => client.StorageAccounts.ListKeysAsync(resourceGroup, storageAccount.Name));
+ StorageAccountListKeysResult storageKeys = await ResourceManager.CreateRetryPolicy().ExecuteAsync(() => client.StorageAccounts.ListKeysAsync(resourceGroup, storageAccount.Name));
return new StorageProperties(storageAccount.Name, $"DefaultEndpointsProtocol=https;AccountName={ storageAccount.Name };AccountKey={ storageKeys.Keys[0].Value };EndpointSuffix=core.windows.net");
}
}
@@ -150,11 +150,11 @@ public static async Task DeleteStorageAccountAsync(string accountName)
{
var subscription = TestEnvironment.EventHubsSubscription;
var resourceGroup = TestEnvironment.EventHubsResourceGroup;
- var token = await s_resourceManager.AquireManagementTokenAsync();
+ var token = await ResourceManager.AquireManagementTokenAsync();
using (var client = new StorageManagementClient(new TokenCredentials(token)) { SubscriptionId = subscription })
{
- await s_resourceManager.CreateRetryPolicy().ExecuteAsync(() => client.StorageAccounts.DeleteAsync(resourceGroup, accountName));
+ await ResourceManager.CreateRetryPolicy().ExecuteAsync(() => client.StorageAccounts.DeleteAsync(resourceGroup, accountName));
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs
index f31488f4a71e8..89411344b86be 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
@@ -9,8 +10,12 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
+using Azure.Messaging.EventHubs.Authorization;
using Azure.Messaging.EventHubs.Core;
+using Azure.Messaging.EventHubs.Diagnostics;
+using Azure.Messaging.EventHubs.Errors;
using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Sasl;
using Microsoft.Azure.Amqp.Transport;
@@ -33,8 +38,8 @@ internal class AmqpConnectionScope : IDisposable
/// The URI scheme to apply when using web sockets for service communication.
private const string WebSocketsUriScheme = "wss";
- /// Indicates whether or not this instance has been disposed.
- private bool _disposed = false;
+ /// The string formatting mask to apply to the service endpoint to consume events for a given consumer group and partition.
+ private const string ConsumerPathSuffixMask = "/ConsumerGroups/{0}/Partitions/{1}";
///
/// The version of AMQP to use within the scope.
@@ -42,6 +47,48 @@ internal class AmqpConnectionScope : IDisposable
///
private static Version AmqpVersion { get; } = new Version(1, 0, 0, 0);
+ ///
+ /// The amount of buffer to apply to account for clock skew when
+ /// refreshing authorization. Authorization will be refreshed earlier
+ /// than the expected expiration by this amount.
+ ///
+ ///
+ private static TimeSpan AuthorizationRefreshBuffer { get; } = TimeSpan.FromMinutes(5);
+
+ ///
+ /// The minimum amount of time for authorization to be refreshed; any calculations that
+ /// call for refreshing more frequently will be substituted with this value.
+ ///
+ ///
+ private static TimeSpan MinimumAuthorizationRefresh { get; } = TimeSpan.FromMinutes(4);
+
+ ///
+ /// The amount time to allow to refresh authorization of an AMQP link.
+ ///
+ ///
+ private static TimeSpan AuthorizationRefreshTimeout { get; } = TimeSpan.FromMinutes(3);
+
+ ///
+ /// Indicates whether this has been disposed.
+ ///
+ ///
+ /// true if disposed; otherwise, false.
+ ///
+ public bool IsDisposed { get; private set; }
+
+ ///
+ /// The cancellation token to use with operations initiated by the scope.
+ ///
+ ///
+ private CancellationTokenSource OperationCancellationSource { get; } = new CancellationTokenSource();
+
+ ///
+ /// The set of active AMQP links associated with the connection scope. These are considered children
+ /// of the active connection and should be managed as such.
+ ///
+ ///
+ private ConcurrentDictionary ActiveLinks { get; } = new ConcurrentDictionary();
+
///
/// The unique identifier of the scope.
///
@@ -61,10 +108,10 @@ internal class AmqpConnectionScope : IDisposable
private string EventHubName { get; }
///
- /// The credential to use for authorization with the Event Hubs service.
+ /// The provider to use for obtaining a token for authorization with the Event Hubs service.
///
///
- private TokenCredential Credential { get; }
+ private CbsTokenProvider TokenProvider { get; }
///
/// The type of transport to use for communication.
@@ -109,12 +156,12 @@ public AmqpConnectionScope(Uri serviceEndpoint,
ServiceEndpoint = serviceEndpoint;
EventHubName = eventHubName;
- Credential = credential;
Transport = transport;
Proxy = proxy;
+ TokenProvider = new CbsTokenProvider(new EventHubTokenCredential(credential, serviceEndpoint.ToString()), OperationCancellationSource.Token);
Id = identifier ?? $"{ eventHubName }-{ Guid.NewGuid().ToString("D").Substring(0, 8) }";
- Task connectionFactory(TimeSpan timeout) => CreateConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout);
+ Task connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout);
ActiveConnection = new FaultTolerantAmqpObject(connectionFactory, CloseConnection);
}
@@ -138,10 +185,60 @@ public async Task OpenManagementLinkAsync(TimeSpan time
cancellationToken.ThrowIfCancellationRequested();
var stopWatch = Stopwatch.StartNew();
- AmqpConnection connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
+ var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var link = await CreateManagementLinkAsync(connection, timeout.CalculateRemaining(stopWatch.Elapsed), cancellationToken).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
stopWatch.Stop();
+ return link;
+ }
+
+ ///
+ /// Opens an AMQP link for use with consumer operations.
+ ///
+ ///
+ /// The name of the consumer group in the context of which events should be received.
+ /// The identifier of the Event Hub partition from which events should be received.
+ /// The position of the event in the partition where the link should be filtered to.
+ /// The set of active options for the consumer that will make use of the link.
+ /// The timeout to apply when creating the link.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// A link for use with consumer operations.
+ ///
+ public async Task OpenConsumerLinkAsync(string consumerGroup,
+ string partitionId,
+ EventPosition eventPosition,
+ EventHubConsumerOptions consumerOptions,
+ TimeSpan timeout,
+ CancellationToken cancellationToken)
+ {
+ Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
+ Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
+ Argument.AssertNotNull(eventPosition, nameof(eventPosition));
+ Argument.AssertNotNull(consumerOptions, nameof(consumerOptions));
- return await OpenManagementLinkAsync(connection, timeout.CalculateRemaining(stopWatch.Elapsed), cancellationToken).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var stopWatch = Stopwatch.StartNew();
+ var consumerEndpoint = new Uri(ServiceEndpoint, string.Format(ConsumerPathSuffixMask, consumerGroup, partitionId));
+
+ var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var link = await CreateReceivingLinkAsync(connection, consumerEndpoint, eventPosition, consumerOptions, timeout.CalculateRemaining(stopWatch.Elapsed), cancellationToken).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ stopWatch.Stop();
+ return link;
}
///
@@ -151,13 +248,16 @@ public async Task OpenManagementLinkAsync(TimeSpan time
///
public void Dispose()
{
- if (_disposed)
+ if (IsDisposed)
{
return;
}
ActiveConnection?.Dispose();
- _disposed = true;
+ OperationCancellationSource.Cancel();
+ OperationCancellationSource.Dispose();
+
+ IsDisposed = true;
}
///
@@ -173,12 +273,12 @@ public void Dispose()
///
/// An AMQP connection that may be used for communicating with the Event Hubs service.
///
- protected virtual async Task CreateConnectionAsync(Version amqpVersion,
- Uri serviceEndpoint,
- TransportType transportType,
- IWebProxy proxy,
- string scopeIdentifier,
- TimeSpan timeout)
+ protected virtual async Task CreateAndOpenConnectionAsync(Version amqpVersion,
+ Uri serviceEndpoint,
+ TransportType transportType,
+ IWebProxy proxy,
+ string scopeIdentifier,
+ TimeSpan timeout)
{
var hostName = serviceEndpoint.Host;
AmqpSettings amqpSettings = CreateAmpqSettings(AmqpVersion);
@@ -197,30 +297,35 @@ protected virtual async Task CreateConnectionAsync(Version amqpV
TransportBase transport = await initiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
var connection = new AmqpConnection(transport, amqpSettings, connectionSetings);
- await connection.OpenAsync(timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ await OpenAmqpObjectAsync(connection, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
stopWatch.Stop();
- // Create the CBS link that will be used for authorization and ensure that it is associated
- // with the connection.
+ // Create the CBS link that will be used for authorization. The act of creating the link will associate
+ // it with the connection.
+
+ new AmqpCbsLink(connection);
- var cbsLink = new AmqpCbsLink(connection);
+ // When the connection is closed, close each of the links associated with it.
- // TODO (pri2 // squire):
- // The act of creating the link should ensure that it is added to the connection. Unsure
- // of why this additional check was in the track one code. Investigate and either
- // document or remove.
+ EventHandler closeHandler = null;
- if (!connection.Extensions.Contains(typeof(AmqpCbsLink)))
+ closeHandler = (snd, args) =>
{
- connection.Extensions.Add(cbsLink);
- }
+ foreach (var link in ActiveLinks.Keys)
+ {
+ link.SafeClose();
+ }
+
+ connection.Closed -= closeHandler;
+ };
+ connection.Closed += closeHandler;
return connection;
}
///
- /// Opens an AMQP link for use with management operations.
+ /// Creates an AMQP link for use with management operations.
///
///
/// The active and opened AMQP connection to use for this link.
@@ -229,11 +334,11 @@ protected virtual async Task CreateConnectionAsync(Version amqpV
///
/// A link for use with management operations.
///
- protected virtual async Task OpenManagementLinkAsync(AmqpConnection connection,
- TimeSpan timeout,
- CancellationToken cancellationToken)
+ protected virtual async Task CreateManagementLinkAsync(AmqpConnection connection,
+ TimeSpan timeout,
+ CancellationToken cancellationToken)
{
- Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
+ Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
cancellationToken.ThrowIfCancellationRequested();
var session = default(AmqpSession);
@@ -246,7 +351,7 @@ protected virtual async Task OpenManagementLinkAsync(Am
var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
session = connection.CreateSession(sessionSettings);
- await session.OpenAsync(timeout).ConfigureAwait(false);
+ await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
// Create and open the link.
@@ -255,9 +360,128 @@ protected virtual async Task OpenManagementLinkAsync(Am
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.Elapsed).TotalMilliseconds);
var link = new RequestResponseAmqpLink(AmqpManagement.LinkType, session, AmqpManagement.Address, linkSettings.Properties);
- await link.OpenAsync(timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ stopWatch.Stop();
+
+ // Track the link before returning it, so that it can be managed with the scope.
+
+ BeginTrackingLinkAsActive(link);
+ return link;
+ }
+ catch
+ {
+ // Aborting the session will perform any necessary cleanup of
+ // the associated link as well.
+
+ session?.Abort();
+ throw;
+ }
+ }
+
+ ///
+ /// Creates an AMQP link for use with receiving operations.
+ ///
+ ///
+ /// The active and opened AMQP connection to use for this link.
+ /// The fully qualified endpoint to open the link for.
+ /// The position of the event in the partition where the link should be filtered to.
+ /// The set of active options for the consumer that will make use of the link.
+ /// The timeout to apply when creating the link.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// A link for use for operations related to receiving events.
+ ///
+ protected virtual async Task CreateReceivingLinkAsync(AmqpConnection connection,
+ Uri endpoint,
+ EventPosition eventPosition,
+ EventHubConsumerOptions consumerOptions,
+ TimeSpan timeout,
+ CancellationToken cancellationToken)
+ {
+ Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var session = default(AmqpSession);
+ var stopWatch = Stopwatch.StartNew();
+
+ try
+ {
+ // Perform the initial authorization for the link.
+
+ var authClaims = new[] { EventHubsClaim.Listen };
+ var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, TokenProvider, endpoint, endpoint.AbsoluteUri, endpoint.AbsoluteUri, authClaims, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ // Create and open the AMQP session associated with the link.
+
+ var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
+ session = connection.CreateSession(sessionSettings);
+
+ await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ // Create and open the link.
+
+ var filters = new FilterSet();
+ filters.Add(AmqpFilter.ConsumerFilterName, AmqpFilter.CreateConsumerFilter(AmqpFilter.BuildFilterExpression(eventPosition)));
+
+ var linkSettings = new AmqpLinkSettings
+ {
+ Role = true,
+ TotalLinkCredit = (uint)consumerOptions.PrefetchCount,
+ AutoSendFlow = consumerOptions.PrefetchCount > 0,
+ SettleType = SettleMode.SettleOnSend,
+ Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
+ Target = new Target { Address = Guid.NewGuid().ToString() }
+ };
+
+ linkSettings.AddProperty(AmqpProperty.EntityType, (int)AmqpProperty.Entity.ConsumerGroup);
+
+ if (!string.IsNullOrEmpty(consumerOptions.Identifier))
+ {
+ linkSettings.AddProperty(AmqpProperty.ConsumerIdentifier, consumerOptions.Identifier);
+ }
+
+ if (consumerOptions.OwnerLevel.HasValue)
+ {
+ linkSettings.AddProperty(AmqpProperty.OwnerLevel, consumerOptions.OwnerLevel.Value);
+ }
+
+ if (consumerOptions.TrackLastEnqueuedEventInformation)
+ {
+ linkSettings.DesiredCapabilities = new Multiple(new List
+ {
+ AmqpProperty.TrackLastEnqueuedEventInformation
+ });
+ }
+
+ var link = new ReceivingAmqpLink(linkSettings);
+ linkSettings.LinkName = $"{ Id };{ connection.Identifier };{ session.Identifier };{ link.Identifier }";
+ link.AttachTo(session);
stopWatch.Stop();
+
+ // Configure refresh for authorization of the link.
+
+ var refreshTimer = default(Timer);
+
+ var refreshHandler = CreateAuthorizationRefreshHandler
+ (
+ connection,
+ link,
+ TokenProvider,
+ endpoint,
+ endpoint.AbsoluteUri,
+ endpoint.AbsoluteUri,
+ authClaims,
+ AuthorizationRefreshTimeout,
+ () => refreshTimer
+ );
+
+ refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
+
+ // Track the link before returning it, so that it can be managed with the scope.
+
+ BeginTrackingLinkAsActive(link, refreshTimer);
return link;
}
catch
@@ -270,6 +494,49 @@ protected virtual async Task OpenManagementLinkAsync(Am
}
}
+ ///
+ /// Performs the actions needed to configure and begin tracking the specified AMQP
+ /// link as an active link bound to this scope.
+ ///
+ ///
+ /// The link to begin tracking.
+ /// The timer used to manage refreshing authorization, if the link requires it.
+ ///
+ ///
+ /// This method does operate on the specified in order to configure it
+ /// for active tracking; no assumptions are made about the open/connected state of the link nor are
+ /// its communication properties modified.
+ ///
+ ///
+ protected virtual void BeginTrackingLinkAsActive(AmqpObject link,
+ Timer authorizationRefreshTimer = null)
+ {
+ // Register the link as active and having authorization automatically refreshed, so that it can be
+ // managed with the scope.
+
+ if (!ActiveLinks.TryAdd(link, authorizationRefreshTimer))
+ {
+ throw new EventHubsException(true, EventHubName, Resources.CouldNotCreateLink);
+ }
+
+ // When the link is closed, stop refreshing authorization and remove it from the
+ // set of associated links.
+
+ var closeHandler = default(EventHandler);
+
+ closeHandler = (snd, args) =>
+ {
+ ActiveLinks.TryRemove(link, out var timer);
+
+ timer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
+ timer?.Dispose();
+
+ link.Closed -= closeHandler;
+ };
+
+ link.Closed += closeHandler;
+ }
+
///
/// Performs the tasks needed to close a connection.
///
@@ -278,6 +545,118 @@ protected virtual async Task OpenManagementLinkAsync(Am
///
protected virtual void CloseConnection(AmqpConnection connection) => connection.SafeClose();
+ ///
+ /// Calculates the interval after which authorization for an AMQP link should be
+ /// refreshed.
+ ///
+ ///
+ /// The date/time, in UTC, that the current authorization is expected to expire.
+ ///
+ /// The interval after which authorization should be refreshed.
+ ///
+ protected virtual TimeSpan CalculateLinkAuthorizationRefreshInterval(DateTime expirationTimeUtc)
+ {
+ var refreshDueInterval = (expirationTimeUtc.Subtract(DateTime.UtcNow)).Add(AuthorizationRefreshBuffer);
+ return (refreshDueInterval < MinimumAuthorizationRefresh) ? MinimumAuthorizationRefresh : refreshDueInterval;
+ }
+
+ ///
+ /// Creates the timer event handler to support refreshing AMQP link authorization
+ /// on a recurring basis.
+ ///
+ ///
+ /// The AMQP connection to which the link being refreshed is bound to.
+ /// The AMQO link to refresh authorization for.
+ /// The to use for obtaining access tokens.
+ /// The Event Hubs service endpoint that the AMQP link is communicating with.
+ /// The audience associated with the authorization. This is likely the absolute URI.
+ /// The resource associated with the authorization. This is likely the absolute URI.
+ /// The set of claims required to support the operations of the AMQP link.
+ /// The timeout to apply when requesting authorization refresh.
+ /// A function to allow retrieving the associated with the link authorization.
+ ///
+ /// A delegate to perform the refresh when a timer is due.
+ ///
+ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection,
+ AmqpObject amqpLink,
+ CbsTokenProvider tokenProvider,
+ Uri endpoint,
+ string audience,
+ string resource,
+ string[] requiredClaims,
+ TimeSpan refreshTimeout,
+ Func refreshTimerFactory)
+ {
+ return async _ =>
+ {
+ EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshStart(EventHubName, endpoint.AbsoluteUri);
+ var refreshTimer = refreshTimerFactory();
+
+ try
+ {
+ var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false);
+
+ // Reset the timer for the next refresh.
+
+ if (authExpirationUtc >= DateTimeOffset.UtcNow)
+ {
+ refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
+ }
+
+ EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshComplete(EventHubName, endpoint.AbsoluteUri);
+ }
+ catch (Exception ex)
+ {
+ EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshError(EventHubName, endpoint.AbsoluteUri, ex.Message);
+ refreshTimer.Change(Timeout.Infinite, Timeout.Infinite);
+ }
+ };
+ }
+
+ ///
+ /// Performs the actions needed to open a generic AMQP object, such
+ /// as a session or link for use.
+ ///
+ ///
+ /// The target AMQP object to open.
+ /// The timeout to apply when opening the link.
+ ///
+ protected virtual Task OpenAmqpObjectAsync(AmqpObject target,
+ TimeSpan timeout) => target.OpenAsync(timeout);
+
+ ///
+ /// Requests authorization for a connection or link using a connection via the CBS mechanism.
+ ///
+ ///
+ /// The AMQP connection for which the authorization is associated.
+ /// The to use for obtaining access tokens.
+ /// The Event Hubs service endpoint that the authorization is requested for.
+ /// The audience associated with the authorization. This is likely the absolute URI.
+ /// The resource associated with the authorization. This is likely the absolute URI.
+ /// The set of claims required to support the operations of the AMQP link.
+ /// The timeout to apply when requesting authorization.
+ ///
+ /// The date/time, in UTC, when the authorization expires.
+ ///
+ ///
+ /// It is assumed that there is a valid already associated
+ /// with the connection; this will be used as the transport for the authorization
+ /// credentials.
+ ///
+ ///
+ protected virtual Task RequestAuthorizationUsingCbsAsync(AmqpConnection connection,
+ CbsTokenProvider tokenProvider,
+ Uri endpoint,
+ string audience,
+ string resource,
+ string[] requiredClaims,
+ TimeSpan timeout)
+ {
+ var authLink = connection.Extensions.Find();
+ return authLink.SendTokenAsync(TokenProvider, endpoint, audience, resource, requiredClaims, timeout);
+ }
+
+
///
/// Creates the settings to use for AMQP communication.
///
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubClient.cs
index d7af7587dd264..a9821fd5049dc 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubClient.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubClient.cs
@@ -146,9 +146,6 @@ protected AmqpEventHubClient(string host,
{
EventHubsEventSource.Log.EventHubClientCreateStart(host, eventHubName);
- _retryPolicy = defaultRetryPolicy;
- _tryTimeout = _retryPolicy.CalculateTryTimeout(0);
-
EventHubName = eventHubName;
Credential = credential;
MessageConverter = messageConverter ?? new AmqpMessageConverter();
@@ -166,6 +163,9 @@ protected AmqpEventHubClient(string host,
ConnectionScope = connectionScope;
ManagementLink = new FaultTolerantAmqpObject(timeout => ConnectionScope.OpenManagementLinkAsync(timeout, CancellationToken.None), link => link.SafeClose());
+
+ _retryPolicy = defaultRetryPolicy;
+ _tryTimeout = _retryPolicy.CalculateTryTimeout(0);
}
finally
{
@@ -211,7 +211,6 @@ public override async Task GetPropertiesAsync(CancellationTo
var token = await AquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
using AmqpMessage request = MessageConverter.CreateEventHubPropertiesRequest(EventHubName, token);
-
cancellationToken.ThrowIfCancellationRequested();
var stopWatch = Stopwatch.StartNew();
@@ -223,12 +222,11 @@ public override async Task GetPropertiesAsync(CancellationTo
cancellationToken.ThrowIfCancellationRequested();
using AmqpMessage response = await link.RequestAsync(request, _tryTimeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
// Process the response.
- cancellationToken.ThrowIfCancellationRequested();
ThrowIfErrorResponse(response, EventHubName);
-
return MessageConverter.CreateEventHubPropertiesFromResponse(response);
}
catch (Exception ex)
@@ -269,7 +267,6 @@ public override async Task GetPartitionPropertiesAsync(stri
var token = await AquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
using AmqpMessage request = MessageConverter.CreatePartitionPropertiesRequest(EventHubName, partitionId, token);
-
cancellationToken.ThrowIfCancellationRequested();
var stopWatch = Stopwatch.StartNew();
@@ -281,12 +278,12 @@ public override async Task GetPartitionPropertiesAsync(stri
cancellationToken.ThrowIfCancellationRequested();
using AmqpMessage response = await link.RequestAsync(request, _tryTimeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
// Process the response.
- cancellationToken.ThrowIfCancellationRequested();
- ThrowIfErrorResponse(response, EventHubName);
+ ThrowIfErrorResponse(response, EventHubName);
return MessageConverter.CreatePartitionPropertiesFromResponse(response);
}
catch (Exception ex)
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubConsumer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubConsumer.cs
new file mode 100755
index 0000000000000..7094bb7788c3f
--- /dev/null
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventHubConsumer.cs
@@ -0,0 +1,148 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Core;
+using Azure.Messaging.EventHubs.Core;
+using Azure.Messaging.EventHubs.Metadata;
+using Microsoft.Azure.Amqp;
+
+namespace Azure.Messaging.EventHubs.Amqp
+{
+ ///
+ /// A transport client abstraction responsible for brokering operations for AMQP-based connections.
+ /// It is intended that the public make use of an instance
+ /// via containment and delegate operations to it.
+ ///
+ ///
+ ///
+ ///
+ internal class AmqpEventHubConsumer : TransportEventHubConsumer
+ {
+ /// The active retry policy for the consumer.
+ private EventHubRetryPolicy _retryPolicy;
+
+ /// The amount of time to allow for an operation to complete before considering it to have timed out.
+ private TimeSpan _tryTimeout;
+
+ /// Indicates whether or not this instance has been closed.
+ private bool _closed = false;
+
+ ///
+ /// Indicates whether or not this consumer has been closed.
+ ///
+ ///
+ ///
+ /// true if the consumer is closed; otherwise, false.
+ ///
+ ///
+ public override bool Closed => _closed;
+
+ ///
+ /// The converter to use for translating between AMQP messages and client library
+ /// types.
+ ///
+ ///
+ private AmqpMessageConverter MessageConverter { get; }
+
+ ///
+ /// The AMQP connection scope responsible for managing transport constructs for this instance.
+ ///
+ ///
+ private AmqpConnectionScope ConnectionScope { get; }
+
+ ///
+ /// The AMQP link intended for use with receiving operations.
+ ///
+ ///
+ private FaultTolerantAmqpObject ReceiveLink { get; }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The name of the consumer group this consumer is associated with. Events are read in the context of this group.
+ /// The identifier of the Event Hub partition from which events will be received.
+ /// The set of active options for the consumer that will make use of the link.
+ /// The position of the event in the partition where the consumer should begin reading.
+ /// The AMQP connection context for operations .
+ /// The converter to use for translating between AMQP messages and client types.
+ /// The retry policy to consider when an operation fails.
+ /// The set of properties for the last event enqueued in a partition; if not requested in the consumer options, it is expected that this is null.
+ ///
+ ///
+ /// As an internal type, this class performs only basic sanity checks against its arguments. It
+ /// is assumed that callers are trusted and have performed deep validation.
+ ///
+ /// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
+ /// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
+ /// caller.
+ ///
+ ///
+ public AmqpEventHubConsumer(string consumerGroup,
+ string partitionId,
+ EventPosition eventPosition,
+ EventHubConsumerOptions consumerOptions,
+ AmqpConnectionScope connectionScope,
+ AmqpMessageConverter messageConverter,
+ EventHubRetryPolicy retryPolicy,
+ LastEnqueuedEventProperties lastEnqueuedEventProperties) : base(lastEnqueuedEventProperties)
+ {
+ Argument.AssertNotNullOrEmpty(consumerGroup, nameof(consumerGroup));
+ Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
+ Argument.AssertNotNull(connectionScope, nameof(connectionScope));
+ Argument.AssertNotNull(messageConverter, nameof(messageConverter));
+ Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
+
+ ConnectionScope = connectionScope;
+ MessageConverter = messageConverter;
+ ReceiveLink = new FaultTolerantAmqpObject(timeout => ConnectionScope.OpenConsumerLinkAsync(consumerGroup, partitionId, eventPosition, consumerOptions, timeout, CancellationToken.None), link => link.SafeClose());
+
+ _retryPolicy = retryPolicy;
+ _tryTimeout = retryPolicy.CalculateTryTimeout(0);
+ }
+
+ ///
+ /// Updates the active retry policy for the consumer.
+ ///
+ ///
+ /// The retry policy to set as active.
+ ///
+ public override void UpdateRetryPolicy(EventHubRetryPolicy newRetryPolicy)
+ {
+ Argument.AssertNotNull(newRetryPolicy, nameof(newRetryPolicy));
+
+ _retryPolicy = newRetryPolicy;
+ _tryTimeout = _retryPolicy.CalculateTryTimeout(0);
+ }
+
+ ///
+ /// Receives a batch of from the Event Hub partition.
+ ///
+ ///
+ /// The maximum number of messages to receive in this batch.
+ /// The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ /// The batch of from the Event Hub partition this consumer is associated with. If no events are present, an empty enumerable is returned.
+ ///
+ public override Task> ReceiveAsync(int maximumMessageCount,
+ TimeSpan? maximumWaitTime,
+ CancellationToken cancellationToken)
+ {
+ throw new NotImplementedException();
+ }
+
+ ///
+ /// Closes the connection to the transport consumer instance.
+ ///
+ ///
+ /// An optional instance to signal the request to cancel the operation.
+ ///
+ public override Task CloseAsync(CancellationToken cancellationToken) => throw new NotImplementedException();
+ }
+}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpFilter.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpFilter.cs
index 576ee6ba2e387..165919391e6c4 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpFilter.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpFilter.cs
@@ -1,6 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
+using System;
+using Azure.Core;
+using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Framing;
+
namespace Azure.Messaging.EventHubs
{
///
@@ -11,12 +16,66 @@ namespace Azure.Messaging.EventHubs
internal static class AmqpFilter
{
/// Indicates filtering based on the sequence number of a message.
- public const string SeqNumberName = "amqp.annotation.x-opt-sequence-number";
+ public const string SequenceNumberName = "amqp.annotation.x-opt-sequence-number";
/// Indicates filtering based on the offset of a message.
- public const string OffsetPartName = "amqp.annotation.x-opt-offset";
+ public const string OffsetName = "amqp.annotation.x-opt-offset";
/// Indicates filtering based on time that a message was enqueued.
- public const string ReceivedAtName = "amqp.annotation.x-opt-enqueued-time";
+ public const string EnqueuedTimeName = "amqp.annotation.x-opt-enqueued-time";
+
+ /// Identifies the filter type name.
+ public const string ConsumerFilterName = AmqpConstants.Apache + ":selector-filter:string";
+
+ /// Identifies the filter type code.
+ public const ulong ConsumerFilterCode = 0x00000137000000A;
+
+ ///
+ /// Creates an event consumer filter based on the specified expression.
+ ///
+ ///
+ /// The SQL-like expression to use for filtering events in the partition.
+ ///
+ /// An type to use in the filter map for a consumer AMQP link.
+ ///
+ public static AmqpDescribed CreateConsumerFilter(string filterExpression)
+ {
+ Argument.AssertNotNullOrEmpty(filterExpression, nameof(filterExpression));
+ return new AmqpDescribed(ConsumerFilterName, ConsumerFilterCode) { Value = filterExpression };
+ }
+
+ ///
+ /// Builds an AMQP filter expression for the specified event position.
+ ///
+ ///
+ /// The event position to use as the source for filtering.
+ ///
+ /// The AMQP filter expression that corresponds to the .
+ ///
+ public static string BuildFilterExpression(EventPosition eventPosition)
+ {
+ Argument.AssertNotNull(eventPosition, nameof(eventPosition));
+
+ // Build the filter expression, in the order of significance.
+
+ if (!string.IsNullOrEmpty(eventPosition.Offset))
+ {
+ return $"{ OffsetName } { (eventPosition.IsInclusive ? ">=" : ">") } { eventPosition.Offset }";
+ }
+
+ if (eventPosition.SequenceNumber.HasValue)
+ {
+ return $"{ SequenceNumberName } { (eventPosition.IsInclusive ? ">=" : ">") } { eventPosition.SequenceNumber.Value }";
+ }
+
+ if (eventPosition.EnqueuedTime.HasValue)
+ {
+ return $"{ EnqueuedTimeName } > { eventPosition.EnqueuedTime.Value.ToUnixTimeMilliseconds() }";
+ }
+
+ // If no filter was built, than the event position is not valid for filtering.
+
+ throw new ArgumentException(Resources.InvalidEventPositionForFilter, nameof(eventPosition));
+ }
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs
index 47deda6d2d0ed..02e90cf881465 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProperty.cs
@@ -25,6 +25,12 @@ internal static class AmqpProperty
///
public static AmqpSymbol EntityType { get; } = AmqpConstants.Vendor + ":entity-type";
+ ///
+ /// The capability for tracking the last event enqueued in a partition, to associate with a link.
+ ///
+ ///
+ public static AmqpSymbol TrackLastEnqueuedEventInformation { get; } = AmqpConstants.Vendor + ":enable-receiver-runtime-metric";
+
///
/// The timeout to associate with a link.
///
@@ -87,6 +93,31 @@ public static class Descriptor
public static AmqpSymbol DateTimeOffset { get; } = AmqpConstants.Vendor + ":datetime-offset";
}
+ ///
+ /// Represents the entity mapping for AMQP properties between the client library and
+ /// the Event Hubs service.
+ ///
+ ///
+ ///
+ /// WARNING:
+ /// These values are synchronized between the Event Hubs service and the client
+ /// library. You must consult with the Event Hubs service team before making
+ /// changes, including adding a new member.
+ ///
+ /// When adding a new member, remember to always do so before the Unknown
+ /// member.
+ ///
+ ///
+ public enum Entity
+ {
+ Namespace = 4,
+ EventHub = 7,
+ ConsumerGroup = 8,
+ Partition = 9,
+ Checkpoint = 10,
+ Unknown = 0x7FFFFFFE
+ }
+
///
/// Represents the type mapping for AMQP properties between the client library and
/// the Event Hubs service.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/CbsTokenProvider.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/CbsTokenProvider.cs
index 108467e94f39c..1dd19e0ad0ca4 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/CbsTokenProvider.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/CbsTokenProvider.cs
@@ -64,7 +64,9 @@ public CbsTokenProvider(EventHubTokenCredential credential,
/// The set of claims that are required for authorization.
/// The token to use for authorization.
///
- public async Task GetTokenAsync(Uri namespaceAddress, string appliesTo, string[] requiredClaims)
+ public async Task GetTokenAsync(Uri namespaceAddress,
+ string appliesTo,
+ string[] requiredClaims)
{
AccessToken token = await _credential.GetTokenAsync(new TokenRequest(requiredClaims), _cancellationToken);
return new CbsToken(token.Token, _tokenType, token.ExpiresOn.UtcDateTime);
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj b/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj
index ac296ff808d58..404371857dbbd 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj
@@ -29,10 +29,6 @@
-
-
-
-
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Compatibility/TrackOneEventHubConsumer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Compatibility/TrackOneEventHubConsumer.cs
index e6724ce1318b7..69f85a597929b 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Compatibility/TrackOneEventHubConsumer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Compatibility/TrackOneEventHubConsumer.cs
@@ -34,6 +34,16 @@ internal sealed class TrackOneEventHubConsumer : TransportEventHubConsumer
///
private TrackOne.PartitionReceiver TrackOneReceiver => _trackOneReceiver.Value;
+ ///
+ /// Indicates whether or not this consumer has been closed.
+ ///
+ ///
+ ///
+ /// true if the consumer is closed; otherwise, false.
+ ///
+ ///
+ public override bool Closed => (_trackOneReceiver.IsValueCreated) ? _trackOneReceiver.Value.EventHubClient.CloseCalled : false;
+
///
/// Initializes a new instance of the class.
///
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventHubConsumer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventHubConsumer.cs
index 1f7afc61d93e4..5118934c4ea29 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventHubConsumer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventHubConsumer.cs
@@ -18,6 +18,16 @@ namespace Azure.Messaging.EventHubs.Core
///
internal abstract class TransportEventHubConsumer
{
+ ///
+ /// Indicates whether or not this consumer has been closed.
+ ///
+ ///
+ ///
+ /// true if the consumer is closed; otherwise, false.
+ ///
+ ///
+ public virtual bool Closed { get; }
+
///
/// A set of information about the enqueued state of a partition, as observed by the consumer as
/// events are received from the Event Hubs service.
@@ -39,7 +49,7 @@ protected TransportEventHubConsumer(LastEnqueuedEventProperties lastEnqueuedEven
}
///
- /// Updates the active retry policy for the client.
+ /// Updates the active retry policy for the consumer.
///
///
/// The retry policy to set as active.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs
index 14fbe28d0c4c6..995a6d94e9d28 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs
@@ -397,6 +397,59 @@ public void SubscribeToPartitionError(string eventHubName,
}
}
+ ///
+ /// Indicates that refreshing authorization for an AMQP link has started.
+ ///
+ ///
+ /// The name of the Event Hub that the link is associated with.
+ /// The service endpoint that the link is bound to for communication.
+ ///
+ [Event(21, Level = EventLevel.Informational, Message = "Beginning refresh of AMQP link authorization for Event Hub: {0} (Service Endpoint: '{1}').")]
+ public void AmqpLinkAuthorizationRefreshStart(string eventHubName,
+ string endpoint)
+ {
+ if (IsEnabled())
+ {
+ WriteEvent(21, eventHubName ?? string.Empty, endpoint ?? string.Empty);
+ }
+ }
+
+ ///
+ /// Indicates that refreshing authorization for an AMQP link has completed.
+ ///
+ ///
+ /// The name of the Event Hub that the link is associated with.
+ /// The service endpoint that the link is bound to for communication.
+ ///
+ [Event(22, Level = EventLevel.Informational, Message = "Completed refresh of AMQP link authorization for Event Hub: {0} (Service Endpoint: '{1}').")]
+ public void AmqpLinkAuthorizationRefreshComplete(string eventHubName,
+ string endpoint)
+ {
+ if (IsEnabled())
+ {
+ WriteEvent(22, eventHubName ?? string.Empty, endpoint ?? string.Empty);
+ }
+ }
+
+ ///
+ /// Indicates that an exception was encountered while refreshing authorization for an AMQP link has started.
+ ///
+ ///
+ /// The name of the Event Hub that the link is associated with.
+ /// The service endpoint that the link is bound to for communication.
+ /// The message for the exception that occurred.
+ ///
+ [Event(23, Level = EventLevel.Error, Message = "An exception occurred while refreshing AMQP link authorization for Event Hub: {0} (Service Endpoint: '{1}'). Error Message: '{2}'")]
+ public void AmqpLinkAuthorizationRefreshError(string eventHubName,
+ string endpoint,
+ string errorMessage)
+ {
+ if (IsEnabled())
+ {
+ WriteEvent(23, eventHubName ?? string.Empty, endpoint ?? string.Empty, errorMessage ?? string.Empty);
+ }
+ }
+
///
/// Indicates that an exception was encountered in an unexpected code path, not directly associated with
/// an Event Hubs operation.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventPosition.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventPosition.cs
index bb1881eacf387..2296f5e044326 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventPosition.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventPosition.cs
@@ -183,6 +183,4 @@ private static EventPosition FromOffset(string offset,
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => base.ToString();
}
-
- //TODO: Implement the AMQP-specific methods from track 1 to a new abstraction. (They were not brought forward)
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.Designer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.Designer.cs
index 4e2501ccde736..edf2d6a7a5e78 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.Designer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.Designer.cs
@@ -338,5 +338,23 @@ internal static string ValueOutOfRange {
return ResourceManager.GetString("ValueOutOfRange", resourceCulture);
}
}
+
+ ///
+ /// Looks up a localized string similar to The event position is not valid for filtering. It must have an offset, sequence number, or enqueued time available to filter against..
+ ///
+ internal static string InvalidEventPositionForFilter {
+ get {
+ return ResourceManager.GetString("InvalidEventPositionForFilter", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to Unable to create the items needed to communicate with the Event Hubs service..
+ ///
+ internal static string CouldNotCreateLink {
+ get {
+ return ResourceManager.GetString("CouldNotCreateLink", resourceCulture);
+ }
+ }
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.resx b/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.resx
index ae512459e56d3..8f808f85c3662 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.resx
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Resources.resx
@@ -210,4 +210,10 @@
An invalid message body was encountered. Either the body was null or an incorrect type. Expected: {0}
-
\ No newline at end of file
+
+ The event position is not valid for filtering. It must have an offset, sequence number, or enqueued time available to filter against.
+
+
+ Unable to create the items needed to communicate with the Event Hubs service.
+
+
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs
index 44186c89cbcb3..fdcaec62dd3a5 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs
@@ -2,13 +2,18 @@
// Licensed under the MIT License.
using System;
+using System.Collections.Concurrent;
+using System.Linq;
using System.Net;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.EventHubs.Amqp;
+using Azure.Messaging.EventHubs.Authorization;
using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Encoding;
+using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Transport;
using Moq;
using Moq.Protected;
@@ -86,7 +91,7 @@ public async Task ConstructorCreatesTheConnection()
mockScope
.Protected()
- .Setup>("CreateConnectionAsync",
+ .Setup>("CreateAndOpenConnectionAsync",
ItExpr.IsAny(),
ItExpr.Is(value => value == endpoint),
ItExpr.Is(value => value == transport),
@@ -159,6 +164,8 @@ public async Task OpenManagementLinkAsyncRequestsTheLink()
var identifier = "customIdentIFIER";
var cancellationSource = new CancellationTokenSource();
var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+ var mockLink = new RequestResponseAmqpLink("test", "test", mockSession, "test");
var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
{
@@ -167,7 +174,7 @@ public async Task OpenManagementLinkAsyncRequestsTheLink()
mockScope
.Protected()
- .Setup>("CreateConnectionAsync",
+ .Setup>("CreateAndOpenConnectionAsync",
ItExpr.IsAny(),
ItExpr.Is(value => value == endpoint),
ItExpr.Is(value => value == transport),
@@ -179,17 +186,1100 @@ public async Task OpenManagementLinkAsyncRequestsTheLink()
mockScope
.Protected()
- .Setup>("OpenManagementLinkAsync",
+ .Setup>("CreateManagementLinkAsync",
ItExpr.Is(value => value == mockConnection),
ItExpr.IsAny(),
ItExpr.Is(value => value == cancellationSource.Token))
- .Returns(Task.FromResult(default(RequestResponseAmqpLink)))
+ .Returns(Task.FromResult(mockLink))
.Verifiable();
- RequestResponseAmqpLink link = await mockScope.Object.OpenManagementLinkAsync(TimeSpan.FromDays(1), cancellationSource.Token);
- Assert.That(link, Is.Null, "The mock return was null");
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask)
+ .Verifiable();
+
+ var link = await mockScope.Object.OpenManagementLinkAsync(TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.EqualTo(mockLink), "The mock return was incorrect");
+
+ mockScope.VerifyAll();
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenManagementLinkAsyncManagesActiveLinks()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask);
+
+ var activeLinks = GetActiveLinks(mockScope.Object);
+ Assert.That(activeLinks, Is.Not.Null, "The set of active links was null.");
+ Assert.That(activeLinks.Count, Is.Zero, "There should be no active links when none have been created.");
+
+ var link = await mockScope.Object.OpenManagementLinkAsync(TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.Not.Null, "The link produced was null");
+
+ Assert.That(activeLinks.Count, Is.EqualTo(1), "There should be an active link being tracked.");
+ Assert.That(activeLinks.ContainsKey(link), Is.True, "The management link should be tracked as active.");
+
+ activeLinks.TryGetValue(link, out var refreshTimer);
+ Assert.That(refreshTimer, Is.Null, "The link should have a null timer since it has no authorization refresh needs.");
+
+ link.SafeClose();
+ Assert.That(activeLinks.Count, Is.Zero, "Closing the link should stop tracking it as active.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ [TestCase(null)]
+ [TestCase("")]
+ public void OpenConsumerLinkAsyncValidatesTheConsumerGroup(string consumerGroup)
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+
+ using var scope = new AmqpConnectionScope(endpoint, eventHub, credential, transport, null, identifier);
+ Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf());
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ [TestCase(null)]
+ [TestCase("")]
+ public void OpenConsumerLinkAsyncValidatesThePartitionId(string partitionId)
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "$Default";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+
+ using var scope = new AmqpConnectionScope(endpoint, eventHub, credential, transport, null, identifier);
+ Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf());
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void OpenConsumerLinkAsyncValidatesTheEventPosition()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "$Default";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+
+ using var scope = new AmqpConnectionScope(endpoint, eventHub, credential, transport, null, identifier);
+ Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, null, options, TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf());
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void OpenConsumerLinkAsyncValidatesTheConsumerOptions()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "$Default";
+ var partitionId = "0";
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+
+ using var scope = new AmqpConnectionScope(endpoint, eventHub, credential, transport, null, identifier);
+ Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, null, TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf());
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void OpenConsumerLinkAsyncRespectsTokenCancellation()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+
+ using var scope = new AmqpConnectionScope(endpoint, eventHub, credential, transport, null, identifier);
+
+ var cancellationSource = new CancellationTokenSource();
+ cancellationSource.Cancel();
+
+ Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token), Throws.InstanceOf());
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public void OpenConsumerLinkAsyncRespectsDisposal()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+
+ var scope = new AmqpConnectionScope(endpoint, eventHub, credential, transport, null, identifier);
+ scope.Dispose();
+
+ Assert.That(() => scope.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), CancellationToken.None), Throws.InstanceOf());
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenConsumerLinkAsyncRequestsTheLink()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+ var mockLink = new ReceivingAmqpLink(new AmqpLinkSettings());
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection))
+ .Verifiable();
+
+ mockScope
+ .Protected()
+ .Setup>("CreateReceivingLinkAsync",
+ ItExpr.Is(value => value == mockConnection),
+ ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)),
+ ItExpr.Is(value => value == position),
+ ItExpr.Is(value => value == options),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == cancellationSource.Token))
+ .Returns(Task.FromResult(mockLink))
+ .Verifiable();
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask)
+ .Verifiable();
+
+ var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.EqualTo(mockLink), "The mock return was incorrect");
+
+ mockScope.VerifyAll();
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenConsumerLinkAsyncConfiguresTheLink()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+
+ var options = new EventHubConsumerOptions
+ {
+ Identifier = "testIdentifier123",
+ OwnerLevel = 459,
+ PrefetchCount = 697,
+ TrackLastEnqueuedEventInformation = true
+ };
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup>("RequestAuthorizationUsingCbsAsync",
+ ItExpr.Is(value => value == mockConnection),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)),
+ ItExpr.IsAny(),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Listen),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1)));
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask);
+
+ var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.Not.Null, "The link produced was null");
+
+ var linkSource = (Source)link.Settings.Source;
+ Assert.That(linkSource.FilterSet.Any(item => item.Key.Key.ToString() == AmqpFilter.ConsumerFilterName), Is.True, "There should have been a consumer filter set.");
+ Assert.That(linkSource.Address.ToString(), Contains.Substring($"/{ partitionId }"), "The partition identifier should have been part of the link address.");
+ Assert.That(linkSource.Address.ToString(), Contains.Substring($"/{ consumerGroup }"), "The consumer group should have been part of the link address.");
+
+ Assert.That(link.Settings.TotalLinkCredit, Is.EqualTo((uint)options.PrefetchCount), "The prefetch count should have been used to set the credits.");
+ Assert.That(link.Settings.Properties.Any(item => item.Key.Key.ToString() == AmqpProperty.EntityType.ToString()), Is.True, "There should be an entity type specified.");
+ Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.ConsumerIdentifier, null), Is.EqualTo(options.Identifier), "The consumer identifier should have been used.");
+ Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.OwnerLevel, -1), Is.EqualTo(options.OwnerLevel.Value), "The owner level should have been used.");
+
+ Assert.That(link.Settings.DesiredCapabilities, Is.Not.Null, "There should have been a set of desired capabilities created.");
+ Assert.That(link.Settings.DesiredCapabilities.Contains(AmqpProperty.TrackLastEnqueuedEventInformation), Is.True, "Last event tracking should be requested.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ [TestCase(null)]
+ [TestCase("")]
+ public async Task OpenConsumerLinkAsyncRespectsTheIdentifierOption(string consumerIdentifier)
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+
+ var options = new EventHubConsumerOptions
+ {
+ Identifier = consumerIdentifier,
+ OwnerLevel = 459,
+ PrefetchCount = 697,
+ TrackLastEnqueuedEventInformation = true
+ };
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup>("RequestAuthorizationUsingCbsAsync",
+ ItExpr.Is(value => value == mockConnection),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)),
+ ItExpr.IsAny(),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Listen),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1)));
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask);
+
+ var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.Not.Null, "The link produced was null");
+ Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.ConsumerIdentifier, "NONE"), Is.EqualTo("NONE"), "The consumer identifier should not have been set.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenConsumerLinkAsyncRespectsTheOwnerLevelOption()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+
+ var options = new EventHubConsumerOptions
+ {
+ Identifier = "testIdentifier123",
+ OwnerLevel = null,
+ PrefetchCount = 697,
+ TrackLastEnqueuedEventInformation = true
+ };
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup>("RequestAuthorizationUsingCbsAsync",
+ ItExpr.Is(value => value == mockConnection),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)),
+ ItExpr.IsAny(),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Listen),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1)));
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask);
+
+ var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.Not.Null, "The link produced was null");
+ Assert.That(link.GetSettingPropertyOrDefault(AmqpProperty.OwnerLevel, long.MinValue), Is.EqualTo(long.MinValue), "The owner level should have been used.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenConsumerLinkAsyncRespectsTheTrackLastEventOption()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+
+ var options = new EventHubConsumerOptions
+ {
+ Identifier = "testIdentifier123",
+ OwnerLevel = 9987,
+ PrefetchCount = 697,
+ TrackLastEnqueuedEventInformation = false
+ };
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup>("RequestAuthorizationUsingCbsAsync",
+ ItExpr.Is(value => value == mockConnection),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)),
+ ItExpr.IsAny(),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Listen),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1)));
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask);
+
+ var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.Not.Null, "The link produced was null");
+ Assert.That(link.Settings.DesiredCapabilities, Is.Null, "There should have not have been a set of desired capabilities created, as we're not tracking the last event.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenConsumerLinkAsyncManagesActiveLinks()
+ {
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
+
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup>("RequestAuthorizationUsingCbsAsync",
+ ItExpr.Is(value => value == mockConnection),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.AbsoluteUri.StartsWith(endpoint.AbsoluteUri)),
+ ItExpr.IsAny(),
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value.SingleOrDefault() == EventHubsClaim.Listen),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(DateTime.UtcNow.AddDays(1)));
+
+ mockScope
+ .Protected()
+ .Setup("OpenAmqpObjectAsync",
+ ItExpr.IsAny(),
+ ItExpr.IsAny())
+ .Returns(Task.CompletedTask);
+
+ var activeLinks = GetActiveLinks(mockScope.Object);
+ Assert.That(activeLinks, Is.Not.Null, "The set of active links was null.");
+ Assert.That(activeLinks.Count, Is.Zero, "There should be no active links when none have been created.");
+
+ var link = await mockScope.Object.OpenConsumerLinkAsync(consumerGroup, partitionId, position, options, TimeSpan.FromDays(1), cancellationSource.Token);
+ Assert.That(link, Is.Not.Null, "The link produced was null");
+
+ Assert.That(activeLinks.Count, Is.EqualTo(1), "There should be an active link being tracked.");
+ Assert.That(activeLinks.ContainsKey(link), Is.True, "The consumer link should be tracked as active.");
+
+ activeLinks.TryGetValue(link, out var refreshTimer);
+ Assert.That(refreshTimer, Is.Not.Null, "The link should have a non-null timer.");
+
+ link.SafeClose();
+ Assert.That(activeLinks.Count, Is.Zero, "Closing the link should stop tracking it as active.");
+ }
+
+ ///
+ /// Verifies functionality of the
+ /// method.
+ ///
+ ///
+ [Test]
+ public async Task OpenConsumerLinkAsyncConfiguresAuthorizationRefresh()
+ {
+ var timerCallbackInvoked = false;
+ var endpoint = new Uri("amqp://test.service.gov");
+ var eventHub = "myHub";
+ var consumerGroup = "group";
+ var partitionId = "0";
+ var options = new EventHubConsumerOptions();
+ var position = EventPosition.Latest;
+ var credential = Mock.Of();
+ var transport = TransportType.AmqpTcp;
+ var identifier = "customIdentIFIER";
+ var cancellationSource = new CancellationTokenSource();
+ var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
+ var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of());
- mockScope.VerifyAll();
+ var mockScope = new Mock(endpoint, eventHub, credential, transport, null, identifier)
+ {
+ CallBase = true
+ };
+
+ mockScope
+ .Protected()
+ .Setup>("CreateAndOpenConnectionAsync",
+ ItExpr.IsAny(),
+ ItExpr.Is(value => value == endpoint),
+ ItExpr.Is(value => value == transport),
+ ItExpr.Is(value => value == null),
+ ItExpr.Is(value => value == identifier),
+ ItExpr.IsAny())
+ .Returns(Task.FromResult(mockConnection));
+
+ mockScope
+ .Protected()
+ .Setup>("RequestAuthorizationUsingCbsAsync",
+ ItExpr.IsAny