Skip to content

Commit

Permalink
[Event Hubs Client] Ttrack Two: Third Preview (AMQP Foundation)
Browse files Browse the repository at this point in the history
  Amqp
    - Implemented the foundation for managing AMQP connections and links
    - Implemented the foundation for identifying and transforming AMQP errors
    - Enabled creation of an AMQP connection using the settings to match client options
    - Enabled creation of a session and link for the management path
    - Enhanced the message converter to handle Event Hub property request/response
    - Enhanced the message converter to handle partition proeprty request/response

  Event Hub Client
    - Created an Amqp transport client, with basic infrastructure
    - Implemented the APIs for requesting metadata from the service

  General
    - Formatting pass over the project
    - Fix naming for a member of hte basic retry policy for consistency
  • Loading branch information
jsquire committed Sep 3, 2019
1 parent bf777bb commit 4b073b9
Show file tree
Hide file tree
Showing 31 changed files with 2,979 additions and 64 deletions.
395 changes: 395 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs

Large diffs are not rendered by default.

215 changes: 190 additions & 25 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using Azure.Messaging.EventHubs.Errors;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;

namespace Azure.Messaging.EventHubs
{
Expand All @@ -16,21 +21,6 @@ internal static class AmqpError
/// <summary>Indicates that a timeout occurred on the link.</summary>
public static readonly AmqpSymbol TimeoutError = AmqpConstants.Vendor + ":timeout";

/// <summary>Indicates that a message is no longer available.</summary>
public static readonly AmqpSymbol MessageLockLostError = AmqpConstants.Vendor + ":message-lock-lost";

/// <summary>Indicates that a session is no longer available.</summary>
public static readonly AmqpSymbol SessionLockLostError = AmqpConstants.Vendor + ":session-lock-lost";

/// <summary>Indicates that a store is no longer available.</summary>
public static readonly AmqpSymbol StoreLockLostError = AmqpConstants.Vendor + ":store-lock-lost";

/// <summary>Indicates that a session is no longer available.</summary>
public static readonly AmqpSymbol SessionCannotBeLockedError = AmqpConstants.Vendor + ":session-cannot-be-locked";

/// <summary>Indicates that a referenced subscription is no longer available.</summary>
public static readonly AmqpSymbol NoMatchingSubscriptionError = AmqpConstants.Vendor + ":no-matching-subscription";

/// <summary>Indicates that the server was busy and could not allow the requested operation.</summary>
public static readonly AmqpSymbol ServerBusyError = AmqpConstants.Vendor + ":server-busy";

Expand All @@ -40,19 +30,194 @@ internal static class AmqpError
/// <summary>Indicates that an argument provided to the Event Hubs service was incorrect.</summary>
public static readonly AmqpSymbol ArgumentOutOfRangeError = AmqpConstants.Vendor + ":argument-out-of-range";

/// <summary>Indicates that the consumer requesting an operation does not own the associated partition.</summary>
public static readonly AmqpSymbol PartitionNotOwnedError = AmqpConstants.Vendor + ":partition-not-owned";
/// <summary>The status text that appears when an AMQP error was due to a missing resource.</summary>
private const string NotFoundStatusText = "status-code: 404";

/// <summary>The expression to test for when the service returns a "Not Found" response to determine the context.</summary>
private static readonly Regex NotFoundExpression = new Regex("The messaging entity .* could not be found", RegexOptions.IgnoreCase | RegexOptions.Compiled | RegexOptions.CultureInvariant);

/// <summary>The set of mappings from AMQP error conditions to response status codes.</summary>
private static readonly IReadOnlyDictionary<AmqpResponseStatusCode, AmqpSymbol> StatusCodeMap = new Dictionary<AmqpResponseStatusCode, AmqpSymbol>()
{
{ AmqpResponseStatusCode.NotFound, AmqpErrorCode.NotFound },
{ AmqpResponseStatusCode.NotImplemented, AmqpErrorCode.NotImplemented},
{ AmqpResponseStatusCode.Unauthorized, AmqpErrorCode.UnauthorizedAccess },
{ AmqpResponseStatusCode.Forbidden, AmqpErrorCode.ResourceLimitExceeded },
{ AmqpResponseStatusCode.Gone, AmqpErrorCode.Stolen.Value },
{ AmqpResponseStatusCode.InternalServerError, AmqpErrorCode.InternalError },
{ AmqpResponseStatusCode.BadRequest, ArgumentError },
{ AmqpResponseStatusCode.RequestTimeout, TimeoutError },
{ AmqpResponseStatusCode.ServiceUnavailable, ServerBusyError }
};

/// <summary>
/// Creates the exception that corresponds to a given AMQP response message.
/// </summary>
///
/// <param name="response">The response to consider.</param>
/// <param name="eventHubsResource">The Event Hubs resource associated with the request.</param>
///
/// <returns>The exception that most accurately represents the response failure.</returns>
///
public static Exception CreateExceptionForResponse(AmqpMessage response,
string eventHubsResource)
{
if (response == null)
{
return new EventHubsException(true, eventHubsResource, Resources.UnknownCommunicationException);
}

if (!response.ApplicationProperties.Map.TryGetValue<string>(AmqpResponse.StatusDescription, out var description))
{
description = Resources.UnknownCommunicationException;
}

return CreateException(DetermineErrorCondition(response).Value, description, eventHubsResource);
}

/// <summary>
/// Creates the exception that corresponds to a given AMQP error.
/// </summary>
///
/// <param name="error">The AMQP error to consider.</param>
/// <param name="eventHubsResource">The Event Hubs resource associated with the operation.</param>
///
/// <returns>The exception that most accurately represents the error that was encountered.</returns>
///
public static Exception CreateExceptionForError(Error error,
string eventHubsResource)
{
if (error == null)
{
return new EventHubsException(true, eventHubsResource, Resources.UnknownCommunicationException);
}

return CreateException(error.Condition.Value, error.Description, eventHubsResource);
}

/// <summary>
/// Creates the exception that corresponds to a given AMQP failure scenario.
/// </summary>
///
/// <param name="condition">The error condition that represents the failure scenario.</param>
/// <param name="description">The descriptive text to use for messaging the scenario.</param>
/// <param name="eventHubsResource">The Event Hubs resource associated with the failure.</param>
///
/// <returns>The exception that most accurately represents the failure scenario.</returns>
///
private static Exception CreateException(string condition,
string description,
string eventHubsResource)
{
// The request timed out.

if (String.Equals(condition, TimeoutError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new EventHubsTimeoutException(eventHubsResource, description);
}

// The Event Hubs service was busy.

if (String.Equals(condition, ServerBusyError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusyException(eventHubsResource, description);
}

// An argument was rejected by the Event Hubs service.

if (String.Equals(condition, ArgumentError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ArgumentException(description);
}

if (String.Equals(condition, ArgumentOutOfRangeError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ArgumentOutOfRangeException(description);
}

// The consumer was superseded by one with a higher owner level.

if (String.Equals(condition, AmqpErrorCode.Stolen.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ConsumerDisconnectedException(eventHubsResource, description);
}

// Authorization was denied.

if (String.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new UnauthorizedAccessException(description);
}

// Requests are being throttled due to exceeding the service quota.

if (String.Equals(condition, AmqpErrorCode.ResourceLimitExceeded.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new QuotaExceededException(eventHubsResource, description);
}

// The service does not understand how to process the request.

if (String.Equals(condition, AmqpErrorCode.NotAllowed.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new InvalidOperationException(description);
}

if (String.Equals(condition, AmqpErrorCode.NotImplemented.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new NotSupportedException(description);
}

// The Event Hubs resource was not valid or communication with the service was interrupted.

if (String.Equals(condition, AmqpErrorCode.NotFound.Value, StringComparison.InvariantCultureIgnoreCase))
{
if (NotFoundExpression.IsMatch(description)
|| (description.IndexOf(NotFoundStatusText, StringComparison.InvariantCultureIgnoreCase) >= 0))
{
return new EventHubsResourceNotFoundException(eventHubsResource, description);
}


return new EventHubsCommunicationException(eventHubsResource, description);
}

// There was no specific exception that could be determined; fall back to a generic one.

return new EventHubsException(true, eventHubsResource, description);
}

/// <summary>
/// Determines the applicable error condition for a given response message.
/// </summary>
///
/// <param name="response">The AMQP response message to consider.</param>
///
/// <returns>The AMQP error condition that best represents the response.</returns>
///
private static AmqpSymbol DetermineErrorCondition(AmqpMessage response)
{
AmqpSymbol condition;

// If there was an error condition present, use that.

if (response.ApplicationProperties.Map.TryGetValue(AmqpResponse.ErrorCondition, out condition))
{
return condition;
}

/// <summary>Indicates that the requested Event Hubs resource is disabled.</summary>
public static readonly AmqpSymbol ResourceDisabledError = AmqpConstants.Vendor + ":entity-disabled";
// If no error condition was present, perform a reverse lookup in the mappings to determine the
// condition from the response status code.

/// <summary>Indicates that the producer requesting an operation is not allowed to publish events to the requested resource.</summary>
public static readonly AmqpSymbol PublisherRevokedError = AmqpConstants.Vendor + ":publisher-revoked";
if ((response.ApplicationProperties.Map.TryGetValue<int>(AmqpResponse.StatusCode, out var statusCode))
&& (StatusCodeMap.TryGetValue((AmqpResponseStatusCode)statusCode, out condition)))
{
return condition;
}

/// <summary>Indicates that an operation was canceled by the Event Hubs service.</summary>
public static readonly AmqpSymbol OperationCancelledError = AmqpConstants.Vendor + ":operation-cancelled";
// If no specific value could be determined, fall back to a generic condition.

/// <summary>Indicates that the requested resource cannot be created because it already exists.</summary>
public static readonly AmqpSymbol ResourceAlreadyExistsError = AmqpConstants.Vendor + ":entity-already-exists";
return AmqpErrorCode.InternalError;
}
}
}
Loading

0 comments on commit 4b073b9

Please sign in to comment.